diff options
author | kauboy26 <vishnu.kaushik@mongodb.com> | 2022-04-04 18:11:43 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-04 19:07:46 +0000 |
commit | 77c0daa37cf81205dba35690851fd4024d3d8e21 (patch) | |
tree | af3252a6bb9576ae1d3519ba709e3764919f87cc /buildscripts/resmokelib/testing | |
parent | 6abfb1f99e65c451b233bce7e1b941d74669a7b5 (diff) | |
download | mongo-77c0daa37cf81205dba35690851fd4024d3d8e21.tar.gz |
SERVER-65000 Deleted fixtures and hooks related to the replicator.
Diffstat (limited to 'buildscripts/resmokelib/testing')
10 files changed, 6 insertions, 781 deletions
diff --git a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py deleted file mode 100644 index fbc77d03944..00000000000 --- a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Fixture with two clusters (for cluster to cluster replications) for executing JSTests against.""" - -import copy -import os.path - -import buildscripts.resmokelib.testing.fixtures.interface as interface - - -class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes - """Fixture which provides two clusters to perform a cluster to cluster replication.""" - - def __init__( # pylint: disable=too-many-arguments,too-many-locals - self, logger, job_num, fixturelib, cluster0_options, cluster1_options, - replicator_options, dbpath_prefix=None, preserve_dbpath=False): - """Initialize with different options for the clusters.""" - - interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib, - dbpath_prefix=dbpath_prefix) - - self.clusters = [] - self.both_cluster_options = [] - parsed_options = [ - self.fixturelib.default_if_none(copy.deepcopy(cluster0_options), {}), - self.fixturelib.default_if_none(copy.deepcopy(cluster1_options), {}) - ] - - self.preserve_dbpath = preserve_dbpath - - # The usual command line params can lead to messy behavior in unlike topologies, since they - # may override the cluster-to-cluster behavior specified elsewhere on both clusters in an - # unexpected way. Therefore, forbid them. - if any( - v is not None for v in (self.config.MIXED_BIN_VERSIONS, self.config.NUM_SHARDS, - self.config.NUM_REPLSET_NODES)): - raise ValueError( - "ClusterToClusterFixture options must be specified through 'cluster0_options' and 'cluster1_options'." - ) - - for i, cluster_options in enumerate(parsed_options): - cluster_options["settings"] = self.fixturelib.default_if_none( - cluster_options["settings"], {}) - if "preserve_dbpath" not in cluster_options["settings"]\ - or cluster_options["settings"]["preserve_dbpath"] is None: - cluster_options["settings"]["preserve_dbpath"] = self.preserve_dbpath - - cluster_options["settings"]["dbpath_prefix"] = os.path.join( - self._dbpath_prefix, f"cluster{i}") - - if cluster_options["class"] == "ReplicaSetFixture": - cluster_options["settings"]["replicaset_logging_prefix"] = f"cl{i}" - elif cluster_options["class"] == "ShardedClusterFixture": - cluster_options["settings"]["cluster_logging_prefix"] = f"cl{i}" - else: - raise ValueError(f"Illegal fixture class: {cluster_options['class']}") - - self.logger.info(f"Cluster{i} configured with settings: {cluster_options}") - - self.both_cluster_options = parsed_options - - # The cluster that starts off with the data. - self.source_cluster_index = 0 - self.replicator_options = replicator_options - - for cluster_options in self.both_cluster_options: - if replicator_options["class"] == "MultipleReplicatorFixture": - if cluster_options["class"] != "ShardedClusterFixture": - raise ValueError( - "MultipleReplicatorFixture can only be run with ShardedClusterFixture") - cluster = self.fixturelib.make_fixture(cluster_options["class"], self.logger, - self.job_num, **cluster_options["settings"]) - self.clusters.append(cluster) - - replicator_logger = self.fixturelib.new_fixture_node_logger(replicator_options["class"], - self.job_num, "replicator") - - self.replicator = self.fixturelib.make_fixture(replicator_options["class"], - replicator_logger, self.job_num, - **self.replicator_options["settings"]) - - def setup(self): - """Set up the cluster to cluster fixture according to the options provided.""" - for i, cluster in enumerate(self.clusters): - self.logger.info(f"Setting up cluster {i}.") - cluster.setup() - - source_url = self.clusters[self.source_cluster_index].get_driver_connection_url() - dest_url = self.clusters[1 - self.source_cluster_index].get_driver_connection_url() - self.logger.info("Setting source cluster string: '%s', destination cluster string: '%s'", - source_url, dest_url) - self.replicator.set_cli_options({'cluster0': source_url, 'cluster1': dest_url}) - - # If we are using multiple replicators, we must get the list of shard ids from the source - # cluster and pass the ids as CLI options to each replicator. - if self.replicator_options["class"] == "MultipleReplicatorFixture": - # Wait for the source cluster to be fully running - self.clusters[self.source_cluster_index].await_ready() - try: - shard_ids = self.clusters[self.source_cluster_index].get_shard_ids() - except Exception as err: - msg = f"Error getting shard ids from source cluster: {err}" - self.logger.exception(msg) - raise self.fixturelib.ServerFailure(msg) - self.replicator.set_shard_ids(shard_ids) - - self.replicator.setup() - - def pids(self): - """:return: pids owned by this fixture if any.""" - out = [] - for i, cluster in enumerate(self.clusters): - self.logger.info(f"Gathering cluster {i} pids: {cluster.pids()}") - out.extend(cluster.pids()) - if not out: - self.logger.debug('No clusters when gathering cluster to cluster fixture pids.') - - replicator_pids = self.replicator.pids() - self.logger.info(f"Gathering replicator pids: {replicator_pids}") - out.extend(replicator_pids) - - return out - - def await_ready(self): - """Block until the fixture can be used for testing.""" - if self.replicator_options["class"] == "MultipleReplicatorFixture": - # We only need to call await_ready() on the dest cluster since await_ready() was already - # called on the source cluster in setup(). - self.clusters[1 - self.source_cluster_index].await_ready() - else: - for cluster in self.clusters: - cluster.await_ready() - - self.replicator.await_ready() - - def _do_teardown(self, mode=None): - """Shut down the clusters and the replicator.""" - running_at_start = self.is_running() - if not running_at_start: - self.logger.warning( - "All clusters and replicators were expected to be running before teardown, but weren't." - ) - - teardown_handler = interface.FixtureTeardownHandler(self.logger) - - self.logger.info("Stopping the replicator...") - teardown_handler.teardown(self.replicator, "replicator", mode=mode) - self.logger.info("Stopped the replicator...") - - self.logger.info("Stopping all clusters...") - for i, cluster in enumerate(self.clusters): - teardown_handler.teardown(cluster, f"cluster {i}", mode=mode) - - if teardown_handler.was_successful(): - self.logger.info("Successfully stopped all clusters and replicators.") - else: - self.logger.error("Stopping the fixture failed.") - raise self.fixturelib.ServerFailure(teardown_handler.get_error_message()) - - def is_running(self): - """Return true if all clusters and replicators are still operating.""" - return all(cluster.is_running() - for cluster in self.clusters) and self.replicator.is_running() - - def get_node_info(self): - """Return a list of dicts of NodeInfo objects.""" - output = [] - for cluster in self.clusters: - output += cluster.get_node_info() - output += self.replicator.get_node_info() - - return output - - def get_driver_connection_url(self): - """Return the driver connection URL to the cluster that starts out owning the data.""" - if not self.clusters: - raise ValueError("Must call setup() before calling get_driver_connection_url") - return self.clusters[self.source_cluster_index].get_driver_connection_url() - - def get_internal_connection_string(self): - """Return the internal connection string to the cluster that starts out owning the data.""" - if not self.clusters: - raise ValueError("Must call setup() before calling get_internal_connection_string") - return self.clusters[0].get_internal_connection_string() - - def get_independent_clusters(self): - """Return the clusters involved in cluster to cluster replication.""" - return self.clusters.copy() - - def reverse_replication_direction(self): - """Swap the source and destination clusters.""" - self.source_cluster_index = 1 - self.source_cluster_index diff --git a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py deleted file mode 100644 index 16e48e0c5e0..00000000000 --- a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Multiple replicator fixture to handle launching and stopping multiple replicator binaries.""" - -import buildscripts.resmokelib.testing.fixtures.interface as interface - - -class MultipleReplicatorFixture(interface.Fixture): - """Fixture which spins up multiple replicators.""" - - REGISTERED_NAME = "MultipleReplicatorFixture" - - def __init__( # pylint: disable=too-many-arguments - self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None, - num_replicators=2, shard_ids=None): - """Initialize ReplicatorFixture with different options for the replicator process.""" - interface.Fixture.__init__(self, logger, job_num, fixturelib) - - self.executable = executable - self.quiesce_period = quiesce_period - self.cli_options = self.fixturelib.default_if_none(cli_options, {}) - self.shard_ids = self.fixturelib.default_if_none(shard_ids, []) - self.num_replicators = num_replicators - - # The running replicator processes. - self.replicators = [] - - if num_replicators < 2: - raise ValueError("The MultipleReplicatorFixture requires at least 2 replicators") - - for i in range(num_replicators): - individual_replicator_logger = self.fixturelib.new_fixture_node_logger( - "MultipleReplicatorFixture", self.job_num, f"replicator{i}") - replicator = self.fixturelib.make_fixture( - "ReplicatorFixture", individual_replicator_logger, self.job_num, - executable=executable, quiesce_period=quiesce_period, cli_options=cli_options) - self.replicators.append(replicator) - - def setup(self): - """ - Set up the multiple replicators. - - For each replicator set the shard ID and call setup, which launches the replicator process. - """ - if self.num_replicators != len(self.shard_ids): - raise ValueError( - "The number of replicators must match the number of shard ids provided.") - - for i, rep in enumerate(self.replicators): - rep.set_cli_options({"id": self.shard_ids[i]}) - rep.setup() - - def pids(self): - """:return: pids owned by this fixture if any.""" - pids = [] - for rep in self.replicators: - pids.extend(rep.pids()) - if len(pids) == 0: - self.logger.debug('Replicators not running when gathering replicator fixture pids.') - return pids - - def await_ready(self): - """ - Block until the fixture can be used for testing. - - For each replicator, call await_ready(). - """ - for rep in self.replicators: - rep.await_ready() - - def start(self): - """Start the replication process by sending the replicator a command.""" - self.logger.info("Starting multiple replicators...\n") - for i, rep in enumerate(self.replicators): - self.logger.info("Starting replicator #%d...", i) - rep.start() - - def commit(self): - """Commit the migration. This currently will just sleep for a quiesce period.""" - for rep in self.replicators: - rep.commit() - - def stop(self, mode=None): - """Stop the replicator binary.""" - self.logger.info("Stopping multiple replicators...\n") - for i, rep in enumerate(self.replicators): - try: - self.logger.info("Stopping replicator #%d...", i) - rep.stop(mode) - except Exception as err: - msg = f"Error stopping replicator #{i}: {err}" - self.logger.exception(msg) - raise self.fixturelib.ServerFailure(msg) - - def resume(self): - """NOOP.""" - pass - - def pause(self): - """NOOP.""" - pass - - def _do_teardown(self, mode=None): - """Teardown the fixture.""" - if not self.is_any_process_running(): - self.logger.info("All replicators already stopped; teardown is a NOOP.") - return - - self.logger.warning("The replicators had not been stopped at the time of teardown.") - self.stop(mode) - - def is_any_process_running(self): - """Return true if any of the replicator binaries is running as a process.""" - return any(rep.is_running() for rep in self.replicators) - - def is_running(self): - """Return true if all of the individual replicator fixtures are running and have not errorred.""" - return all(rep.is_running() for rep in self.replicators) - - def get_internal_connection_string(self): - """Return the internal connection string.""" - raise NotImplementedError("Multiple replicator cannot have internal connection strings.") - - def get_driver_connection_url(self): - """Return the driver connection URL.""" - raise NotImplementedError("Multiple replicator cannot have driver connection URLs.") - - def get_node_info(self): - """Return a list of NodeInfo objects.""" - return [rep.get_node_info()[0] for rep in self.replicators] - - def set_cli_options(self, cli_options): - """Set command line options.""" - for option, value in cli_options.items(): - self.cli_options[option] = value - for rep in self.replicators: - rep.set_cli_options(cli_options) - - def set_shard_ids(self, shard_ids): - """Set the list of shard ids.""" - self.shard_ids = shard_ids diff --git a/buildscripts/resmokelib/testing/fixtures/replicator.py b/buildscripts/resmokelib/testing/fixtures/replicator.py deleted file mode 100644 index b0fa9c92401..00000000000 --- a/buildscripts/resmokelib/testing/fixtures/replicator.py +++ /dev/null @@ -1,155 +0,0 @@ -"""Standalone replicator fixture to handle launching and stopping a replicator binary.""" - -import json -import time -from urllib import request - -import buildscripts.resmokelib.testing.fixtures.interface as interface - - -class ReplicatorFixture(interface.Fixture): - """Fixture which spins up a single replicator.""" - - REGISTERED_NAME = "ReplicatorFixture" - - def __init__( # pylint: disable=too-many-arguments - self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None): - """Initialize ReplicatorFixture with different options for the replicator process.""" - interface.Fixture.__init__(self, logger, job_num, fixturelib) - - self.executable = executable - self.quiesce_period = quiesce_period - self.cli_options = self.fixturelib.default_if_none(cli_options, {}) - self.port = self.fixturelib.get_next_port(job_num) - self.set_cli_options({"port": self.port}) - - # The running replicator process. - self.replicator = None - - def setup(self): - """Launch the replicator webserver to begin accepting replicator commands.""" - self._launch_replicator_process() - - def pids(self): - """:return: pids owned by this fixture if any.""" - out = [x.pid for x in [self.replicator] if x is not None] - if not out: - self.logger.debug('Replicator not running when gathering replicator fixture pid.') - return out - - def await_ready(self): - """ - Block until the fixture can be used for testing. - - NOOP since the binary is fully launched in `setup`. - """ - pass - - def _launch_replicator_process(self): - """Launch the replicator binary.""" - - replicator = self.fixturelib.generic_program(self.logger, [self.executable], self.job_num, - test_id=None, process_kwargs=None, - **self.cli_options) - try: - self.logger.info("Launch replicator webserver...\n%s", replicator.as_command()) - replicator.start() - self.logger.info("Replicator launched with pid %d on port %d.", replicator.pid, - self.port) - except Exception as err: - msg = "Failed to launch replicator: {}".format(err) - self.logger.exception(msg) - raise self.fixturelib.ServerFailure(msg) - - self.replicator = replicator - - def start(self): - """Start the replication process by sending the replicator a command.""" - url = self.get_api_url() + '/api/v1/start' - # Right now we set reversible to false, at some point this could be an - # argument to start. - data = '{"reversible": false, "source": "cluster0", "destination": "cluster1"}'.encode( - 'ascii') - headers = {'Content-Type': 'application/json'} - - req = request.Request(url=url, data=data, headers=headers) - self.logger.info("Sending start command to replicator: %s", req.data) - response = request.urlopen(req).read().decode('ascii') - self.logger.info("Replicator start command response was: %s", response) - - if not json.loads(response)["success"]: - msg = f"Replicator failed to start: {response}" - self.logger.exception(msg) - raise self.fixturelib.ServerFailure(msg) - - def commit(self): - """Commit the migration. This currently will just sleep for a quiesce period.""" - self.logger.info("Sleeping for %d s to allow replicator to finish up.", self.quiesce_period) - time.sleep(self.quiesce_period) - self.logger.info("Done sleeping through quiesce period.") - - def stop(self, mode=None): - """Stop the replicator binary.""" - mode = interface.TeardownMode.TERMINATE if mode is None else mode - - self.logger.info("Stopping replicator with pid %d...", self.replicator.pid) - if not self._is_process_running(): - exit_code = self.replicator.poll() - msg = ("Replicator was expected to be running, but wasn't. " - "Process exited with code {:d}.").format(exit_code) - self.logger.warning(msg) - raise self.fixturelib.ServerFailure(msg) - - self.replicator.stop(mode) - exit_code = self.replicator.wait() - # TODO (SERVER-63544): Check to make sure the error code is correct. - self.logger.info("Process exited with error code {:d}.".format(exit_code)) - - def resume(self): - """NOOP.""" - pass - - def pause(self): - """NOOP.""" - pass - - def _do_teardown(self, mode=None): - """Teardown the fixture.""" - if not self._is_process_running(): - self.logger.info("Replicator already stopped; teardown is a NOOP.") - return - - self.logger.warning("The replicator had not been stopped at the time of teardown.") - self.stop(mode) - - def _is_process_running(self): - """Return true if the replicator binary is running as a process.""" - return self.replicator is not None and self.replicator.poll() is None - - def is_running(self): - """Return if the fixture is running or has not errorred.""" - return self._is_process_running() - - def get_internal_connection_string(self): - """Return the internal connection string.""" - raise NotImplementedError("Replicator cannot have an internal connection string.") - - def get_driver_connection_url(self): - """Return the driver connection URL.""" - raise NotImplementedError("Replicator cannot have a driver connection URL.") - - def get_api_url(self): - """Return the URL used to send the replicator commands.""" - return f'http://localhost:{self.port}' - - def get_node_info(self): - """Return a list of NodeInfo objects.""" - info = interface.NodeInfo(full_name=self.logger.full_name, name=self.logger.name, - port=self.port, - pid=self.replicator.pid if self.replicator is not None else -1) - return [info] - - def set_cli_options(self, cli_options): - """Set command line options.""" - for option, value in cli_options.items(): - self.cli_options[option] = value diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py deleted file mode 100644 index d2623527f00..00000000000 --- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py +++ /dev/null @@ -1,23 +0,0 @@ -"""Test hook for verifying data consistency between two clusters in a cluster to cluster replication.""" -import copy -import os.path - -from buildscripts.resmokelib.testing.hooks import jsfile - - -class CheckClusterToClusterDataConsistency(jsfile.DataConsistencyHook): - """Check if the two clusters have the same data. - - This includes metadata such as the shard key (where applicable), indexes, collection options, - etc. - """ - - IS_BACKGROUND = False - - def __init__( # pylint: disable=super-init-not-called - self, hook_logger, fixture, shell_options=None): - """Initialize CheckClusterToClusterDataConsistency.""" - description = "Ensure that both clusters are data consistent by querying both clusters." - js_filename = os.path.join("jstests", "hooks", "run_cluster_to_cluster_data_consistency.js") - jsfile.JSHook.__init__( # pylint: disable=non-parent-init-called - self, hook_logger, fixture, js_filename, description, shell_options=shell_options) diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py deleted file mode 100644 index 2eb17d9c6a0..00000000000 --- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py +++ /dev/null @@ -1,52 +0,0 @@ -"""Test implementation for running the dummy replicator on two clusters.""" -import copy -import os.path - -from buildscripts.resmokelib.testing.hooks import jsfile -from buildscripts.resmokelib.testing.hooks import replicator_interface - - -class DummyReplicator(replicator_interface.ReplicatorInterface): - """A dummy implementation of the replicator.""" - - def __init__(self, logger, fixture): - """Initialize the dummy replicator.""" - replicator_interface.ReplicatorInterface.__init__(self, logger, fixture) - - def start(self, start_options=None): - """Start the dummy replicator.""" - self.logger.info("Starting the dummy replicator (NOOP).") - - def pause(self, pause_options=None): - """Pause the dummy replicator.""" - self.logger.info("Pausing the dummy replicator (NOOP).") - - def resume(self, resume_options=None): - """Resume the replicator.""" - self.logger.info("Resuming the dummy replicator (NOOP).") - - def stop(self, stop_options=None): - """Stop the replicator.""" - # Since the dummy replicator doesn't run while the cluster is live, we run the dummy - # replicator from start to finish here instead. - self.logger.info("Stopping and synchronizing the dummy replicator.") - replicator_runner = DummyReplicator._ClusterToClusterDummyReplicatorHook( - self.logger, self._fixture, stop_options["shell_options"]) - replicator_runner.before_suite(stop_options["test_report"]) - replicator_runner.before_test(stop_options["test"], stop_options["test_report"]) - replicator_runner.after_test(stop_options["test"], stop_options["test_report"]) - replicator_runner.after_suite(stop_options["test_report"]) - self.logger.info("Stopped and synchronized the dummy replicator.") - - class _ClusterToClusterDummyReplicatorHook(jsfile.JSHook): - """A hook that the DummyReplicator uses internally to copy documents from cluster to cluster.""" - - IS_BACKGROUND = False - - def __init__( # pylint: disable=super-init-not-called - self, hook_logger, fixture, shell_options=None): - """Initialize ClusterToClusterDummyReplicator.""" - description = "Run the dummy cluster to cluster replicator between two clusters." - js_filename = os.path.join("jstests", "hooks", "dummy_cluster_to_cluster_replicator.js") - jsfile.JSHook.__init__( # pylint: disable=non-parent-init-called - self, hook_logger, fixture, js_filename, description, shell_options=shell_options) diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py deleted file mode 100644 index 7ea363a3b09..00000000000 --- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Test hook that continuously kills and restarts the replicator.""" - -import random -import time - -import buildscripts.resmokelib.testing.fixtures.interface as interface -from buildscripts.resmokelib.testing.hooks import bghook - - -class KillReplicator(bghook.BGHook): - """A hook that kills the replicator process and restarts it.""" - - def __init__(self, hook_logger, fixture, tests_per_cycle, min_sleep_secs=1, max_sleep_secs=3): - """Initialize KillReplicator.""" - bghook.BGHook.__init__(self, hook_logger, fixture, "Kill replicator hook", tests_per_cycle) - self.min_sleep_secs = min_sleep_secs - self.max_sleep_secs = max_sleep_secs - - def run_action(self): - """Sleep for a random amount of time, then kill and restart the replicator.""" - rand_sleep = random.uniform(self.min_sleep_secs, self.max_sleep_secs) - self.logger.info("Sleeping for %.2f seconds before killing the replicator", rand_sleep) - time.sleep(rand_sleep) - - self.fixture.replicator.stop(interface.TeardownMode.KILL) - self.fixture.replicator.setup() diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py deleted file mode 100644 index 75207715089..00000000000 --- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Test hook that runs cluster to cluster replications continuously.""" - -import copy -import math -import random - -from buildscripts.resmokelib import config -from buildscripts.resmokelib import errors -from buildscripts.resmokelib.testing.fixtures import cluster_to_cluster -from buildscripts.resmokelib.testing.hooks import interface -from buildscripts.resmokelib.testing.hooks import cluster_to_cluster_data_consistency -from buildscripts.resmokelib.testing.hooks import dbhash - - -class ClusterToClusterReplication(interface.Hook): # pylint: disable=too-many-instance-attributes - """Starts a cluster to cluster replication thread at the beginning of each test.""" - - DESCRIPTION = ("Continuous cluster to cluster replications") - - IS_BACKGROUND = True - # By default, we pause / stop the replicator at the end of the suite and then perform data - # consistency checks. - DEFAULT_TESTS_PER_CYCLE = math.inf - - def __init__(self, hook_logger, fixture, shell_options, tests_per_cycle=None, - replicator_start_delay=None): - """Initialize the ClusterToClusterReplication. - - Args: - hook_logger: the logger instance for this hook. - fixture: the target ClusterToCluster fixture containing two clusters. - shell_options: contains the global_vars. - """ - interface.Hook.__init__(self, hook_logger, fixture, ClusterToClusterReplication.DESCRIPTION) - - if not isinstance(fixture, cluster_to_cluster.ClusterToClusterFixture): - raise ValueError( - "The ClusterToClusterReplication hook requires a ClusterToClusterFixture") - self._fixture = fixture - self._shell_options = copy.deepcopy(shell_options) - - # The number of tests executing so far. - self._test_num = 0 - # The number of tests we execute before running a data consistency check and restarting the - # replicator. - self._tests_per_cycle = self.DEFAULT_TESTS_PER_CYCLE if tests_per_cycle is None else tests_per_cycle - # The replicator is not started until some number of tests are run first. - self._replicator_start_delay = replicator_start_delay - random.seed(config.RANDOM_SEED) - - # The last test executed so far. - self._last_test = None - - self._replicator = self._fixture.replicator - - def before_suite(self, test_report): - """Before suite.""" - if not self._fixture: - raise ValueError("No ClusterToClusterFixture to run migrations on") - - if self._replicator_start_delay is None: - if math.isinf(self._tests_per_cycle): - self._replicator_start_delay = random.randint(0, 10) - else: - self._replicator_start_delay = random.randint(0, self._tests_per_cycle) - self.logger.info("Starting the replicator after %d tests are run.", - self._replicator_start_delay) - - def after_suite(self, test_report, teardown_flag=None): - """After suite.""" - self.logger.info("Ran %d tests in total.", self._test_num) - # Perform the following actions only if some tests have been run. - if self._test_num % self._tests_per_cycle > self._replicator_start_delay: - self._run_replicator_action(test_report, self._replicator.commit) - - self._run_data_consistency_check(self._last_test, test_report) - self._run_check_repl_db_hash(self._last_test, test_report) - - def before_test(self, test, test_report): - """Before test.""" - if self._test_num % self._tests_per_cycle == self._replicator_start_delay: - self._run_replicator_action(test_report, self._replicator.start) - - def after_test(self, test, test_report): - """After test.""" - self._test_num += 1 - self._last_test = test - - # Every 'n' tests, the replicator should be pause / stop the replicator and perform data - # consistency checks. - if self._test_num % self._tests_per_cycle == 0: - if self._tests_per_cycle == self._replicator_start_delay: - self._run_replicator_action(test_report, self._replicator.start) - - self._run_replicator_action(test_report, self._replicator.commit) - - self._run_data_consistency_check(test, test_report) - self._run_check_repl_db_hash(test, test_report) - - def _run_data_consistency_check(self, test, test_report): - """Run the data consistency check across both clusters.""" - # The TestData needs to be set to allow the data consistency hooks to run correctly. - clusters = self._fixture.get_independent_clusters() - source_url = clusters[self._fixture.source_cluster_index].get_driver_connection_url() - dest_url = clusters[1 - self._fixture.source_cluster_index].get_driver_connection_url() - - shell_options = copy.deepcopy(self._shell_options) - shell_options["global_vars"]["TestData"]["sourceConnectionString"] = source_url - shell_options["global_vars"]["TestData"]["destinationConnectionString"] = dest_url - - data_consistency = cluster_to_cluster_data_consistency.CheckClusterToClusterDataConsistency( - self.logger, self._fixture, shell_options) - data_consistency.before_suite(test_report) - data_consistency.before_test(test, test_report) - data_consistency.after_test(test, test_report) - data_consistency.after_suite(test_report) - - def _run_check_repl_db_hash(self, test, test_report): - """Check the repl DB hash on each cluster.""" - check_db_hash = dbhash.CheckReplDBHash(self.logger, self._fixture, self._shell_options) - check_db_hash.before_suite(test_report) - check_db_hash.before_test(test, test_report) - check_db_hash.after_test(test, test_report) - check_db_hash.after_suite(test_report) - - def _run_replicator_action(self, test_report, action): - self.logger.info(f"Running replicator action: {action.__name__}") - replicator_action_case = _ReplicatorActionTestCase(self.logger, self._last_test, self, - action) - replicator_action_case.run_dynamic_test(test_report) - self.logger.info(f"Ran replicator action: {action.__name__}") - - -class _ReplicatorActionTestCase(interface.DynamicTestCase): - """_ReplicatorActionTestCase class, to run a replicator action as a test.""" - - def __init__( # pylint: disable=too-many-arguments - self, logger, base_test_name, hook, action): - """Initialize _ReplicatorActionTestCase.""" - interface.DynamicTestCase.__init__(self, logger, f"replicator_action:{action.__name__}", - "Run a replicator action.", base_test_name, hook) - self._action = action - - def run_test(self): - try: - self._action() - except: - self.logger.exception("Failed to run replicator action '%s'.", self._action) - raise diff --git a/buildscripts/resmokelib/testing/hooks/replicator_interface.py b/buildscripts/resmokelib/testing/hooks/replicator_interface.py deleted file mode 100644 index 6747aa66836..00000000000 --- a/buildscripts/resmokelib/testing/hooks/replicator_interface.py +++ /dev/null @@ -1,39 +0,0 @@ -"""An interface to control a replicator in our test infrastructure.""" - - -class ReplicatorInterface(object): - """A replicator interface. - - Provides a way to start, pause, resume and stop the replicator. - """ - - def __init__(self, logger, fixture): - """Initialize the replicator.""" - self.logger = logger - self._fixture = fixture - - def start(self, start_options): - """Start the replicator. - - This method will return after the replicator has been started. - """ - - def stop(self, stop_options): - """Stop the replicator. - - This method will return after the replicator has been stopped, without requiring the - replicator to have finished synchronizing across clusters. - """ - - def pause(self, pause_options): - """Pause the replicator. - - This method will return after the replicator has been paused. This method requires that the - replicator has completed synchronizing across clusters. - """ - - def resume(self, resume_options): - """Resume the replicator. - - This method will return after the replicator has been resumed - """ diff --git a/buildscripts/resmokelib/testing/hooks/stepdown.py b/buildscripts/resmokelib/testing/hooks/stepdown.py index 91f0383b029..3199ea11109 100644 --- a/buildscripts/resmokelib/testing/hooks/stepdown.py +++ b/buildscripts/resmokelib/testing/hooks/stepdown.py @@ -14,7 +14,6 @@ from buildscripts.resmokelib.testing.fixtures import interface as fixture_interf from buildscripts.resmokelib.testing.fixtures import replicaset from buildscripts.resmokelib.testing.fixtures import shardedcluster from buildscripts.resmokelib.testing.fixtures import tenant_migration -from buildscripts.resmokelib.testing.fixtures import cluster_to_cluster from buildscripts.resmokelib.testing.hooks import interface @@ -26,6 +25,9 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a IS_BACKGROUND = True + # The hook stops the fixture partially during its execution. + STOPS_FIXTURE = True + def __init__( # pylint: disable=too-many-arguments self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True, stepdown_interval_ms=8000, terminate=False, kill=False, @@ -138,8 +140,8 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a for rs_fixture in fixture.get_replsets(): self._rs_fixtures.append(rs_fixture) - elif isinstance(fixture, cluster_to_cluster.ClusterToClusterFixture): - # Recursively call _add_fixture on the source and destination clusters. + elif isinstance(fixture, fixture_interface.MultiClusterFixture): + # Recursively call _add_fixture on all the independent clusters. for cluster_fixture in fixture.get_independent_clusters(): self._add_fixture(cluster_fixture) diff --git a/buildscripts/resmokelib/testing/job.py b/buildscripts/resmokelib/testing/job.py index 1c3be065868..ba9f54e6912 100644 --- a/buildscripts/resmokelib/testing/job.py +++ b/buildscripts/resmokelib/testing/job.py @@ -8,8 +8,6 @@ from buildscripts.resmokelib import config from buildscripts.resmokelib import errors from buildscripts.resmokelib.testing import testcases from buildscripts.resmokelib.testing.fixtures.interface import create_fixture_table -from buildscripts.resmokelib.testing.hooks import stepdown -from buildscripts.resmokelib.testing.hooks import cluster_to_cluster_kill_replicator from buildscripts.resmokelib.testing.testcases import fixture as _fixture from buildscripts.resmokelib.utils import queue as _queue @@ -35,9 +33,7 @@ class Job(object): # pylint: disable=too-many-instance-attributes # expected, there is a race where fixture.is_running() could fail if called after the # primary was killed but before it was restarted. self._check_if_fixture_running = not any( - isinstance(hook, (stepdown.ContinuousStepdown, - cluster_to_cluster_kill_replicator.KillReplicator)) - for hook in self.hooks) + hasattr(hook, "STOPS_FIXTURE") and hook.STOPS_FIXTURE for hook in self.hooks) @property def job_num(self): |