author      Huayu Ouyang <huayu.ouyang@mongodb.com>              2022-03-25 22:55:19 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>     2022-03-25 23:43:40 +0000
commit      b253651b0d72ac2d41cae553ba819b731a275b18 (patch)
tree        02b81d191e12b2c2d7ce0d68f8e138d5f0a467de /buildscripts
parent      26838030f0aedd4437effcb0418e5c4d6c53f538 (diff)
download    mongo-b253651b0d72ac2d41cae553ba819b731a275b18.tar.gz
SERVER-64445 Create MultipleReplicatorFixture
Diffstat (limited to 'buildscripts')
-rw-r--r--   buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py     37
-rw-r--r--   buildscripts/resmokelib/testing/fixtures/multiple_replicator.py   139
-rw-r--r--   buildscripts/resmokelib/testing/fixtures/replicator.py              2
-rw-r--r--   buildscripts/resmokelib/testing/fixtures/shardedcluster.py          7
4 files changed, 177 insertions, 8 deletions
diff --git a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py
index 93a3a8cbce3..fbc77d03944 100644
--- a/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py
+++ b/buildscripts/resmokelib/testing/fixtures/cluster_to_cluster.py
@@ -59,22 +59,26 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable
# The cluster that starts off with the data.
self.source_cluster_index = 0
+ self.replicator_options = replicator_options
for cluster_options in self.both_cluster_options:
+ if replicator_options["class"] == "MultipleReplicatorFixture":
+ if cluster_options["class"] != "ShardedClusterFixture":
+ raise ValueError(
+ "MultipleReplicatorFixture can only be run with ShardedClusterFixture")
cluster = self.fixturelib.make_fixture(cluster_options["class"], self.logger,
self.job_num, **cluster_options["settings"])
self.clusters.append(cluster)
- self.replicator_options = replicator_options
replicator_logger = self.fixturelib.new_fixture_node_logger(replicator_options["class"],
self.job_num, "replicator")
+
self.replicator = self.fixturelib.make_fixture(replicator_options["class"],
replicator_logger, self.job_num,
- **replicator_options["settings"])
+ **self.replicator_options["settings"])
def setup(self):
"""Set up the cluster to cluster fixture according to the options provided."""
-
for i, cluster in enumerate(self.clusters):
self.logger.info(f"Setting up cluster {i}.")
cluster.setup()
@@ -85,6 +89,19 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable
source_url, dest_url)
self.replicator.set_cli_options({'cluster0': source_url, 'cluster1': dest_url})
+ # If we are using multiple replicators, we must get the list of shard ids from the source
+ # cluster and pass the ids as CLI options to each replicator.
+ if self.replicator_options["class"] == "MultipleReplicatorFixture":
+ # Wait for the source cluster to be fully running
+ self.clusters[self.source_cluster_index].await_ready()
+ try:
+ shard_ids = self.clusters[self.source_cluster_index].get_shard_ids()
+ except Exception as err:
+ msg = f"Error getting shard ids from source cluster: {err}"
+ self.logger.exception(msg)
+ raise self.fixturelib.ServerFailure(msg)
+ self.replicator.set_shard_ids(shard_ids)
+
self.replicator.setup()
def pids(self):
@@ -104,9 +121,14 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable
def await_ready(self):
"""Block until the fixture can be used for testing."""
- # Wait for each of the clusters and the replicator.
- for cluster in self.clusters:
- cluster.await_ready()
+ if self.replicator_options["class"] == "MultipleReplicatorFixture":
+ # We only need to call await_ready() on the dest cluster since await_ready() was already
+ # called on the source cluster in setup().
+ self.clusters[1 - self.source_cluster_index].await_ready()
+ else:
+ for cluster in self.clusters:
+ cluster.await_ready()
+
self.replicator.await_ready()
def _do_teardown(self, mode=None):
@@ -114,7 +136,8 @@ class ClusterToClusterFixture(interface.MultiClusterFixture): # pylint: disable
running_at_start = self.is_running()
if not running_at_start:
self.logger.warning(
- "All clusters and replicators were expected to be running, but weren't.")
+ "All clusters and replicators were expected to be running before teardown, but weren't."
+ )
teardown_handler = interface.FixtureTeardownHandler(self.logger)
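
For reference, a minimal sketch (not part of this commit) of the ordering that the ClusterToClusterFixture changes above establish when a MultipleReplicatorFixture is configured; the names mirror the diff, and the shard ids shown are placeholders:

    # Sketch of the setup()/await_ready() flow with MultipleReplicatorFixture.
    source = clusters[source_cluster_index]
    source.await_ready()                        # source cluster must be fully up first
    shard_ids = source.get_shard_ids()          # e.g. ["shard-rs0", "shard-rs1"] (hypothetical)
    replicator.set_shard_ids(shard_ids)         # one replicator process per shard id
    replicator.setup()
    clusters[1 - source_cluster_index].await_ready()   # only the destination cluster remains
    replicator.await_ready()
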
diff --git a/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py
new file mode 100644
index 00000000000..16e48e0c5e0
--- /dev/null
+++ b/buildscripts/resmokelib/testing/fixtures/multiple_replicator.py
@@ -0,0 +1,139 @@
+"""Multiple replicator fixture to handle launching and stopping multiple replicator binaries."""
+
+import buildscripts.resmokelib.testing.fixtures.interface as interface
+
+
+class MultipleReplicatorFixture(interface.Fixture):
+ """Fixture which spins up multiple replicators."""
+
+ REGISTERED_NAME = "MultipleReplicatorFixture"
+
+ def __init__( # pylint: disable=too-many-arguments
+ self, logger, job_num, fixturelib, executable, quiesce_period=5, cli_options=None,
+ num_replicators=2, shard_ids=None):
+ """Initialize ReplicatorFixture with different options for the replicator process."""
+ interface.Fixture.__init__(self, logger, job_num, fixturelib)
+
+ self.executable = executable
+ self.quiesce_period = quiesce_period
+ self.cli_options = self.fixturelib.default_if_none(cli_options, {})
+ self.shard_ids = self.fixturelib.default_if_none(shard_ids, [])
+ self.num_replicators = num_replicators
+
+ # The running replicator processes.
+ self.replicators = []
+
+ if num_replicators < 2:
+ raise ValueError("The MultipleReplicatorFixture requires at least 2 replicators")
+
+ for i in range(num_replicators):
+ individual_replicator_logger = self.fixturelib.new_fixture_node_logger(
+ "MultipleReplicatorFixture", self.job_num, f"replicator{i}")
+ replicator = self.fixturelib.make_fixture(
+ "ReplicatorFixture", individual_replicator_logger, self.job_num,
+ executable=executable, quiesce_period=quiesce_period, cli_options=cli_options)
+ self.replicators.append(replicator)
+
+ def setup(self):
+ """
+ Set up the multiple replicators.
+
+ For each replicator set the shard ID and call setup, which launches the replicator process.
+ """
+ if self.num_replicators != len(self.shard_ids):
+ raise ValueError(
+ "The number of replicators must match the number of shard ids provided.")
+
+ for i, rep in enumerate(self.replicators):
+ rep.set_cli_options({"id": self.shard_ids[i]})
+ rep.setup()
+
+ def pids(self):
+ """:return: pids owned by this fixture if any."""
+ pids = []
+ for rep in self.replicators:
+ pids.extend(rep.pids())
+ if len(pids) == 0:
+ self.logger.debug('Replicators not running when gathering replicator fixture pids.')
+ return pids
+
+ def await_ready(self):
+ """
+ Block until the fixture can be used for testing.
+
+ For each replicator, call await_ready().
+ """
+ for rep in self.replicators:
+ rep.await_ready()
+
+ def start(self):
+ """Start the replication process by sending the replicator a command."""
+ self.logger.info("Starting multiple replicators...\n")
+ for i, rep in enumerate(self.replicators):
+ self.logger.info("Starting replicator #%d...", i)
+ rep.start()
+
+ def commit(self):
+ """Commit the migration. This currently will just sleep for a quiesce period."""
+ for rep in self.replicators:
+ rep.commit()
+
+ def stop(self, mode=None):
+ """Stop the replicator binary."""
+ self.logger.info("Stopping multiple replicators...\n")
+ for i, rep in enumerate(self.replicators):
+ try:
+ self.logger.info("Stopping replicator #%d...", i)
+ rep.stop(mode)
+ except Exception as err:
+ msg = f"Error stopping replicator #{i}: {err}"
+ self.logger.exception(msg)
+ raise self.fixturelib.ServerFailure(msg)
+
+ def resume(self):
+ """NOOP."""
+ pass
+
+ def pause(self):
+ """NOOP."""
+ pass
+
+ def _do_teardown(self, mode=None):
+ """Teardown the fixture."""
+ if not self.is_any_process_running():
+ self.logger.info("All replicators already stopped; teardown is a NOOP.")
+ return
+
+ self.logger.warning("The replicators had not been stopped at the time of teardown.")
+ self.stop(mode)
+
+ def is_any_process_running(self):
+ """Return true if any of the replicator binaries is running as a process."""
+ return any(rep.is_running() for rep in self.replicators)
+
+ def is_running(self):
+ """Return true if all of the individual replicator fixtures are running and have not errorred."""
+ return all(rep.is_running() for rep in self.replicators)
+
+ def get_internal_connection_string(self):
+ """Return the internal connection string."""
+ raise NotImplementedError("Multiple replicator cannot have internal connection strings.")
+
+ def get_driver_connection_url(self):
+ """Return the driver connection URL."""
+ raise NotImplementedError("Multiple replicator cannot have driver connection URLs.")
+
+ def get_node_info(self):
+ """Return a list of NodeInfo objects."""
+ return [rep.get_node_info()[0] for rep in self.replicators]
+
+ def set_cli_options(self, cli_options):
+ """Set command line options."""
+ for option, value in cli_options.items():
+ self.cli_options[option] = value
+ for rep in self.replicators:
+ rep.set_cli_options(cli_options)
+
+ def set_shard_ids(self, shard_ids):
+ """Set the list of shard ids."""
+ self.shard_ids = shard_ids
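
A brief sketch (not from the commit) of how MultipleReplicatorFixture.setup() fans the shard ids out to its child ReplicatorFixture instances; the shard ids and the replicators list are placeholders:

    # Each shard id becomes the "id" CLI option of exactly one ReplicatorFixture.
    shard_ids = ["shard-rs0", "shard-rs1"]       # hypothetical ids from get_shard_ids()
    for shard_id, rep in zip(shard_ids, replicators):
        rep.set_cli_options({"id": shard_id})
        rep.setup()                              # launches that replicator binary
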
diff --git a/buildscripts/resmokelib/testing/fixtures/replicator.py b/buildscripts/resmokelib/testing/fixtures/replicator.py
index b4a6e19aae5..b0fa9c92401 100644
--- a/buildscripts/resmokelib/testing/fixtures/replicator.py
+++ b/buildscripts/resmokelib/testing/fixtures/replicator.py
@@ -41,7 +41,7 @@ class ReplicatorFixture(interface.Fixture):
"""
Block until the fixture can be used for testing.
- NOOP by default since on `setup` nothing is done.
+ NOOP since the binary is fully launched in `setup`.
"""
pass
diff --git a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
index 0e830dfae19..75b49110900 100644
--- a/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
+++ b/buildscripts/resmokelib/testing/fixtures/shardedcluster.py
@@ -108,6 +108,13 @@ class ShardedClusterFixture(interface.Fixture): # pylint: disable=too-many-inst
"command and continuing to wait")
target.await_last_op_committed(target.AWAIT_REPL_TIMEOUT_FOREVER_MINS * 60)
+ def get_shard_ids(self):
+ """Get the list of shard ids in the cluster."""
+ client = self.mongo_client()
+ interface.authenticate(client, self.auth_options)
+ res = client.admin.command("listShards")
+ return [shard_info["_id"] for shard_info in res["shards"]]
+
def await_ready(self):
"""Block until the fixture can be used for testing."""
# Wait for the config server
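
For context, get_shard_ids() above relies on the standard listShards admin command; a standalone sketch with pymongo (the mongos address is a placeholder) shows the shape of the response it parses:

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:20000")   # hypothetical mongos address
    res = client.admin.command("listShards")
    # Response shape: {"shards": [{"_id": "shard-rs0", "host": "shard-rs0/..."}, ...], "ok": 1.0}
    shard_ids = [shard_info["_id"] for shard_info in res["shards"]]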