diff options
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/ansible/logconfig.py | 3 | ||||
-rw-r--r-- | zuul/configloader.py | 47 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 9 | ||||
-rw-r--r-- | zuul/driver/sql/alembic/env.py | 3 | ||||
-rw-r--r-- | zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py | 17 | ||||
-rw-r--r-- | zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py | 17 | ||||
-rw-r--r-- | zuul/driver/sql/sqlconnection.py | 40 | ||||
-rw-r--r-- | zuul/executor/common.py | 1 | ||||
-rw-r--r-- | zuul/executor/server.py | 25 | ||||
-rw-r--r-- | zuul/lib/fingergw.py | 30 | ||||
-rw-r--r-- | zuul/lib/repl.py | 6 | ||||
-rw-r--r-- | zuul/lib/streamer_utils.py | 2 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 69 | ||||
-rw-r--r-- | zuul/model.py | 162 | ||||
-rw-r--r-- | zuul/scheduler.py | 14 | ||||
-rwxr-xr-x | zuul/web/__init__.py | 2 | ||||
-rw-r--r-- | zuul/zk/zkobject.py | 11 |
17 files changed, 315 insertions, 143 deletions
diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py index 66881336a..2d7c37463 100644 --- a/zuul/ansible/logconfig.py +++ b/zuul/ansible/logconfig.py @@ -140,7 +140,8 @@ def _read_config_file(filename: str): raise ValueError("Unable to read logging config file at %s" % filename) if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'): - return yaml.safe_load(open(filename, 'r')) + with open(filename, 'r') as f: + return yaml.safe_load(f) return filename diff --git a/zuul/configloader.py b/zuul/configloader.py index 7f8346382..f01be597b 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -437,6 +437,30 @@ def ansible_vars_dict(value): ansible_var_name(key) +def copy_safe_config(conf): + """Return a deep copy of a config dictionary. + + This lets us assign values of a config dictionary to configuration + objects, even if those values are nested dictionaries. This way + we can safely freeze the configuration object (the process of + which mutates dictionaries) without mutating the original + configuration. + + Meanwhile, this does retain the original context information as a + single object (some behaviors rely on mutating the source context + (e.g., pragma)). + + """ + ret = copy.deepcopy(conf) + for key in ( + '_source_context', + '_start_mark', + ): + if key in conf: + ret[key] = conf[key] + return ret + + class PragmaParser(object): pragma = { 'implied-branch-matchers': bool, @@ -452,6 +476,7 @@ class PragmaParser(object): self.pcontext = pcontext def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) bm = conf.get('implied-branch-matchers') @@ -512,6 +537,7 @@ class NodeSetParser(object): return vs.Schema(nodeset) def fromYaml(self, conf, anonymous=False): + conf = copy_safe_config(conf) if anonymous: self.anon_schema(conf) self.anonymous = True @@ -599,6 +625,7 @@ class SecretParser(object): return vs.Schema(secret) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) s = model.Secret(conf['name'], conf['_source_context']) s.source_context = conf['_source_context'] @@ -723,6 +750,7 @@ class JobParser(object): def fromYaml(self, conf, project_pipeline=False, name=None, validate=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) @@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object): return vs.Schema(project) def fromYaml(self, conf, validate=True, freeze=True): + conf = copy_safe_config(conf) if validate: self.schema(conf) source_context = conf['_source_context'] @@ -1165,6 +1194,7 @@ class ProjectParser(object): return vs.Schema(project) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) project_name = conf.get('name') @@ -1328,6 +1358,7 @@ class PipelineParser(object): return vs.Schema(pipeline) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) pipeline = model.Pipeline(conf['name'], self.pcontext.tenant) pipeline.source_context = conf['_source_context'] @@ -1469,6 +1500,7 @@ class SemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1)) semaphore.source_context = conf.get('_source_context') @@ -1494,6 +1526,7 @@ class QueueParser: return vs.Schema(queue) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) queue = model.Queue( conf['name'], @@ -1523,6 +1556,7 @@ class AuthorizationRuleParser(object): return vs.Schema(authRule) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) a = model.AuthZRuleTree(conf['name']) @@ -1556,6 +1590,7 @@ class GlobalSemaphoreParser(object): return vs.Schema(semaphore) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) semaphore = model.Semaphore(conf['name'], conf.get('max', 1), global_scope=True) @@ -1576,6 +1611,7 @@ class ApiRootParser(object): return vs.Schema(api_root) def fromYaml(self, conf): + conf = copy_safe_config(conf) self.schema(conf) api_root = model.ApiRoot(conf.get('authentication-realm')) api_root.access_rules = conf.get('access-rules', []) @@ -1770,8 +1806,10 @@ class TenantParser(object): for branch_future in as_completed(branch_futures.keys()): tpc = branch_futures[branch_future] - source_context = model.ProjectContext( - tpc.project.canonical_name, tpc.project.name) + trusted, _ = tenant.getProject(tpc.project.canonical_name) + source_context = model.SourceContext( + tpc.project.canonical_name, tpc.project.name, + tpc.project.connection_name, None, None, trusted) with project_configuration_exceptions(source_context, loading_errors): self._getProjectBranches(tenant, tpc, branch_cache_min_ltimes) @@ -2596,8 +2634,9 @@ class TenantParser(object): project_metadata.merge_mode = model.MERGER_MAP[mode] tpc = tenant.project_configs[project.canonical_name] if tpc.merge_modes is not None: - source_context = model.ProjectContext( - project.canonical_name, project.name) + source_context = model.SourceContext( + project.canonical_name, project.name, + project.connection_name, None, None, trusted) with project_configuration_exceptions(source_context, layout.loading_errors): if project_metadata.merge_mode not in tpc.merge_modes: diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 0a1f0ee61..276365e1d 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): def getInfoRefs(self, project: Project) -> Dict[str, str]: try: - data = self._uploadPack(project) + # Encode the UTF-8 data back to a byte array, as the size of + # each record in the pack is in bytes, and so the slicing must + # also be done on a byte-basis. + data = self._uploadPack(project).encode("utf-8") except Exception: self.log.error("Cannot get references from %s" % project) raise # keeps error information @@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): plen -= 4 if len(data) - i < plen: raise Exception("Invalid data in info/refs") - line = data[i:i + plen] + # Once the pack data is sliced, we can safely decode it back + # into a (UTF-8) string. + line = data[i:i + plen].decode("utf-8") i += plen if not read_advertisement: read_advertisement = True diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py index da7b3207f..17b67805e 100644 --- a/zuul/driver/sql/alembic/env.py +++ b/zuul/driver/sql/alembic/env.py @@ -53,7 +53,8 @@ def run_migrations_online(): connectable = engine_from_config( config.get_section(config.config_ini_section), prefix='sqlalchemy.', - poolclass=pool.NullPool) + poolclass=pool.NullPool, + future=True) # we can get the table prefix via the tag object tag = context.get_tag_argument() diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py index 67581a6f9..1735d35f3 100644 --- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py +++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py @@ -24,13 +24,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET result=( - SELECT CASE score - WHEN 1 THEN 'SUCCESS' - ELSE 'FAILURE' END) - """.format(buildset_table=table_prefix + BUILDSET_TABLE)) + sa.text( + """ + UPDATE {buildset_table} + SET result=( + SELECT CASE score + WHEN 1 THEN 'SUCCESS' + ELSE 'FAILURE' END) + """.format(buildset_table=table_prefix + BUILDSET_TABLE) + ) + ) op.drop_column(table_prefix + BUILDSET_TABLE, 'score') diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py index abfba7247..99d12d750 100644 --- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py +++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py @@ -34,13 +34,16 @@ def upgrade(table_prefix=''): connection = op.get_bind() connection.execute( - """ - UPDATE {buildset_table} - SET updated=greatest( - coalesce(first_build_start_time, '1970-01-01 00:00:00'), - coalesce(last_build_end_time, '1970-01-01 00:00:00'), - coalesce(event_timestamp, '1970-01-01 00:00:00')) - """.format(buildset_table=table_prefix + "zuul_buildset")) + sa.text( + """ + UPDATE {buildset_table} + SET updated=greatest( + coalesce(first_build_start_time, '1970-01-01 00:00:00'), + coalesce(last_build_end_time, '1970-01-01 00:00:00'), + coalesce(event_timestamp, '1970-01-01 00:00:00')) + """.format(buildset_table=table_prefix + "zuul_buildset") + ) + ) def downgrade(): diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index b89653bba..2d5c39ec3 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -308,27 +308,31 @@ class SQLConnection(BaseConnection): def _migrate(self, revision='head'): """Perform the alembic migrations for this connection""" + # Note that this method needs to be called with an external lock held. + # The reason for this is we retrieve the alembic version and run the + # alembic migrations in different database transactions which opens + # us to races without an external lock. with self.engine.begin() as conn: context = alembic.migration.MigrationContext.configure(conn) current_rev = context.get_current_revision() - self.log.debug('Current migration revision: %s' % current_rev) - - config = alembic.config.Config() - config.set_main_option("script_location", - "zuul:driver/sql/alembic") - config.set_main_option("sqlalchemy.url", - self.connection_config.get('dburi'). - replace('%', '%%')) - - # Alembic lets us add arbitrary data in the tag argument. We can - # leverage that to tell the upgrade scripts about the table prefix. - tag = {'table_prefix': self.table_prefix} - - if current_rev is None and not self.force_migrations: - self.metadata.create_all(self.engine) - alembic.command.stamp(config, revision, tag=tag) - else: - alembic.command.upgrade(config, revision, tag=tag) + self.log.debug('Current migration revision: %s' % current_rev) + + config = alembic.config.Config() + config.set_main_option("script_location", + "zuul:driver/sql/alembic") + config.set_main_option("sqlalchemy.url", + self.connection_config.get('dburi'). + replace('%', '%%')) + + # Alembic lets us add arbitrary data in the tag argument. We can + # leverage that to tell the upgrade scripts about the table prefix. + tag = {'table_prefix': self.table_prefix} + + if current_rev is None and not self.force_migrations: + self.metadata.create_all(self.engine) + alembic.command.stamp(config, revision, tag=tag) + else: + alembic.command.upgrade(config, revision, tag=tag) def onLoad(self, zk_client, component_registry=None): safe_connection = quote_plus(self.connection_name) diff --git a/zuul/executor/common.py b/zuul/executor/common.py index ff4522d22..b8393903e 100644 --- a/zuul/executor/common.py +++ b/zuul/executor/common.py @@ -65,6 +65,7 @@ def construct_build_params(uuid, connections, job, item, pipeline, zuul_params['patchset'] = str(item.change.patchset) if hasattr(item.change, 'message'): zuul_params['message'] = strings.b64encode(item.change.message) + zuul_params['change_message'] = item.change.message if (hasattr(item.change, 'oldrev') and item.change.oldrev and item.change.oldrev != '0' * 40): zuul_params['oldrev'] = item.change.oldrev diff --git a/zuul/executor/server.py b/zuul/executor/server.py index c3737b5cc..a49bbbbbf 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -14,6 +14,7 @@ # under the License. import collections +import copy import datetime import json import logging @@ -1049,7 +1050,7 @@ class AnsibleJob(object): # The same, but frozen self.frozen_hostvars = {} # The zuul.* vars - self.zuul_vars = {} + self.debug_zuul_vars = {} self.waiting_for_semaphores = False def run(self): @@ -1888,7 +1889,8 @@ class AnsibleJob(object): logfile=json_output)) return try: - output = json.load(open(json_output, 'r')) + with open(json_output, 'r') as f: + output = json.load(f) last_playbook = output[-1] # Transform json to yaml - because it's easier to read and given # the size of the data it'll be extra-hard to read this as an @@ -2332,7 +2334,8 @@ class AnsibleJob(object): def prepareKubeConfig(self, jobdir, data): kube_cfg_path = jobdir.kubeconfig if os.path.exists(kube_cfg_path): - kube_cfg = yaml.safe_load(open(kube_cfg_path)) + with open(kube_cfg_path) as f: + kube_cfg = yaml.safe_load(f) else: kube_cfg = { 'apiVersion': 'v1', @@ -2495,10 +2498,18 @@ class AnsibleJob(object): if ri.role_path is not None], )) + # The zuul vars in the debug inventory.yaml file should not + # have any !unsafe tags, so save those before we update the + # execution version of those. + self.debug_zuul_vars = copy.deepcopy(zuul_vars) + if 'change_message' in zuul_vars: + zuul_vars['change_message'] = yaml.mark_strings_unsafe( + zuul_vars['change_message']) + with open(self.jobdir.zuul_vars, 'w') as zuul_vars_yaml: zuul_vars_yaml.write( - yaml.safe_dump({'zuul': zuul_vars}, default_flow_style=False)) - self.zuul_vars = zuul_vars + yaml.ansible_unsafe_dump({'zuul': zuul_vars}, + default_flow_style=False)) # Squash all and extra vars into localhost (it's not # explicitly listed). @@ -2552,7 +2563,7 @@ class AnsibleJob(object): inventory = make_inventory_dict( self.host_list, self.nodeset, self.original_hostvars) - inventory['all']['vars']['zuul'] = self.zuul_vars + inventory['all']['vars']['zuul'] = self.debug_zuul_vars with open(self.jobdir.inventory, 'w') as inventory_yaml: inventory_yaml.write( yaml.ansible_unsafe_dump( @@ -3481,6 +3492,8 @@ class ExecutorServer(BaseMergeServer): self.statsd.gauge(base_key + '.load_average', 0) self.statsd.gauge(base_key + '.pct_used_ram', 0) self.statsd.gauge(base_key + '.running_builds', 0) + self.statsd.close() + self.statsd = None # Use the BaseMergeServer's stop method to disconnect from # ZooKeeper. We do this as one of the last steps to ensure diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py index ad945c1b7..184c9762d 100644 --- a/zuul/lib/fingergw.py +++ b/zuul/lib/fingergw.py @@ -47,6 +47,18 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): self.fingergw = kwargs.pop('fingergw') super(RequestHandler, self).__init__(*args, **kwargs) + def _readSocket(self, sock, build_uuid): + # timeout only on the connection, let recv() wait forever + sock.settimeout(None) + msg = "%s\n" % build_uuid # Must have a trailing newline! + sock.sendall(msg.encode('utf-8')) + while True: + data = sock.recv(1024) + if data: + self.request.sendall(data) + else: + break + def _fingerClient(self, server, port, build_uuid, use_ssl): ''' Open a finger connection and return all streaming results. @@ -59,24 +71,16 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler): ''' with socket.create_connection((server, port), timeout=10) as s: if use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = self.fingergw.tls_verify_hostnames context.load_cert_chain(self.fingergw.tls_cert, self.fingergw.tls_key) context.load_verify_locations(self.fingergw.tls_ca) - s = context.wrap_socket(s, server_hostname=server) - - # timeout only on the connection, let recv() wait forever - s.settimeout(None) - msg = "%s\n" % build_uuid # Must have a trailing newline! - s.sendall(msg.encode('utf-8')) - while True: - data = s.recv(1024) - if data: - self.request.sendall(data) - else: - break + with context.wrap_socket(s, server_hostname=server) as s: + self._readSocket(s, build_uuid) + else: + self._readSocket(s, build_uuid) def handle(self): ''' diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py index ecefae9ea..63a800406 100644 --- a/zuul/lib/repl.py +++ b/zuul/lib/repl.py @@ -26,14 +26,14 @@ class ThreadLocalProxy(object): self.default = default def __getattr__(self, name): - obj = self.files.get(threading.currentThread(), self.default) + obj = self.files.get(threading.current_thread(), self.default) return getattr(obj, name) def register(self, obj): - self.files[threading.currentThread()] = obj + self.files[threading.current_thread()] = obj def unregister(self): - self.files.pop(threading.currentThread()) + self.files.pop(threading.current_thread()) class REPLHandler(socketserver.StreamRequestHandler): diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py index 04de4b8cb..a50fb4142 100644 --- a/zuul/lib/streamer_utils.py +++ b/zuul/lib/streamer_utils.py @@ -168,7 +168,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer): if all([self.server_ssl_key, self.server_ssl_cert, self.server_ssl_ca]): - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.load_cert_chain(self.server_ssl_cert, self.server_ssl_key) context.load_verify_locations(self.server_ssl_ca) context.verify_mode = ssl.CERT_REQUIRED diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 60eb479e0..36361df11 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,6 +28,8 @@ from zuul.model import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY +from zuul.zk.exceptions import LockException +from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout self.buildChangeQueues(layout) - with self.sched.createZKContext(None, self.log) as ctx,\ - self.currentContext(ctx): - # Make sure we have state and change list objects, and - # ensure that they exist in ZK. We don't hold the - # pipeline lock, but if they don't exist, that means they - # are new, so no one else will either, so the write on - # create is okay. If they do exist and we have an old - # object, we'll just reuse it. If it does exist and we - # don't have an old object, we'll get a new empty one. - # Regardless, these will not automatically refresh now, so - # they will be out of date until they are refreshed later. - self.pipeline.state = PipelineState.create( - self.pipeline, layout.uuid, self.pipeline.state) - self.pipeline.change_list = PipelineChangeList.create( - self.pipeline) + # Make sure we have state and change list objects. We + # don't actually ensure they exist in ZK here; these are + # just local objects until they are serialized the first + # time. Since we don't hold the pipeline lock, we can't + # reliably perform any read or write operations; we just + # need to ensure we have in-memory objects to work with + # and they will be initialized or loaded on the next + # refresh. + + # These will be out of date until they are refreshed later. + self.pipeline.state = PipelineState.create( + self.pipeline, self.pipeline.state) + self.pipeline.change_list = PipelineChangeList.create( + self.pipeline) + + # Now, try to acquire a non-blocking pipeline lock and refresh + # them for the side effect of initializing them if necessary. + # In the case of a new pipeline, no one else should have a + # lock anyway, and this helps us avoid emitting a whole bunch + # of errors elsewhere on startup when these objects don't + # exist. If the pipeline already exists and we can't acquire + # the lock, that's fine, we're much less likely to encounter + # read errors elsewhere in that case anyway. + try: + with pipeline_lock( + self.sched.zk_client, self.pipeline.tenant.name, + self.pipeline.name, blocking=False) as lock,\ + self.sched.createZKContext(lock, self.log) as ctx,\ + self.currentContext(ctx): + if not self.pipeline.state.exists(ctx): + # We only do this if the pipeline doesn't exist in + # ZK because in that case, this process should be + # fast since it's empty. If it does exist, + # refreshing it may be slow and since other actors + # won't encounter errors due to its absence, we + # would rather defer the work to later. + self.pipeline.state.refresh(ctx) + self.pipeline.change_list.refresh(ctx) + except LockException: + pass def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") @@ -276,19 +303,19 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return - change_in_pipeline = False + to_refresh = set() for item in self.pipeline.getAllItems(): if not isinstance(item.change, model.Change): continue + if item.change.equals(change): + to_refresh.add(item.change) for dep_change_ref in item.change.commit_needs_changes: - if item.change.equals(change): - change_in_pipeline = True dep_change_key = ChangeKey.fromReference(dep_change_ref) if dep_change_key.isSameChange(change.cache_stat.key): - self.updateCommitDependencies(item.change, None, event) + to_refresh.add(item.change) - if change_in_pipeline: - self.updateCommitDependencies(change, None, event) + for existing_change in to_refresh: + self.updateCommitDependencies(existing_change, None, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: diff --git a/zuul/model.py b/zuul/model.py index 41dc246e5..2674232dc 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject): _read_only=False, ) + def _lateInitData(self): + # If we're initializing the object on our initial refresh, + # reset the data to this. + return dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=[], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + @classmethod def fromZK(klass, context, path, pipeline, **kw): obj = klass() @@ -631,21 +643,23 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def create(cls, pipeline, layout_uuid, old_state=None): - # If the object does not exist in ZK, create it with the - # default attributes and the supplied layout UUID. Otherwise, - # return an initialized object (or the old object for reuse) - # without loading any data so that data can be loaded on the - # next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline, old_state=None): + # If we are resetting an existing pipeline, we will have an + # old_state, so just clean up the object references there and + # let the next refresh handle updating any data. + if old_state: + old_state._resetObjectRefs() + return old_state + + # Otherwise, we are initializing a pipeline that we haven't + # seen before. It still might exist in ZK, but since we + # haven't seen it, we don't have any object references to + # clean up. We can just start with a clean object, set the + # pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. state = cls() state._set(pipeline=pipeline) - if state.exists(ctx): - if old_state: - old_state._resetObjectRefs() - return old_state - return state - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) + return state def _resetObjectRefs(self): # Update the pipeline references on the queue objects. @@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject): # This is so that we can refresh the object in circumstances # where we haven't verified that our local layout matches # what's in ZK. + + # Notably, this need not prevent us from performing the + # initialization below if necessary. The case of the object + # being brand new in ZK supercedes our worry that our old copy + # might be out of date since our old copy is, itself, brand + # new. self._set(_read_only=read_only) - return super().refresh(context) + try: + return super().refresh(context) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and we + # should write it to ZK. + + # Note that typically this code is not used since + # currently other objects end up creating the pipeline + # path in ZK first. It is included in case that ever + # changes. Currently the empty byte-string code path in + # deserialize() is used instead. + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self._set(**self._lateInitData()) + self.internalCreate(context) def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so @@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject): # source change cache. self.pipeline.manager.clearCache() + # If the object doesn't exist we will get back an empty byte + # string. This happens because the postConfig call creates + # this object without holding the pipeline lock, so it can't + # determine whether or not it exists in ZK. We do hold the + # pipeline lock here, so if we get the empty byte string, we + # know we're initializing the object. In that case, we should + # initialize the layout id to the current layout. Nothing + # else needs to be set. + if raw == b'': + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + return self._lateInitData() + data = super().deserialize(raw, context) if not self._read_only: @@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject): _change_keys=[], ) - def refresh(self, context): - self._retry(context, super().refresh, - context, max_tries=5) + def refresh(self, context, allow_init=True): + # Set allow_init to false to indicate that we don't hold the + # lock and we should not try to initialize the object in ZK if + # it does not exist. + try: + self._retry(context, super().refresh, + context, max_tries=5) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and + # we should write it to ZK. + if allow_init: + context.log.warning( + "Initializing pipeline change list for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self.internalCreate(context) + else: + # If we're called from a context where we can't + # initialize the change list, re-raise the exception. + raise def getPath(self): return self.getChangeListPath(self.pipeline) @@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject): return pipeline_path + '/change_list' @classmethod - def create(cls, pipeline, old_list=None): - # If the object does not exist in ZK, create it with the - # default attributes. Otherwise, return an initialized object - # (or the old object for reuse) without loading any data so - # that data can be loaded on the next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline): + # This object may or may not exist in ZK, but we using any of + # that data here. We can just start with a clean object, set + # the pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. change_list = cls() change_list._set(pipeline=pipeline) - if change_list.exists(ctx): - if old_list: - return old_list - return change_list - return cls.new(ctx, pipeline=pipeline) + return change_list def serialize(self, context): data = { @@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") - def deserialize(self, data, context): - data = super().deserialize(data, context) + def deserialize(self, raw, context): + data = super().deserialize(raw, context) change_keys = [] # We must have a dictionary with a 'changes' key; otherwise we # may be reading immediately after truncating. Allow the @@ -1811,24 +1882,6 @@ class FrozenSecret(ConfigObject): ) -class ProjectContext(ConfigObject): - - def __init__(self, project_canonical_name, project_name): - super().__init__() - self.project_canonical_name = project_canonical_name - self.project_name = project_name - self.branch = None - self.path = None - - def __str__(self): - return self.project_name - - def toDict(self): - return dict( - project=self.project_name, - ) - - class SourceContext(ConfigObject): """A reference to the branch of a project in configuration. @@ -4396,9 +4449,8 @@ class BuildSet(zkobject.ZKObject): version = build.getZKVersion() # If zstat is None, we created the object if version is not None: - versions = self.build_versions.copy() - versions[build.uuid] = version + 1 - self.updateAttributes(context, build_versions=versions) + self.build_versions[build.uuid] = version + 1 + self.updateAttributes(context, build_versions=self.build_versions) def updateJobVersion(self, context, job): if (COMPONENT_REGISTRY.model_api < 12): @@ -4406,9 +4458,8 @@ class BuildSet(zkobject.ZKObject): version = job.getZKVersion() if version is not None: - versions = self.job_versions.copy() - versions[job.name] = version + 1 - self.updateAttributes(context, job_versions=versions) + self.job_versions[job.name] = version + 1 + self.updateAttributes(context, job_versions=self.job_versions) def shouldRefreshBuild(self, build): # Unless all schedulers are updating versions, we can't trust @@ -5977,8 +6028,7 @@ class Bundle: def deserialize(cls, context, queue, items_by_path, data): bundle = cls(data["uuid"]) bundle.items = [ - items_by_path.get(p) or QueueItem.fromZK( - context, p, pipeline=queue.pipeline, queue=queue) + items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue) for p in data["items"] ] bundle.started_reporting = data["started_reporting"] diff --git a/zuul/scheduler.py b/zuul/scheduler.py index f22d20a05..63f3d7389 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2358,7 +2358,9 @@ class Scheduler(threading.Thread): for pipeline in tenant.layout.pipelines.values(): self.log.debug("Gather relevant cache items for: %s %s", tenant.name, pipeline.name) - pipeline.change_list.refresh(ctx) + # This will raise an exception and abort the process if + # unable to refresh the change list. + pipeline.change_list.refresh(ctx, allow_init=False) change_keys = pipeline.change_list.getChangeKeys() relevant_changes = pipeline.manager.resolveChangeKeys( change_keys) @@ -2387,8 +2389,16 @@ class Scheduler(threading.Thread): # Update the pipeline changes ctx = self.createZKContext(None, self.log) for pipeline in tenant.layout.pipelines.values(): + # This will raise an exception if it is unable to + # refresh the change list. We will proceed anyway + # and use our data from the last time we did + # refresh in order to avoid stalling trigger + # processing. In this case we may not forward + # some events which are related to changes in the + # pipeline but don't match the pipeline trigger + # criteria. try: - pipeline.change_list.refresh(ctx) + pipeline.change_list.refresh(ctx, allow_init=False) except Exception: self.log.exception( "Unable to refresh pipeline change list for %s", diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index 7f27cd970..47226fd7d 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -395,7 +395,7 @@ class LogStreamer(object): self.finger_socket = socket.create_connection( (server, port), timeout=10) if use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = self.zuulweb.finger_tls_verify_hostnames context.load_cert_chain( diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index b228ecaa4..87d76bca6 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -233,7 +233,18 @@ class ZKObject: obj._load(context, path=path) return obj + def internalCreate(self, context): + """Create the object in ZK from an existing ZKObject + + This should only be used in special circumstances: when we + know it's safe to start using a ZKObject before it's actually + created in ZK. Normally use .new() + """ + data = self._trySerialize(context) + self._save(context, data, create=True) + def refresh(self, context): + """Update data from ZK""" self._load(context) |