summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tests/unit/test_zk.py27
-rw-r--r--zuul/model.py18
-rw-r--r--zuul/scheduler.py4
-rw-r--r--zuul/zk/branch_cache.py4
-rw-r--r--zuul/zk/zkobject.py65
5 files changed, 75 insertions, 43 deletions
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 477fdb130..b91263ec4 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -1496,18 +1496,39 @@ class TestZKObject(ZooKeeperBaseTestCase):
'/zuul/pipeline/fake_tenant')
self.assertEqual(pipeline2.foo, 'bar')
+ def get_ltime(obj):
+ zstat = self.zk_client.client.exists(obj.getPath())
+ return zstat.last_modified_transaction_id
+
# Update an object
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
- pipeline1.updateAttributes(context, foo='baz')
- self.assertEqual(pipeline1.foo, 'baz')
+ ltime1 = get_ltime(pipeline1)
+ pipeline1.updateAttributes(context, foo='qux')
+ self.assertEqual(pipeline1.foo, 'qux')
+ ltime2 = get_ltime(pipeline1)
+ self.assertNotEqual(ltime1, ltime2)
+
+ # This should not produce an unnecessary write
+ pipeline1.updateAttributes(context, foo='qux')
+ ltime3 = get_ltime(pipeline1)
+ self.assertEqual(ltime2, ltime3)
# Update an object using an active context
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
+ ltime1 = get_ltime(pipeline1)
with pipeline1.activeContext(context):
pipeline1.foo = 'baz'
- self.assertEqual(pipeline1.foo, 'baz')
+ self.assertEqual(pipeline1.foo, 'baz')
+ ltime2 = get_ltime(pipeline1)
+ self.assertNotEqual(ltime1, ltime2)
+
+ # This should not produce an unnecessary write
+ with pipeline1.activeContext(context):
+ pipeline1.foo = 'baz'
+ ltime3 = get_ltime(pipeline1)
+ self.assertEqual(ltime2, ltime3)
# Update of object w/o active context should not work
with testtools.ExpectedException(Exception):
diff --git a/zuul/model.py b/zuul/model.py
index a1af8d61b..6dcfb7e1d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1873,7 +1873,8 @@ class JobData(zkobject.ShardedZKObject):
obj = klass()
kw['hash'] = JobData.getHash(kw['data'])
obj._set(**kw)
- obj._save(context, create=True)
+ data = obj._trySerialize(context)
+ obj._save(context, data, create=True)
return obj
@staticmethod
@@ -1963,7 +1964,8 @@ class FrozenJob(zkobject.ZKObject):
v = None
kw['_' + k] = v
obj._set(**kw)
- obj._save(context, create=True)
+ data = obj._trySerialize(context)
+ obj._save(context, data, create=True)
# If we need to make any JobData entries, do that now.
update_kw = {}
@@ -3832,12 +3834,14 @@ class BuildSet(zkobject.ZKObject):
def removeJobNodeSetInfo(self, job_name):
if job_name not in self.nodeset_info:
raise Exception("No job nodeset for %s" % (job_name))
- del self.nodeset_info[job_name]
+ with self.activeContext(self.item.pipeline.manager.current_context):
+ del self.nodeset_info[job_name]
def setJobNodeRequestID(self, job_name, request_id):
if job_name in self.node_requests:
raise Exception("Prior node request for %s" % (job_name))
- self.node_requests[job_name] = request_id
+ with self.activeContext(self.item.pipeline.manager.current_context):
+ self.node_requests[job_name] = request_id
def getJobNodeRequestID(self, job_name):
return self.node_requests.get(job_name)
@@ -3858,7 +3862,8 @@ class BuildSet(zkobject.ZKObject):
info['zone'] = None
info['provider'] = node.provider
info['nodes'] = [n.id for n in nodeset.getNodes()]
- self.nodeset_info[job_name] = info
+ with self.activeContext(self.item.pipeline.manager.current_context):
+ self.nodeset_info[job_name] = info
def getTries(self, job_name):
return self.tries.get(job_name, 0)
@@ -3946,7 +3951,8 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
- obj._save(context, create=True)
+ data = obj._trySerialize(context)
+ obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
else BuildSet.NEW)
obj.updateAttributes(context, current_build_set=BuildSet.new(
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 22a19df68..06b727c52 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2200,7 +2200,9 @@ class Scheduler(threading.Thread):
build.end_time = event_result["end_time"]
build.setResultData(result_data, secret_result_data)
- build.build_set.warning_messages.extend(warnings)
+ build.build_set.updateAttributes(
+ pipeline.manager.current_context,
+ warning_messages=build.build_set.warning_messages + warnings)
build.held = event_result.get("held")
build.result = result
diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py
index bb861b512..5e05243c3 100644
--- a/zuul/zk/branch_cache.py
+++ b/zuul/zk/branch_cache.py
@@ -66,8 +66,8 @@ class BranchCacheZKObject(ShardedZKObject):
}
return json.dumps(data).encode("utf8")
- def _save(self, context, create=False):
- super()._save(context, create)
+ def _save(self, context, data, create=False):
+ super()._save(context, data, create)
zstat = context.client.exists(self.getPath())
self._set(_zstat=zstat)
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 20190e9ef..7e09108be 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -98,13 +98,16 @@ class ZKObject:
call as possible for efficient network use.
"""
old = self.__dict__.copy()
+ oldserial = self._trySerialize(context)
self._set(**kw)
- try:
- self._save(context)
- except Exception:
- # Roll back our old values if we aren't able to update ZK.
- self._set(**old)
- raise
+ newserial = self._trySerialize(context)
+ if oldserial != newserial:
+ try:
+ self._save(context, newserial)
+ except Exception:
+ # Roll back our old values if we aren't able to update ZK.
+ self._set(**old)
+ raise
@contextlib.contextmanager
def activeContext(self, context):
@@ -113,14 +116,17 @@ class ZKObject:
f"Another context is already active {self._active_context}")
try:
old = self.__dict__.copy()
+ oldserial = self._trySerialize(context)
self._set(_active_context=context)
yield
- try:
- self._save(context)
- except Exception:
- # Roll back our old values if we aren't able to update ZK.
- self._set(**old)
- raise
+ newserial = self._trySerialize(context)
+ if oldserial != newserial:
+ try:
+ self._save(context, newserial)
+ except Exception:
+ # Roll back our old values if we aren't able to update ZK.
+ self._set(**old)
+ raise
finally:
self._set(_active_context=None)
@@ -129,7 +135,8 @@ class ZKObject:
"""Create a new instance and save it in ZooKeeper"""
obj = klass()
obj._set(**kw)
- obj._save(context, create=True)
+ data = obj._trySerialize(context)
+ obj._save(context, data, create=True)
return obj
@classmethod
@@ -144,6 +151,18 @@ class ZKObject:
"""Update data from ZK"""
self._load(context)
+ def _trySerialize(self, context):
+ if isinstance(context, LocalZKContext):
+ return b''
+ try:
+ return self.serialize()
+ except Exception:
+ # A higher level must handle this exception, but log
+ # ourself here so we know what object triggered it.
+ context.log.error(
+ "Exception serializing ZKObject %s", self)
+ raise
+
def delete(self, context):
path = self.getPath()
while context.sessionIsValid():
@@ -195,17 +214,9 @@ class ZKObject:
raise
raise Exception("ZooKeeper session or lock not valid")
- def _save(self, context, create=False):
+ def _save(self, context, data, create=False):
if isinstance(context, LocalZKContext):
return
- try:
- data = self.serialize()
- except Exception:
- # A higher level must handle this exception, but log
- # ourself here so we know what object triggered it.
- context.log.error(
- "Exception serializing ZKObject %s", self)
- raise
path = self.getPath()
while context.sessionIsValid():
try:
@@ -275,17 +286,9 @@ class ShardedZKObject(ZKObject):
raise InvalidObjectError from exc
raise Exception("ZooKeeper session or lock not valid")
- def _save(self, context, create=False):
+ def _save(self, context, data, create=False):
if isinstance(context, LocalZKContext):
return
- try:
- data = self.serialize()
- except Exception:
- # A higher level must handle this exception, but log
- # ourself here so we know what object triggered it.
- context.log.error(
- "Exception serializing ZKObject %s", self)
- raise
path = self.getPath()
while context.sessionIsValid():
try: