diff options
author | Huayu Ouyang <huayu.ouyang@mongodb.com> | 2022-03-25 22:55:19 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-25 23:43:40 +0000 |
commit | b253651b0d72ac2d41cae553ba819b731a275b18 (patch) | |
tree | 02b81d191e12b2c2d7ce0d68f8e138d5f0a467de /buildscripts | |
parent | 26838030f0aedd4437effcb0418e5c4d6c53f538 (diff) | |
download | mongo-b253651b0d72ac2d41cae553ba819b731a275b18.tar.gz |
SERVER-64445 Create MultipleReplicatorFixture
Diffstat (limited to 'buildscripts')
4 files changed, 177 insertions, 8 deletions
diff --git a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py index 93a3a8cbce3..fbc77d03944 100644 --- a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py +++ b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py @@ -59,22 +59,26 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable # The cluster that starts off with the data. self.source_cluster_index = 0 + self.replicator_options = replicator_options for cluster_options in self.both_cluster_options: + if replicator_options["class"] == "MultipleReplicatorFixture": + if cluster_options["class"] != "ShardedClusterFixture": + raise ValueError( + "MultipleReplicatorFixture can only be run with ShardedClusterFixture") cluster = self.fixturelib.make_fixture(cluster_options["class"], self.logger, self.job_num, **cluster_options["settings"]) self.clusters.append(cluster) - self.replicator_options = replicator_options replicator_logger = self.fixturelib.new_fixture_node_logger(replicator_options["class"], self.job_num, "replicator") + self.replicator = self.fixturelib.make_fixture(replicator_options["class"], replicator_logger, self.job_num, - **replicator_options["settings"]) + **self.replicator_options["settings"]) def setup(self): """Set up the cluster to cluster fixture according to the options provided.""" - for i, cluster in enumerate(self.clusters): self.logger.info(f"Setting up cluster {i}.") cluster.setup() @@ -85,6 +89,19 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable source_url, dest_url) self.replicator.set_cli_options({'cluster0': source_url, 'cluster1': dest_url}) + # If we are using multiple replicators, we must get the list of shard ids from the source + # cluster and pass the ids as CLI options to each replicator. 
+ if self.replicator_options["class"] == "MultipleReplicatorFixture": + # Wait for the source cluster to be fully running + self.clusters[self.source_cluster_index].await_ready() + try: + shard_ids = self.clusters[self.source_cluster_index].get_shard_ids() + except Exception as err: + msg = f"Error getting shard ids from source cluster: {err}" + self.logger.exception(msg) + raise self.fixturelib.ServerFailure(msg) + self.replicator.set_shard_ids(shard_ids) + self.replicator.setup() def pids(self): @@ -104,9 +121,14 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable def await_ready(self): """Block until the fixture can be used for testing.""" - # Wait for each of the clusters and the replicator. - for cluster in self.clusters: - cluster.await_ready() + if self.replicator_options["class"] == "MultipleReplicatorFixture": + # We only need to call await_ready() on the dest cluster since await_ready() was already + # called on the source cluster in setup(). + self.clusters[1 - self.source_cluster_index].await_ready() + else: + for cluster in self.clusters: + cluster.await_ready() + self.replicator.await_ready() def _do_teardown(self, mode=None): @@ -114,7 +136,8 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable running_at_start = self.is_running() if not running_at_start: self.logger.warning( - "All clusters and replicators were expected to be running, but weren't.") + "All clusters and replicators were expected to be running before teardown, but weren't." 
+ ) teardown_handler = interface.FixtureTeardownHandler(self.logger) diff --git a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py new file mode 100644 index 00000000000..16e48e0c5e0 --- /dev/null +++ b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py @@ -0,0 +1,139 @@ +"""Multiple replicator fixture to handle launching and stopping multiple replicator binaries.""" + +import buildscripts.resmokelib.testing.fixtures.interface as interface + + +class MultipleReplicatorFixture(interface.Fixture): + """Fixture which spins up multiple replicators.""" + + REGISTERED_NAME = "MultipleReplicatorFixture" + + def __init__( # pylint: disable=too-many-arguments + self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None, + num_replicators=2, shard_ids=None): + """Initialize ReplicatorFixture with different options for the replicator process.""" + interface.Fixture.__init__(self, logger, job_num, fixturelib) + + self.executable = executable + self.quiesce_period = quiesce_period + self.cli_options = self.fixturelib.default_if_none(cli_options, {}) + self.shard_ids = self.fixturelib.default_if_none(shard_ids, []) + self.num_replicators = num_replicators + + # The running replicator processes. + self.replicators = [] + + if num_replicators < 2: + raise ValueError("The MultipleReplicatorFixture requires at least 2 replicators") + + for i in range(num_replicators): + individual_replicator_logger = self.fixturelib.new_fixture_node_logger( + "MultipleReplicatorFixture", self.job_num, f"replicator{i}") + replicator = self.fixturelib.make_fixture( + "ReplicatorFixture", individual_replicator_logger, self.job_num, + executable=executable, quiesce_period=quiesce_period, cli_options=cli_options) + self.replicators.append(replicator) + + def setup(self): + """ + Set up the multiple replicators. 
+ + For each replicator set the shard ID and call setup, which launches the replicator process. + """ + if self.num_replicators != len(self.shard_ids): + raise ValueError( + "The number of replicators must match the number of shard ids provided.") + + for i, rep in enumerate(self.replicators): + rep.set_cli_options({"id": self.shard_ids[i]}) + rep.setup() + + def pids(self): + """:return: pids owned by this fixture if any.""" + pids = [] + for rep in self.replicators: + pids.extend(rep.pids()) + if len(pids) == 0: + self.logger.debug('Replicators not running when gathering replicator fixture pids.') + return pids + + def await_ready(self): + """ + Block until the fixture can be used for testing. + + For each replicator, call await_ready(). + """ + for rep in self.replicators: + rep.await_ready() + + def start(self): + """Start the replication process by sending the replicator a command.""" + self.logger.info("Starting multiple replicators...\n") + for i, rep in enumerate(self.replicators): + self.logger.info("Starting replicator #%d...", i) + rep.start() + + def commit(self): + """Commit the migration. 
This currently will just sleep for a quiesce period.""" + for rep in self.replicators: + rep.commit() + + def stop(self, mode=None): + """Stop the replicator binary.""" + self.logger.info("Stopping multiple replicators...\n") + for i, rep in enumerate(self.replicators): + try: + self.logger.info("Stopping replicator #%d...", i) + rep.stop(mode) + except Exception as err: + msg = f"Error stopping replicator #{i}: {err}" + self.logger.exception(msg) + raise self.fixturelib.ServerFailure(msg) + + def resume(self): + """NOOP.""" + pass + + def pause(self): + """NOOP.""" + pass + + def _do_teardown(self, mode=None): + """Teardown the fixture.""" + if not self.is_any_process_running(): + self.logger.info("All replicators already stopped; teardown is a NOOP.") + return + + self.logger.warning("The replicators had not been stopped at the time of teardown.") + self.stop(mode) + + def is_any_process_running(self): + """Return true if any of the replicator binaries is running as a process.""" + return any(rep.is_running() for rep in self.replicators) + + def is_running(self): + """Return true if all of the individual replicator fixtures are running and have not errored.""" + return all(rep.is_running() for rep in self.replicators) + + def get_internal_connection_string(self): + """Return the internal connection string.""" + raise NotImplementedError("Multiple replicator cannot have internal connection strings.") + + def get_driver_connection_url(self): + """Return the driver connection URL.""" + raise NotImplementedError("Multiple replicator cannot have driver connection URLs.") + + def get_node_info(self): + """Return a list of NodeInfo objects.""" + return [rep.get_node_info()[0] for rep in self.replicators] + + def set_cli_options(self, cli_options): + """Set command line options.""" + for option, value in cli_options.items(): + self.cli_options[option] = value + for rep in self.replicators: + rep.set_cli_options(cli_options) + + def set_shard_ids(self, shard_ids): + 
"""Set the list of shard ids.""" + self.shard_ids = shard_ids diff --git a/buildscripts/resmokelib/testing/fixtures/replicator.py b/buildscripts/resmokelib/testing/fixtures/replicator.py index b4a6e19aae5..b0fa9c92401 100644 --- a/buildscripts/resmokelib/testing/fixtures/replicator.py +++ b/buildscripts/resmokelib/testing/fixtures/replicator.py @@ -41,7 +41,7 @@ class ReplicatorFixture(interface.Fixture): """ Block until the fixture can be used for testing. - NOOP by default since on `setup` nothing is done. + NOOP since the binary is fully launched in `setup`. """ pass diff --git a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py index 0e830dfae19..75b49110900 100644 --- a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py +++ b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py @@ -108,6 +108,13 @@ class ShardedClusterFixture(interface.Fixture): # pylint: disable=too-many-inst "command and continuing to wait") target.await_last_op_committed(target.AWAIT_REPL_TIMEOUT_FOREVER_MINS * 60) + def get_shard_ids(self): + """Get the list of shard ids in the cluster.""" + client = self.mongo_client() + interface.authenticate(client, self.auth_options) + res = client.admin.command("listShards") + return [shard_info["_id"] for shard_info in res["shards"]] + def await_ready(self): """Block until the fixture can be used for testing.""" # Wait for the config server |