diff options
-rw-r--r-- | tests/unit/test_zk.py | 27 | ||||
-rw-r--r-- | zuul/model.py | 18 | ||||
-rw-r--r-- | zuul/scheduler.py | 4 | ||||
-rw-r--r-- | zuul/zk/branch_cache.py | 4 | ||||
-rw-r--r-- | zuul/zk/zkobject.py | 65 |
5 files changed, 75 insertions, 43 deletions
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 477fdb130..b91263ec4 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -1496,18 +1496,39 @@ class TestZKObject(ZooKeeperBaseTestCase): '/zuul/pipeline/fake_tenant') self.assertEqual(pipeline2.foo, 'bar') + def get_ltime(obj): + zstat = self.zk_client.client.exists(obj.getPath()) + return zstat.last_modified_transaction_id + # Update an object with tenant_write_lock(self.zk_client, tenant_name) as lock: context = ZKContext(self.zk_client, lock, stop_event, self.log) - pipeline1.updateAttributes(context, foo='baz') - self.assertEqual(pipeline1.foo, 'baz') + ltime1 = get_ltime(pipeline1) + pipeline1.updateAttributes(context, foo='qux') + self.assertEqual(pipeline1.foo, 'qux') + ltime2 = get_ltime(pipeline1) + self.assertNotEqual(ltime1, ltime2) + + # This should not produce an unnecessary write + pipeline1.updateAttributes(context, foo='qux') + ltime3 = get_ltime(pipeline1) + self.assertEqual(ltime2, ltime3) # Update an object using an active context with tenant_write_lock(self.zk_client, tenant_name) as lock: context = ZKContext(self.zk_client, lock, stop_event, self.log) + ltime1 = get_ltime(pipeline1) with pipeline1.activeContext(context): pipeline1.foo = 'baz' - self.assertEqual(pipeline1.foo, 'baz') + self.assertEqual(pipeline1.foo, 'baz') + ltime2 = get_ltime(pipeline1) + self.assertNotEqual(ltime1, ltime2) + + # This should not produce an unnecessary write + with pipeline1.activeContext(context): + pipeline1.foo = 'baz' + ltime3 = get_ltime(pipeline1) + self.assertEqual(ltime2, ltime3) # Update of object w/o active context should not work with testtools.ExpectedException(Exception): diff --git a/zuul/model.py b/zuul/model.py index a1af8d61b..6dcfb7e1d 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1873,7 +1873,8 @@ class JobData(zkobject.ShardedZKObject): obj = klass() kw['hash'] = JobData.getHash(kw['data']) obj._set(**kw) - obj._save(context, create=True) + data = obj._trySerialize(context) + obj._save(context, data, create=True) return obj @staticmethod @@ -1963,7 +1964,8 @@ class FrozenJob(zkobject.ZKObject): v = None kw['_' + k] = v obj._set(**kw) - obj._save(context, create=True) + data = obj._trySerialize(context) + obj._save(context, data, create=True) # If we need to make any JobData entries, do that now. update_kw = {} @@ -3832,12 +3834,14 @@ class BuildSet(zkobject.ZKObject): def removeJobNodeSetInfo(self, job_name): if job_name not in self.nodeset_info: raise Exception("No job nodeset for %s" % (job_name)) - del self.nodeset_info[job_name] + with self.activeContext(self.item.pipeline.manager.current_context): + del self.nodeset_info[job_name] def setJobNodeRequestID(self, job_name, request_id): if job_name in self.node_requests: raise Exception("Prior node request for %s" % (job_name)) - self.node_requests[job_name] = request_id + with self.activeContext(self.item.pipeline.manager.current_context): + self.node_requests[job_name] = request_id def getJobNodeRequestID(self, job_name): return self.node_requests.get(job_name) @@ -3858,7 +3862,8 @@ class BuildSet(zkobject.ZKObject): info['zone'] = None info['provider'] = node.provider info['nodes'] = [n.id for n in nodeset.getNodes()] - self.nodeset_info[job_name] = info + with self.activeContext(self.item.pipeline.manager.current_context): + self.nodeset_info[job_name] = info def getTries(self, job_name): return self.tries.get(job_name, 0) @@ -3946,7 +3951,8 @@ class QueueItem(zkobject.ZKObject): def new(klass, context, **kw): obj = klass() obj._set(**kw) - obj._save(context, create=True) + data = obj._trySerialize(context) + obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None else BuildSet.NEW) obj.updateAttributes(context, current_build_set=BuildSet.new( diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 22a19df68..06b727c52 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2200,7 +2200,9 @@ class Scheduler(threading.Thread): build.end_time = event_result["end_time"] build.setResultData(result_data, secret_result_data) - build.build_set.warning_messages.extend(warnings) + build.build_set.updateAttributes( + pipeline.manager.current_context, + warning_messages=build.build_set.warning_messages + warnings) build.held = event_result.get("held") build.result = result diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py index bb861b512..5e05243c3 100644 --- a/zuul/zk/branch_cache.py +++ b/zuul/zk/branch_cache.py @@ -66,8 +66,8 @@ class BranchCacheZKObject(ShardedZKObject): } return json.dumps(data).encode("utf8") - def _save(self, context, create=False): - super()._save(context, create) + def _save(self, context, data, create=False): + super()._save(context, data, create) zstat = context.client.exists(self.getPath()) self._set(_zstat=zstat) diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 20190e9ef..7e09108be 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -98,13 +98,16 @@ class ZKObject: call as possible for efficient network use. """ old = self.__dict__.copy() + oldserial = self._trySerialize(context) self._set(**kw) - try: - self._save(context) - except Exception: - # Roll back our old values if we aren't able to update ZK. - self._set(**old) - raise + newserial = self._trySerialize(context) + if oldserial != newserial: + try: + self._save(context, newserial) + except Exception: + # Roll back our old values if we aren't able to update ZK. + self._set(**old) + raise @contextlib.contextmanager def activeContext(self, context): @@ -113,14 +116,17 @@ class ZKObject: f"Another context is already active {self._active_context}") try: old = self.__dict__.copy() + oldserial = self._trySerialize(context) self._set(_active_context=context) yield - try: - self._save(context) - except Exception: - # Roll back our old values if we aren't able to update ZK. - self._set(**old) - raise + newserial = self._trySerialize(context) + if oldserial != newserial: + try: + self._save(context, newserial) + except Exception: + # Roll back our old values if we aren't able to update ZK. + self._set(**old) + raise finally: self._set(_active_context=None) @@ -129,7 +135,8 @@ class ZKObject: """Create a new instance and save it in ZooKeeper""" obj = klass() obj._set(**kw) - obj._save(context, create=True) + data = obj._trySerialize(context) + obj._save(context, data, create=True) return obj @classmethod @@ -144,6 +151,18 @@ class ZKObject: """Update data from ZK""" self._load(context) + def _trySerialize(self, context): + if isinstance(context, LocalZKContext): + return b'' + try: + return self.serialize() + except Exception: + # A higher level must handle this exception, but log + # ourself here so we know what object triggered it. + context.log.error( + "Exception serializing ZKObject %s", self) + raise + def delete(self, context): path = self.getPath() while context.sessionIsValid(): @@ -195,17 +214,9 @@ class ZKObject: raise raise Exception("ZooKeeper session or lock not valid") - def _save(self, context, create=False): + def _save(self, context, data, create=False): if isinstance(context, LocalZKContext): return - try: - data = self.serialize() - except Exception: - # A higher level must handle this exception, but log - # ourself here so we know what object triggered it. - context.log.error( - "Exception serializing ZKObject %s", self) - raise path = self.getPath() while context.sessionIsValid(): try: @@ -275,17 +286,9 @@ class ShardedZKObject(ZKObject): raise InvalidObjectError from exc raise Exception("ZooKeeper session or lock not valid") - def _save(self, context, create=False): + def _save(self, context, data, create=False): if isinstance(context, LocalZKContext): return - try: - data = self.serialize() - except Exception: - # A higher level must handle this exception, but log - # ourself here so we know what object triggered it. - context.log.error( - "Exception serializing ZKObject %s", self) - raise path = self.getPath() while context.sessionIsValid(): try: |