path: root/buildscripts/resmokelib/testing
author    kauboy26 <vishnu.kaushik@mongodb.com>  2022-04-04 18:11:43 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-04 19:07:46 +0000
commit    77c0daa37cf81205dba35690851fd4024d3d8e21 (patch)
tree      af3252a6bb9576ae1d3519ba709e3764919f87cc /buildscripts/resmokelib/testing
parent    6abfb1f99e65c451b233bce7e1b941d74669a7b5 (diff)
download  mongo-77c0daa37cf81205dba35690851fd4024d3d8e21.tar.gz
SERVER-65000 Deleted fixtures and hooks related to the replicator.
Diffstat (limited to 'buildscripts/resmokelib/testing')
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py | 190
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/multiple_replicator.py | 139
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/replicator.py | 155
-rw-r--r--  buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py | 23
-rw-r--r--  buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py | 52
-rw-r--r--  buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py | 26
-rw-r--r--  buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py | 149
-rw-r--r--  buildscripts/resmokelib/testing/hooks/replicator_interface.py | 39
-rw-r--r--  buildscripts/resmokelib/testing/hooks/stepdown.py | 8
-rw-r--r--  buildscripts/resmokelib/testing/job.py | 6
10 files changed, 6 insertions, 781 deletions
diff --git a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py
deleted file mode 100644
index fbc77d03944..00000000000
--- a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py
+++ /dev/null
@@ -1,190 +0,0 @@
-"""Fixture with two clusters (for cluster to cluster replications) for executing JSTests against."""
-
-import copy
-import os.path
-
-import buildscripts.resmokelib.testing.fixtures.interface as interface
-
-
-class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes
- """Fixture which provides two clusters to perform a cluster to cluster replication."""
-
- def __init__( # pylint: disable=too-many-arguments,too-many-locals
- self, logger, job_num, fixturelib, cluster0_options, cluster1_options,
- replicator_options, dbpath_prefix=None, preserve_dbpath=False):
- """Initialize with different options for the clusters."""
-
- interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib,
- dbpath_prefix=dbpath_prefix)
-
- self.clusters = []
- self.both_cluster_options = []
- parsed_options = [
- self.fixturelib.default_if_none(copy.deepcopy(cluster0_options), {}),
- self.fixturelib.default_if_none(copy.deepcopy(cluster1_options), {})
- ]
-
- self.preserve_dbpath = preserve_dbpath
-
- # The usual command line params can lead to messy behavior in unlike topologies, since they
- # may override the cluster-to-cluster behavior specified elsewhere on both clusters in an
- # unexpected way. Therefore, forbid them.
- if any(
- v is not None for v in (self.config.MIXED_BIN_VERSIONS, self.config.NUM_SHARDS,
- self.config.NUM_REPLSET_NODES)):
- raise ValueError(
- "ClusterToClusterFixture options must be specified through 'cluster0_options' and 'cluster1_options'."
- )
-
- for i, cluster_options in enumerate(parsed_options):
- cluster_options["settings"] = self.fixturelib.default_if_none(
- cluster_options["settings"], {})
- if "preserve_dbpath" not in cluster_options["settings"]\
- or cluster_options["settings"]["preserve_dbpath"] is None:
- cluster_options["settings"]["preserve_dbpath"] = self.preserve_dbpath
-
- cluster_options["settings"]["dbpath_prefix"] = os.path.join(
- self._dbpath_prefix, f"cluster{i}")
-
- if cluster_options["class"] == "ReplicaSetFixture":
- cluster_options["settings"]["replicaset_logging_prefix"] = f"cl{i}"
- elif cluster_options["class"] == "ShardedClusterFixture":
- cluster_options["settings"]["cluster_logging_prefix"] = f"cl{i}"
- else:
- raise ValueError(f"Illegal fixture class: {cluster_options['class']}")
-
- self.logger.info(f"Cluster{i} configured with settings: {cluster_options}")
-
- self.both_cluster_options = parsed_options
-
- # The cluster that starts off with the data.
- self.source_cluster_index = 0
- self.replicator_options = replicator_options
-
- for cluster_options in self.both_cluster_options:
- if replicator_options["class"] == "MultipleReplicatorFixture":
- if cluster_options["class"] != "ShardedClusterFixture":
- raise ValueError(
- "MultipleReplicatorFixture can only be run with ShardedClusterFixture")
- cluster = self.fixturelib.make_fixture(cluster_options["class"], self.logger,
- self.job_num, **cluster_options["settings"])
- self.clusters.append(cluster)
-
- replicator_logger = self.fixturelib.new_fixture_node_logger(replicator_options["class"],
- self.job_num, "replicator")
-
- self.replicator = self.fixturelib.make_fixture(replicator_options["class"],
- replicator_logger, self.job_num,
- **self.replicator_options["settings"])
-
- def setup(self):
- """Set up the cluster to cluster fixture according to the options provided."""
- for i, cluster in enumerate(self.clusters):
- self.logger.info(f"Setting up cluster {i}.")
- cluster.setup()
-
- source_url = self.clusters[self.source_cluster_index].get_driver_connection_url()
- dest_url = self.clusters[1 - self.source_cluster_index].get_driver_connection_url()
- self.logger.info("Setting source cluster string: '%s', destination cluster string: '%s'",
- source_url, dest_url)
- self.replicator.set_cli_options({'cluster0': source_url, 'cluster1': dest_url})
-
- # If we are using multiple replicators, we must get the list of shard ids from the source
- # cluster and pass the ids as CLI options to each replicator.
- if self.replicator_options["class"] == "MultipleReplicatorFixture":
- # Wait for the source cluster to be fully running
- self.clusters[self.source_cluster_index].await_ready()
- try:
- shard_ids = self.clusters[self.source_cluster_index].get_shard_ids()
- except Exception as err:
- msg = f"Error getting shard ids from source cluster: {err}"
- self.logger.exception(msg)
- raise self.fixturelib.ServerFailure(msg)
- self.replicator.set_shard_ids(shard_ids)
-
- self.replicator.setup()
-
- def pids(self):
- """:return: pids owned by this fixture if any."""
- out = []
- for i, cluster in enumerate(self.clusters):
- self.logger.info(f"Gathering cluster {i} pids: {cluster.pids()}")
- out.extend(cluster.pids())
- if not out:
- self.logger.debug('No clusters when gathering cluster to cluster fixture pids.')
-
- replicator_pids = self.replicator.pids()
- self.logger.info(f"Gathering replicator pids: {replicator_pids}")
- out.extend(replicator_pids)
-
- return out
-
- def await_ready(self):
- """Block until the fixture can be used for testing."""
- if self.replicator_options["class"] == "MultipleReplicatorFixture":
- # We only need to call await_ready() on the dest cluster since await_ready() was already
- # called on the source cluster in setup().
- self.clusters[1 - self.source_cluster_index].await_ready()
- else:
- for cluster in self.clusters:
- cluster.await_ready()
-
- self.replicator.await_ready()
-
- def _do_teardown(self, mode=None):
- """Shut down the clusters and the replicator."""
- running_at_start = self.is_running()
- if not running_at_start:
- self.logger.warning(
- "All clusters and replicators were expected to be running before teardown, but weren't."
- )
-
- teardown_handler = interface.FixtureTeardownHandler(self.logger)
-
- self.logger.info("Stopping the replicator...")
- teardown_handler.teardown(self.replicator, "replicator", mode=mode)
- self.logger.info("Stopped the replicator...")
-
- self.logger.info("Stopping all clusters...")
- for i, cluster in enumerate(self.clusters):
- teardown_handler.teardown(cluster, f"cluster {i}", mode=mode)
-
- if teardown_handler.was_successful():
- self.logger.info("Successfully stopped all clusters and replicators.")
- else:
- self.logger.error("Stopping the fixture failed.")
- raise self.fixturelib.ServerFailure(teardown_handler.get_error_message())
-
- def is_running(self):
- """Return true if all clusters and replicators are still operating."""
- return all(cluster.is_running()
- for cluster in self.clusters) and self.replicator.is_running()
-
- def get_node_info(self):
- """Return a list of dicts of NodeInfo objects."""
- output = []
- for cluster in self.clusters:
- output += cluster.get_node_info()
- output += self.replicator.get_node_info()
-
- return output
-
- def get_driver_connection_url(self):
- """Return the driver connection URL to the cluster that starts out owning the data."""
- if not self.clusters:
- raise ValueError("Must call setup() before calling get_driver_connection_url")
- return self.clusters[self.source_cluster_index].get_driver_connection_url()
-
- def get_internal_connection_string(self):
- """Return the internal connection string to the cluster that starts out owning the data."""
- if not self.clusters:
- raise ValueError("Must call setup() before calling get_internal_connection_string")
- return self.clusters[0].get_internal_connection_string()
-
- def get_independent_clusters(self):
- """Return the clusters involved in cluster to cluster replication."""
- return self.clusters.copy()
-
- def reverse_replication_direction(self):
- """Swap the source and destination clusters."""
- self.source_cluster_index = 1 - self.source_cluster_index
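Editorial note: a condensed sketch of the wiring the deleted ClusterToClusterFixture performed in setup(), assuming both clusters are already set up — their connection strings are handed to the replicator as CLI options before the replicator itself is launched. The helper and parameter names below are illustrative, not part of the original fixture.

    def wire_replicator(source_cluster, dest_cluster, replicator, logger):
        # Assumes both clusters have already been set up by the caller.
        source_url = source_cluster.get_driver_connection_url()
        dest_url = dest_cluster.get_driver_connection_url()
        logger.info("source: '%s', destination: '%s'", source_url, dest_url)
        # 'cluster0' is the side that starts out owning the data.
        replicator.set_cli_options({"cluster0": source_url, "cluster1": dest_url})
        replicator.setup()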
diff --git a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py
deleted file mode 100644
index 16e48e0c5e0..00000000000
--- a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py
+++ /dev/null
@@ -1,139 +0,0 @@
-"""Multiple replicator fixture to handle launching and stopping multiple replicator binaries."""
-
-import buildscripts.resmokelib.testing.fixtures.interface as interface
-
-
-class MultipleReplicatorFixture(interface.Fixture):
- """Fixture which spins up multiple replicators."""
-
- REGISTERED_NAME = "MultipleReplicatorFixture"
-
- def __init__( # pylint: disable=too-many-arguments
- self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None,
- num_replicators=2, shard_ids=None):
- """Initialize ReplicatorFixture with different options for the replicator process."""
- interface.Fixture.__init__(self, logger, job_num, fixturelib)
-
- self.executable = executable
- self.quiesce_period = quiesce_period
- self.cli_options = self.fixturelib.default_if_none(cli_options, {})
- self.shard_ids = self.fixturelib.default_if_none(shard_ids, [])
- self.num_replicators = num_replicators
-
- # The running replicator processes.
- self.replicators = []
-
- if num_replicators < 2:
- raise ValueError("The MultipleReplicatorFixture requires at least 2 replicators")
-
- for i in range(num_replicators):
- individual_replicator_logger = self.fixturelib.new_fixture_node_logger(
- "MultipleReplicatorFixture", self.job_num, f"replicator{i}")
- replicator = self.fixturelib.make_fixture(
- "ReplicatorFixture", individual_replicator_logger, self.job_num,
- executable=executable, quiesce_period=quiesce_period, cli_options=cli_options)
- self.replicators.append(replicator)
-
- def setup(self):
- """
- Set up the multiple replicators.
-
- For each replicator set the shard ID and call setup, which launches the replicator process.
- """
- if self.num_replicators != len(self.shard_ids):
- raise ValueError(
- "The number of replicators must match the number of shard ids provided.")
-
- for i, rep in enumerate(self.replicators):
- rep.set_cli_options({"id": self.shard_ids[i]})
- rep.setup()
-
- def pids(self):
- """:return: pids owned by this fixture if any."""
- pids = []
- for rep in self.replicators:
- pids.extend(rep.pids())
- if len(pids) == 0:
- self.logger.debug('Replicators not running when gathering replicator fixture pids.')
- return pids
-
- def await_ready(self):
- """
- Block until the fixture can be used for testing.
-
- For each replicator, call await_ready().
- """
- for rep in self.replicators:
- rep.await_ready()
-
- def start(self):
- """Start the replication process by sending the replicator a command."""
- self.logger.info("Starting multiple replicators...\n")
- for i, rep in enumerate(self.replicators):
- self.logger.info("Starting replicator #%d...", i)
- rep.start()
-
- def commit(self):
- """Commit the migration. This currently will just sleep for a quiesce period."""
- for rep in self.replicators:
- rep.commit()
-
- def stop(self, mode=None):
- """Stop the replicator binary."""
- self.logger.info("Stopping multiple replicators...\n")
- for i, rep in enumerate(self.replicators):
- try:
- self.logger.info("Stopping replicator #%d...", i)
- rep.stop(mode)
- except Exception as err:
- msg = f"Error stopping replicator #{i}: {err}"
- self.logger.exception(msg)
- raise self.fixturelib.ServerFailure(msg)
-
- def resume(self):
- """NOOP."""
- pass
-
- def pause(self):
- """NOOP."""
- pass
-
- def _do_teardown(self, mode=None):
- """Teardown the fixture."""
- if not self.is_any_process_running():
- self.logger.info("All replicators already stopped; teardown is a NOOP.")
- return
-
- self.logger.warning("The replicators had not been stopped at the time of teardown.")
- self.stop(mode)
-
- def is_any_process_running(self):
- """Return true if any of the replicator binaries is running as a process."""
- return any(rep.is_running() for rep in self.replicators)
-
- def is_running(self):
- """Return true if all of the individual replicator fixtures are running and have not errorred."""
- return all(rep.is_running() for rep in self.replicators)
-
- def get_internal_connection_string(self):
- """Return the internal connection string."""
- raise NotImplementedError("Multiple replicator cannot have internal connection strings.")
-
- def get_driver_connection_url(self):
- """Return the driver connection URL."""
- raise NotImplementedError("Multiple replicator cannot have driver connection URLs.")
-
- def get_node_info(self):
- """Return a list of NodeInfo objects."""
- return [rep.get_node_info()[0] for rep in self.replicators]
-
- def set_cli_options(self, cli_options):
- """Set command line options."""
- for option, value in cli_options.items():
- self.cli_options[option] = value
- for rep in self.replicators:
- rep.set_cli_options(cli_options)
-
- def set_shard_ids(self, shard_ids):
- """Set the list of shard ids."""
- self.shard_ids = shard_ids
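Editorial note: a minimal sketch of the shard fan-out the deleted MultipleReplicatorFixture performed in setup() — one child replicator per shard, each pinned to its shard through an "id" CLI option. The helper name is illustrative.

    def assign_shards(replicators, shard_ids):
        # Each replicator follows exactly one shard of the source cluster.
        if len(replicators) != len(shard_ids):
            raise ValueError("The number of replicators must match the number of shard ids provided.")
        for replicator, shard_id in zip(replicators, shard_ids):
            replicator.set_cli_options({"id": shard_id})
            replicator.setup()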
diff --git a/buildscripts/resmokelib/testing/fixtures/replicator.py b/buildscripts/resmokelib/testing/fixtures/replicator.py
deleted file mode 100644
index b0fa9c92401..00000000000
--- a/buildscripts/resmokelib/testing/fixtures/replicator.py
+++ /dev/null
@@ -1,155 +0,0 @@
-"""Standalone replicator fixture to handle launching and stopping a replicator binary."""
-
-import json
-import time
-from urllib import request
-
-import buildscripts.resmokelib.testing.fixtures.interface as interface
-
-
-class ReplicatorFixture(interface.Fixture):
- """Fixture which spins up a single replicator."""
-
- REGISTERED_NAME = "ReplicatorFixture"
-
- def __init__( # pylint: disable=too-many-arguments
- self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None):
- """Initialize ReplicatorFixture with different options for the replicator process."""
- interface.Fixture.__init__(self, logger, job_num, fixturelib)
-
- self.executable = executable
- self.quiesce_period = quiesce_period
- self.cli_options = self.fixturelib.default_if_none(cli_options, {})
- self.port = self.fixturelib.get_next_port(job_num)
- self.set_cli_options({"port": self.port})
-
- # The running replicator process.
- self.replicator = None
-
- def setup(self):
- """Launch the replicator webserver to begin accepting replicator commands."""
- self._launch_replicator_process()
-
- def pids(self):
- """:return: pids owned by this fixture if any."""
- out = [x.pid for x in [self.replicator] if x is not None]
- if not out:
- self.logger.debug('Replicator not running when gathering replicator fixture pid.')
- return out
-
- def await_ready(self):
- """
- Block until the fixture can be used for testing.
-
- NOOP since the binary is fully launched in `setup`.
- """
- pass
-
- def _launch_replicator_process(self):
- """Launch the replicator binary."""
-
- replicator = self.fixturelib.generic_program(self.logger, [self.executable], self.job_num,
- test_id=None, process_kwargs=None,
- **self.cli_options)
- try:
- self.logger.info("Launch replicator webserver...\n%s", replicator.as_command())
- replicator.start()
- self.logger.info("Replicator launched with pid %d on port %d.", replicator.pid,
- self.port)
- except Exception as err:
- msg = "Failed to launch replicator: {}".format(err)
- self.logger.exception(msg)
- raise self.fixturelib.ServerFailure(msg)
-
- self.replicator = replicator
-
- def start(self):
- """Start the replication process by sending the replicator a command."""
- url = self.get_api_url() + '/api/v1/start'
- # Right now we set reversible to false, at some point this could be an
- # argument to start.
- data = '{"reversible": false, "source": "cluster0", "destination": "cluster1"}'.encode(
- 'ascii')
- headers = {'Content-Type': 'application/json'}
-
- req = request.Request(url=url, data=data, headers=headers)
- self.logger.info("Sending start command to replicator: %s", req.data)
- response = request.urlopen(req).read().decode('ascii')
- self.logger.info("Replicator start command response was: %s", response)
-
- if not json.loads(response)["success"]:
- msg = f"Replicator failed to start: {response}"
- self.logger.exception(msg)
- raise self.fixturelib.ServerFailure(msg)
-
- def commit(self):
- """Commit the migration. This currently will just sleep for a quiesce period."""
- self.logger.info("Sleeping for %d s to allow replicator to finish up.", self.quiesce_period)
- time.sleep(self.quiesce_period)
- self.logger.info("Done sleeping through quiesce period.")
-
- def stop(self, mode=None):
- """Stop the replicator binary."""
- mode = interface.TeardownMode.TERMINATE if mode is None else mode
-
- self.logger.info("Stopping replicator with pid %d...", self.replicator.pid)
- if not self._is_process_running():
- exit_code = self.replicator.poll()
- msg = ("Replicator was expected to be running, but wasn't. "
- "Process exited with code {:d}.").format(exit_code)
- self.logger.warning(msg)
- raise self.fixturelib.ServerFailure(msg)
-
- self.replicator.stop(mode)
- exit_code = self.replicator.wait()
- # TODO (SERVER-63544): Check to make sure the error code is correct.
- self.logger.info("Process exited with error code {:d}.".format(exit_code))
-
- def resume(self):
- """NOOP."""
- pass
-
- def pause(self):
- """NOOP."""
- pass
-
- def _do_teardown(self, mode=None):
- """Teardown the fixture."""
- if not self._is_process_running():
- self.logger.info("Replicator already stopped; teardown is a NOOP.")
- return
-
- self.logger.warning("The replicator had not been stopped at the time of teardown.")
- self.stop(mode)
-
- def _is_process_running(self):
- """Return true if the replicator binary is running as a process."""
- return self.replicator is not None and self.replicator.poll() is None
-
- def is_running(self):
- """Return if the fixture is running or has not errorred."""
- return self._is_process_running()
-
- def get_internal_connection_string(self):
- """Return the internal connection string."""
- raise NotImplementedError("Replicator cannot have an internal connection string.")
-
- def get_driver_connection_url(self):
- """Return the driver connection URL."""
- raise NotImplementedError("Replicator cannot have a driver connection URL.")
-
- def get_api_url(self):
- """Return the URL used to send the replicator commands."""
- return f'http://localhost:{self.port}'
-
- def get_node_info(self):
- """Return a list of NodeInfo objects."""
- info = interface.NodeInfo(full_name=self.logger.full_name, name=self.logger.name,
- port=self.port,
- pid=self.replicator.pid if self.replicator is not None else -1)
- return [info]
-
- def set_cli_options(self, cli_options):
- """Set command line options."""
- for option, value in cli_options.items():
- self.cli_options[option] = value
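Editorial note: the start() method above drove the replicator through a small local webserver. Below is a self-contained sketch of that request using the same endpoint and payload as the deleted fixture; the port value is a placeholder, since the real fixture allocated the next free port for its job.

    import json
    from urllib import request

    port = 20250  # placeholder; the fixture picked a per-job port at runtime
    req = request.Request(
        url=f"http://localhost:{port}/api/v1/start",
        data=b'{"reversible": false, "source": "cluster0", "destination": "cluster1"}',
        headers={"Content-Type": "application/json"},
    )
    # Expects the replicator webserver to be listening on the port above.
    response = json.loads(request.urlopen(req).read().decode("ascii"))
    if not response["success"]:
        raise RuntimeError(f"Replicator failed to start: {response}")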
diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py
deleted file mode 100644
index d2623527f00..00000000000
--- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_data_consistency.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""Test hook for verifying data consistency between two clusters in a cluster to cluster replication."""
-import copy
-import os.path
-
-from buildscripts.resmokelib.testing.hooks import jsfile
-
-
-class CheckClusterToClusterDataConsistency(jsfile.DataConsistencyHook):
- """Check if the two clusters have the same data.
-
- This includes metadata such as the shard key (where applicable), indexes, collection options,
- etc.
- """
-
- IS_BACKGROUND = False
-
- def __init__( # pylint: disable=super-init-not-called
- self, hook_logger, fixture, shell_options=None):
- """Initialize CheckClusterToClusterDataConsistency."""
- description = "Ensure that both clusters are data consistent by querying both clusters."
- js_filename = os.path.join("jstests", "hooks", "run_cluster_to_cluster_data_consistency.js")
- jsfile.JSHook.__init__( # pylint: disable=non-parent-init-called
- self, hook_logger, fixture, js_filename, description, shell_options=shell_options)
diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py
deleted file mode 100644
index 2eb17d9c6a0..00000000000
--- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_dummy_replicator.py
+++ /dev/null
@@ -1,52 +0,0 @@
-"""Test implementation for running the dummy replicator on two clusters."""
-import copy
-import os.path
-
-from buildscripts.resmokelib.testing.hooks import jsfile
-from buildscripts.resmokelib.testing.hooks import replicator_interface
-
-
-class DummyReplicator(replicator_interface.ReplicatorInterface):
- """A dummy implementation of the replicator."""
-
- def __init__(self, logger, fixture):
- """Initialize the dummy replicator."""
- replicator_interface.ReplicatorInterface.__init__(self, logger, fixture)
-
- def start(self, start_options=None):
- """Start the dummy replicator."""
- self.logger.info("Starting the dummy replicator (NOOP).")
-
- def pause(self, pause_options=None):
- """Pause the dummy replicator."""
- self.logger.info("Pausing the dummy replicator (NOOP).")
-
- def resume(self, resume_options=None):
- """Resume the replicator."""
- self.logger.info("Resuming the dummy replicator (NOOP).")
-
- def stop(self, stop_options=None):
- """Stop the replicator."""
- # Since the dummy replicator doesn't run while the cluster is live, we run the dummy
- # replicator from start to finish here instead.
- self.logger.info("Stopping and synchronizing the dummy replicator.")
- replicator_runner = DummyReplicator._ClusterToClusterDummyReplicatorHook(
- self.logger, self._fixture, stop_options["shell_options"])
- replicator_runner.before_suite(stop_options["test_report"])
- replicator_runner.before_test(stop_options["test"], stop_options["test_report"])
- replicator_runner.after_test(stop_options["test"], stop_options["test_report"])
- replicator_runner.after_suite(stop_options["test_report"])
- self.logger.info("Stopped and synchronized the dummy replicator.")
-
- class _ClusterToClusterDummyReplicatorHook(jsfile.JSHook):
- """A hook that the DummyReplicator uses internally to copy documents from cluster to cluster."""
-
- IS_BACKGROUND = False
-
- def __init__( # pylint: disable=super-init-not-called
- self, hook_logger, fixture, shell_options=None):
- """Initialize ClusterToClusterDummyReplicator."""
- description = "Run the dummy cluster to cluster replicator between two clusters."
- js_filename = os.path.join("jstests", "hooks", "dummy_cluster_to_cluster_replicator.js")
- jsfile.JSHook.__init__( # pylint: disable=non-parent-init-called
- self, hook_logger, fixture, js_filename, description, shell_options=shell_options)
diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py
deleted file mode 100644
index 7ea363a3b09..00000000000
--- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_kill_replicator.py
+++ /dev/null
@@ -1,26 +0,0 @@
-"""Test hook that continuously kills and restarts the replicator."""
-
-import random
-import time
-
-import buildscripts.resmokelib.testing.fixtures.interface as interface
-from buildscripts.resmokelib.testing.hooks import bghook
-
-
-class KillReplicator(bghook.BGHook):
- """A hook that kills the replicator process and restarts it."""
-
- def __init__(self, hook_logger, fixture, tests_per_cycle, min_sleep_secs=1, max_sleep_secs=3):
- """Initialize KillReplicator."""
- bghook.BGHook.__init__(self, hook_logger, fixture, "Kill replicator hook", tests_per_cycle)
- self.min_sleep_secs = min_sleep_secs
- self.max_sleep_secs = max_sleep_secs
-
- def run_action(self):
- """Sleep for a random amount of time, then kill and restart the replicator."""
- rand_sleep = random.uniform(self.min_sleep_secs, self.max_sleep_secs)
- self.logger.info("Sleeping for %.2f seconds before killing the replicator", rand_sleep)
- time.sleep(rand_sleep)
-
- self.fixture.replicator.stop(interface.TeardownMode.KILL)
- self.fixture.replicator.setup()
diff --git a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py b/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py
deleted file mode 100644
index 75207715089..00000000000
--- a/buildscripts/resmokelib/testing/hooks/cluster_to_cluster_replication.py
+++ /dev/null
@@ -1,149 +0,0 @@
-"""Test hook that runs cluster to cluster replications continuously."""
-
-import copy
-import math
-import random
-
-from buildscripts.resmokelib import config
-from buildscripts.resmokelib import errors
-from buildscripts.resmokelib.testing.fixtures import cluster_to_cluster
-from buildscripts.resmokelib.testing.hooks import interface
-from buildscripts.resmokelib.testing.hooks import cluster_to_cluster_data_consistency
-from buildscripts.resmokelib.testing.hooks import dbhash
-
-
-class ClusterToClusterReplication(interface.Hook): # pylint: disable=too-many-instance-attributes
- """Starts a cluster to cluster replication thread at the beginning of each test."""
-
- DESCRIPTION = ("Continuous cluster to cluster replications")
-
- IS_BACKGROUND = True
- # By default, we pause / stop the replicator at the end of the suite and then perform data
- # consistency checks.
- DEFAULT_TESTS_PER_CYCLE = math.inf
-
- def __init__(self, hook_logger, fixture, shell_options, tests_per_cycle=None,
- replicator_start_delay=None):
- """Initialize the ClusterToClusterReplication.
-
- Args:
- hook_logger: the logger instance for this hook.
- fixture: the target ClusterToCluster fixture containing two clusters.
- shell_options: contains the global_vars.
- """
- interface.Hook.__init__(self, hook_logger, fixture, ClusterToClusterReplication.DESCRIPTION)
-
- if not isinstance(fixture, cluster_to_cluster.ClusterToClusterFixture):
- raise ValueError(
- "The ClusterToClusterReplication hook requires a ClusterToClusterFixture")
- self._fixture = fixture
- self._shell_options = copy.deepcopy(shell_options)
-
- # The number of tests executing so far.
- self._test_num = 0
- # The number of tests we execute before running a data consistency check and restarting the
- # replicator.
- self._tests_per_cycle = self.DEFAULT_TESTS_PER_CYCLE if tests_per_cycle is None else tests_per_cycle
- # The replicator is not started until some number of tests are run first.
- self._replicator_start_delay = replicator_start_delay
- random.seed(config.RANDOM_SEED)
-
- # The last test executed so far.
- self._last_test = None
-
- self._replicator = self._fixture.replicator
-
- def before_suite(self, test_report):
- """Before suite."""
- if not self._fixture:
- raise ValueError("No ClusterToClusterFixture to run migrations on")
-
- if self._replicator_start_delay is None:
- if math.isinf(self._tests_per_cycle):
- self._replicator_start_delay = random.randint(0, 10)
- else:
- self._replicator_start_delay = random.randint(0, self._tests_per_cycle)
- self.logger.info("Starting the replicator after %d tests are run.",
- self._replicator_start_delay)
-
- def after_suite(self, test_report, teardown_flag=None):
- """After suite."""
- self.logger.info("Ran %d tests in total.", self._test_num)
- # Perform the following actions only if some tests have been run.
- if self._test_num % self._tests_per_cycle > self._replicator_start_delay:
- self._run_replicator_action(test_report, self._replicator.commit)
-
- self._run_data_consistency_check(self._last_test, test_report)
- self._run_check_repl_db_hash(self._last_test, test_report)
-
- def before_test(self, test, test_report):
- """Before test."""
- if self._test_num % self._tests_per_cycle == self._replicator_start_delay:
- self._run_replicator_action(test_report, self._replicator.start)
-
- def after_test(self, test, test_report):
- """After test."""
- self._test_num += 1
- self._last_test = test
-
- # Every 'n' tests, pause / stop the replicator and perform data consistency
- # checks.
- if self._test_num % self._tests_per_cycle == 0:
- if self._tests_per_cycle == self._replicator_start_delay:
- self._run_replicator_action(test_report, self._replicator.start)
-
- self._run_replicator_action(test_report, self._replicator.commit)
-
- self._run_data_consistency_check(test, test_report)
- self._run_check_repl_db_hash(test, test_report)
-
- def _run_data_consistency_check(self, test, test_report):
- """Run the data consistency check across both clusters."""
- # The TestData needs to be set to allow the data consistency hooks to run correctly.
- clusters = self._fixture.get_independent_clusters()
- source_url = clusters[self._fixture.source_cluster_index].get_driver_connection_url()
- dest_url = clusters[1 - self._fixture.source_cluster_index].get_driver_connection_url()
-
- shell_options = copy.deepcopy(self._shell_options)
- shell_options["global_vars"]["TestData"]["sourceConnectionString"] = source_url
- shell_options["global_vars"]["TestData"]["destinationConnectionString"] = dest_url
-
- data_consistency = cluster_to_cluster_data_consistency.CheckClusterToClusterDataConsistency(
- self.logger, self._fixture, shell_options)
- data_consistency.before_suite(test_report)
- data_consistency.before_test(test, test_report)
- data_consistency.after_test(test, test_report)
- data_consistency.after_suite(test_report)
-
- def _run_check_repl_db_hash(self, test, test_report):
- """Check the repl DB hash on each cluster."""
- check_db_hash = dbhash.CheckReplDBHash(self.logger, self._fixture, self._shell_options)
- check_db_hash.before_suite(test_report)
- check_db_hash.before_test(test, test_report)
- check_db_hash.after_test(test, test_report)
- check_db_hash.after_suite(test_report)
-
- def _run_replicator_action(self, test_report, action):
- self.logger.info(f"Running replicator action: {action.__name__}")
- replicator_action_case = _ReplicatorActionTestCase(self.logger, self._last_test, self,
- action)
- replicator_action_case.run_dynamic_test(test_report)
- self.logger.info(f"Ran replicator action: {action.__name__}")
-
-
-class _ReplicatorActionTestCase(interface.DynamicTestCase):
- """_ReplicatorActionTestCase class, to run a replicator action as a test."""
-
- def __init__( # pylint: disable=too-many-arguments
- self, logger, base_test_name, hook, action):
- """Initialize _ReplicatorActionTestCase."""
- interface.DynamicTestCase.__init__(self, logger, f"replicator_action:{action.__name__}",
- "Run a replicator action.", base_test_name, hook)
- self._action = action
-
- def run_test(self):
- try:
- self._action()
- except:
- self.logger.exception("Failed to run replicator action '%s'.", self._action)
- raise
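Editorial note: a condensed sketch of the per-test bookkeeping the deleted ClusterToClusterReplication hook used — the replicator starts once replicator_start_delay tests have run within a cycle, and at every tests_per_cycle boundary it is committed and both consistency checks run. Names are illustrative, and the edge case where the start delay coincides with the cycle boundary is omitted.

    def maybe_start(test_num, tests_per_cycle, start_delay, replicator):
        # Before a test: start the replicator once `start_delay` tests have run
        # in the current cycle.
        if test_num % tests_per_cycle == start_delay:
            replicator.start()

    def maybe_commit(test_num, tests_per_cycle, replicator, run_checks):
        # After a test: at each cycle boundary, commit the replicator and run the
        # data consistency / dbhash checks across both clusters.
        if test_num % tests_per_cycle == 0:
            replicator.commit()
            run_checks()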
diff --git a/buildscripts/resmokelib/testing/hooks/replicator_interface.py b/buildscripts/resmokelib/testing/hooks/replicator_interface.py
deleted file mode 100644
index 6747aa66836..00000000000
--- a/buildscripts/resmokelib/testing/hooks/replicator_interface.py
+++ /dev/null
@@ -1,39 +0,0 @@
-"""An interface to control a replicator in our test infrastructure."""
-
-
-class ReplicatorInterface(object):
- """A replicator interface.
-
- Provides a way to start, pause, resume and stop the replicator.
- """
-
- def __init__(self, logger, fixture):
- """Initialize the replicator."""
- self.logger = logger
- self._fixture = fixture
-
- def start(self, start_options):
- """Start the replicator.
-
- This method will return after the replicator has been started.
- """
-
- def stop(self, stop_options):
- """Stop the replicator.
-
- This method will return after the replicator has been stopped, without requiring the
- replicator to have finished synchronizing across clusters.
- """
-
- def pause(self, pause_options):
- """Pause the replicator.
-
- This method will return after the replicator has been paused. This method requires that the
- replicator has completed synchronizing across clusters.
- """
-
- def resume(self, resume_options):
- """Resume the replicator.
-
- This method will return after the replicator has been resumed.
- """
diff --git a/buildscripts/resmokelib/testing/hooks/stepdown.py b/buildscripts/resmokelib/testing/hooks/stepdown.py
index 91f0383b029..3199ea11109 100644
--- a/buildscripts/resmokelib/testing/hooks/stepdown.py
+++ b/buildscripts/resmokelib/testing/hooks/stepdown.py
@@ -14,7 +14,6 @@ from buildscripts.resmokelib.testing.fixtures import interface as fixture_interf
from buildscripts.resmokelib.testing.fixtures import replicaset
from buildscripts.resmokelib.testing.fixtures import shardedcluster
from buildscripts.resmokelib.testing.fixtures import tenant_migration
-from buildscripts.resmokelib.testing.fixtures import cluster_to_cluster
from buildscripts.resmokelib.testing.hooks import interface
@@ -26,6 +25,9 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
IS_BACKGROUND = True
+ # The hook stops the fixture partially during its execution.
+ STOPS_FIXTURE = True
+
def __init__( # pylint: disable=too-many-arguments
self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True,
stepdown_interval_ms=8000, terminate=False, kill=False,
@@ -138,8 +140,8 @@ class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-a
for rs_fixture in fixture.get_replsets():
self._rs_fixtures.append(rs_fixture)
- elif isinstance(fixture, cluster_to_cluster.ClusterToClusterFixture):
- # Recursively call _add_fixture on the source and destination clusters.
+ elif isinstance(fixture, fixture_interface.MultiClusterFixture):
+ # Recursively call _add_fixture on all the independent clusters.
for cluster_fixture in fixture.get_independent_clusters():
self._add_fixture(cluster_fixture)
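Editorial note: the hunk above swaps a concrete isinstance check for the generic MultiClusterFixture interface — any fixture exposing get_independent_clusters() is now traversed recursively. A hedged sketch of that traversal (names are illustrative, not the real resmoke helpers):

    def add_fixture(fixture, rs_fixtures):
        # Descend into composite fixtures until plain replica sets are reached.
        if hasattr(fixture, "get_independent_clusters"):
            for sub_fixture in fixture.get_independent_clusters():
                add_fixture(sub_fixture, rs_fixtures)
        elif hasattr(fixture, "get_replsets"):
            rs_fixtures.extend(fixture.get_replsets())
        else:
            rs_fixtures.append(fixture)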
diff --git a/buildscripts/resmokelib/testing/job.py b/buildscripts/resmokelib/testing/job.py
index 1c3be065868..ba9f54e6912 100644
--- a/buildscripts/resmokelib/testing/job.py
+++ b/buildscripts/resmokelib/testing/job.py
@@ -8,8 +8,6 @@ from buildscripts.resmokelib import config
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing import testcases
from buildscripts.resmokelib.testing.fixtures.interface import create_fixture_table
-from buildscripts.resmokelib.testing.hooks import stepdown
-from buildscripts.resmokelib.testing.hooks import cluster_to_cluster_kill_replicator
from buildscripts.resmokelib.testing.testcases import fixture as _fixture
from buildscripts.resmokelib.utils import queue as _queue
@@ -35,9 +33,7 @@ class Job(object): # pylint: disable=too-many-instance-attributes
# expected, there is a race where fixture.is_running() could fail if called after the
# primary was killed but before it was restarted.
self._check_if_fixture_running = not any(
- isinstance(hook, (stepdown.ContinuousStepdown,
- cluster_to_cluster_kill_replicator.KillReplicator))
- for hook in self.hooks)
+ hasattr(hook, "STOPS_FIXTURE") and hook.STOPS_FIXTURE for hook in self.hooks)
@property
def job_num(self):
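Editorial note: taken together, the stepdown.py and job.py hunks replace hard-coded isinstance checks with a duck-typed class flag — any hook that stops part of the fixture declares STOPS_FIXTURE = True, and Job skips its fixture-liveness check whenever such a hook is present. A minimal sketch of the pattern (class names are illustrative; the base hook class leaves the attribute undefined, which is why the real check uses hasattr):

    class QuietHook:
        """A hook that never interferes with the fixture; no flag needed."""

    class KillPrimaryLikeHook:
        STOPS_FIXTURE = True  # opts in: this hook stops part of the fixture mid-run

    def check_if_fixture_running(hooks):
        # Mirrors the new Job logic: only check liveness when no hook stops the fixture.
        return not any(hasattr(hook, "STOPS_FIXTURE") and hook.STOPS_FIXTURE
                       for hook in hooks)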