author     Matt Broadstone <mbroadst@mongodb.com>  2022-04-15 21:58:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-15 22:33:51 +0000
commit     d9f29d10916539fc25cfa03731850332866ce42c (patch)
tree       7d152f53c7793b1647653c737038f668a881bd32
parent     5f3e71f07128b9007032483210ae3b9309a2b4f6 (diff)
SERVER-63093 Introduce a vanilla passthrough for shard split
-rw-r--r--  buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml  104
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/_builder.py                   13
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/shard_split.py               317
-rw-r--r--  buildscripts/resmokelib/testing/hooks/shard_split.py                  486
-rw-r--r--  etc/evergreen.yml                                                        3
-rw-r--r--  etc/evergreen_yml_components/definitions.yml                             9
6 files changed, 930 insertions, 2 deletions
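The new suite should be runnable locally by name through the standard resmoke entry point (illustrative invocation, not part of this patch):

    python buildscripts/resmoke.py run --suites=shard_split_jscore_passthrough

In Evergreen, the suite is instead scheduled through the generated shard_split_jscore_passthrough_gen task added to etc/evergreen_yml_components/definitions.yml below.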
diff --git a/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml
new file mode 100644
index 00000000000..8b360ca7e40
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml
@@ -0,0 +1,104 @@
+config_variables:
+- &keyFile jstests/libs/authTestsKey
+- &keyFileData Thiskeyisonlyforrunningthesuitewithauthenticationdontuseitinanytestsdirectly
+- &authOptions
+  authenticationDatabase: admin
+  authenticationMechanism: SCRAM-SHA-256
+  password: *keyFileData
+  username: __system
+
+test_kind: js_test
+
+selector:
+  roots:
+  - jstests/core/**/*.js
+  exclude_files:
+  - jstests/core/txns/**/*.js
+  # Skip any tests that run with auth explicitly.
+  - jstests/core/*[aA]uth*.js
+
+  exclude_with_any_tags:
+  - assumes_standalone_mongod
+  # These tests run getMore commands which are not supported in the shard split passthrough.
+  - requires_getmore
+  # Due to background shard split operations, operations in the main test shell are not guaranteed
+  # to be causally consistent with operations in a parallel shell. The reason is that the
+  # TenantMigrationCommitted error is only thrown when the client does a write or an atClusterTime/
+  # afterClusterTime or linearizable read. Therefore, one of the shells may not be aware that the
+  # split has occurred and would not forward the read/write command to the right replica set.
+  - uses_parallel_shell
+  # Profile settings are stored in-memory only so are not transferred to the recipient.
+  - requires_profiling
+  # Capped collections are banned in Serverless.
+  - requires_capped
+  # emptycapped command is blocked during tenant migration.
+  - requires_emptycapped
+  # Multi-updates that conflict with tenant migration are not retried by inject_tenant_prefix.js.
+  - requires_multi_updates
+  # Full validation can cause ongoing queries to fail. This can affect the tenant migration
+  # process.
+  - uses_full_validation
+  - tenant_migration_incompatible
+
+executor:
+  archive:
+    tests: true
+    hooks:
+      - CheckReplDBHash
+      - CheckReplOplogs
+      - ValidateCollections
+
+  config:
+    shell_options:
+      eval: >-
+        testingReplication = true;
+        jsTest.authenticate(db.getMongo());
+      global_vars:
+        TestData: &TestData
+          tenantIds: ["tenantA", "tenantB"]
+          auth: true
+          authMechanism: SCRAM-SHA-256
+          keyFile: *keyFile
+          keyFileData: *keyFileData
+          roleGraphInvalidationIsFatal: true
+  hooks:
+  # Hooks are set up in the order listed here, and torn down in the same order (not as a stack).
+  # The ContinuousShardSplit hook must come before CleanEveryN so that the ShardSplitFixture's list
+  # of nodes can't be modified while CleanEveryN is running.
+  - class: ContinuousShardSplit
+    shell_options:
+      global_vars:
+        TestData:
+          <<: *TestData
+          authOptions: *authOptions
+  # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+  # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+  # validating the entire contents of the collection.
+  - class: CheckReplOplogs
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: CheckReplDBHash
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: ValidateCollections
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: CleanEveryN
+    n: 20
+  fixture:
+    class: ShardSplitFixture
+    common_mongod_options:
+      set_parameters:
+        enableTestCommands: 1
+        # To avoid routing commands in each test incorrectly, the ContinuousShardSplit hook
+        # only runs forgetShardSplit against the donor of each split when it is safe to do
+        # so. Therefore, the garbage collection delay doesn't need to be large.
+        shardSplitGarbageCollectionDelayMS: 1
+        ttlMonitorSleepSecs: 1
+      auth: ''
+      keyFile: *keyFile
+    num_nodes_per_replica_set: 3
+    auth_options: *authOptions
diff --git a/buildscripts/resmokelib/testing/fixtures/_builder.py b/buildscripts/resmokelib/testing/fixtures/_builder.py
index e9197d9775b..53ffc538ef4 100644
--- a/buildscripts/resmokelib/testing/fixtures/_builder.py
+++ b/buildscripts/resmokelib/testing/fixtures/_builder.py
@@ -54,7 +54,7 @@ class FixtureBuilder(ABC, metaclass=registry.make_registry_metaclass(_BUILDERS,
    REGISTERED_NAME = "Builder"
    @abstractmethod
-    def build_fixture(self, logger, job_num, fixturelib, *args, **kwargs):
+    def build_fixture(self, logger, job_num, fixturelib, *args, existing_nodes=None, **kwargs):
        """Abstract method to build a fixture."""
        return
@@ -73,7 +73,7 @@ class ReplSetBuilder(FixtureBuilder):
    latest_class = "MongoDFixture"
    multiversion_class_suffix = "_multiversion_class_suffix"
-    def build_fixture(self, logger, job_num, fixturelib, *args, **kwargs):  # pylint: disable=too-many-locals
+    def build_fixture(self, logger, job_num, fixturelib, *args, existing_nodes=None, **kwargs):  # pylint: disable=too-many-locals
        """Build a replica set."""
        # We hijack the mixed_bin_versions passed to the fixture.
        mixed_bin_versions = kwargs.pop("mixed_bin_versions", config.MIXED_BIN_VERSIONS)
@@ -152,6 +152,15 @@ class ReplSetBuilder(FixtureBuilder):
        replset = _FIXTURES[self.REGISTERED_NAME](logger, job_num, fixturelib, *args, **kwargs)
        replset.set_fcv(fcv)
+
+        # Don't build new nodes if existing nodes are provided.
+        if existing_nodes:
+            # Rename the logger to improve readability when printing node info maps.
+            for idx, node in enumerate(existing_nodes):
+                node.logger = replset.get_logger_for_mongod(idx)
+                replset.install_mongod(node)
+            return replset
+
        for node_index in range(replset.num_nodes):
            node = self._new_mongod(replset, node_index, executables, classes,
                                    mongod_binary_versions[node_index], is_multiversion)
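The new existing_nodes parameter lets a caller hand already-running mongod fixtures to the builder, which renames their loggers and installs them into the freshly constructed ReplicaSetFixture instead of spawning new processes. A condensed sketch of the caller side, mirroring how replace_donor_with_recipient in the new shard split fixture below uses it (variable names are illustrative):

    # Sketch only: promote already-running recipient mongods into a new ReplicaSetFixture.
    recipient_nodes = shard_split_fixture.get_recipient_nodes()
    new_donor_rs = fixturelib.make_fixture(
        "ReplicaSetFixture", logger, job_num, mongod_options=mongod_options,
        num_nodes=len(recipient_nodes), replset_name="rs1", all_nodes_electable=True,
        existing_nodes=recipient_nodes)
    # The nodes are already running, so no setup() call is made; the caller only awaits an election.
    new_donor_rs.get_primary()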
diff --git a/buildscripts/resmokelib/testing/fixtures/shard_split.py b/buildscripts/resmokelib/testing/fixtures/shard_split.py
new file mode 100644
index 00000000000..3bb48d71162
--- /dev/null
+++ b/buildscripts/resmokelib/testing/fixtures/shard_split.py
@@ -0,0 +1,317 @@
+"""Fixture for testing shard split operations."""
+
+import time
+import os.path
+import threading
+
+import pymongo
+from bson.objectid import ObjectId
+
+import buildscripts.resmokelib.testing.fixtures.interface as interface
+from buildscripts.resmokelib.testing.fixtures import replicaset
+from buildscripts.resmokelib.core import network
+
+
+class ShardSplitFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes
+ """Fixture which provides JSTests with a replica set and recipient nodes to run splits against."""
+
+ AWAIT_REPL_TIMEOUT_MINS = 5
+ AWAIT_REPL_TIMEOUT_FOREVER_MINS = 24 * 60
+
+ def __init__( # pylint: disable=too-many-arguments,too-many-locals
+ self,
+ logger,
+ job_num,
+ fixturelib,
+ common_mongod_options=None,
+ per_mongod_options=None,
+ dbpath_prefix=None,
+ preserve_dbpath=False,
+ num_nodes_per_replica_set=2,
+ auth_options=None,
+ replset_config_options=None,
+ mixed_bin_versions=None,
+ ):
+ """Initialize ShardSplitFixture with different options for the replica set processes."""
+ interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib,
+ dbpath_prefix=dbpath_prefix)
+ self.__lock = threading.Lock()
+
+ self.common_mongod_options = self.fixturelib.default_if_none(common_mongod_options, {})
+ self.per_mongod_options = self.fixturelib.default_if_none(per_mongod_options, {})
+ self.dbpath_prefix = dbpath_prefix
+ self.preserve_dbpath = preserve_dbpath
+ self.auth_options = auth_options
+ self.replset_config_options = self.fixturelib.default_if_none(replset_config_options, {})
+ self.mixed_bin_versions = self.fixturelib.default_if_none(mixed_bin_versions,
+ self.config.MIXED_BIN_VERSIONS)
+ self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \
+ else self.config.NUM_REPLSET_NODES
+
+ self.fixtures = []
+
+ # Make the initial donor replica set
+ donor_rs_name = "rs0"
+ mongod_options = self.common_mongod_options.copy()
+ mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, donor_rs_name)
+ mongod_options["serverless"] = True
+
+ # The default `electionTimeoutMillis` on evergreen is 24hr to prevent spurious
+ # elections. We _want_ elections to occur after split, so reduce the value here.
+ # TODO(SERVER-64935): No longer required once we send replSetStepUp to recipient nodes
+ # when splitting them.
+ if "settings" in self.replset_config_options:
+ self.replset_config_options["settings"] = self.fixturelib.default_if_none(
+ self.replset_config_options["settings"], {})
+ self.replset_config_options["settings"]["electionTimeoutMillis"] = 5000
+ else:
+ self.replset_config_options["settings"] = {"electionTimeoutMillis": 5000}
+
+ self.fixtures.append(
+ self.fixturelib.make_fixture(
+ "ReplicaSetFixture", self.logger, self.job_num, mongod_options=mongod_options,
+ preserve_dbpath=self.preserve_dbpath, num_nodes=self.num_nodes_per_replica_set,
+ auth_options=self.auth_options, replset_config_options=self.replset_config_options,
+ mixed_bin_versions=self.mixed_bin_versions, replicaset_logging_prefix=donor_rs_name,
+ all_nodes_electable=True, replset_name=donor_rs_name))
+
+ # Ensure that all nodes are only ever run on the same deterministic set of ports; this
+ # makes it easier to reroute connections in the jstest overrides.
+ self._port_index = 0
+ self._ports = [[node.port for node in self.get_donor_rs().nodes],
+ [
+ network.PortAllocator.next_fixture_port(self.job_num)
+ for _ in range(self.num_nodes_per_replica_set)
+ ]]
+
+ # TODO(SERVER-41031, SERVER-36417): Stop keeping retired donors alive once nodes which are
+ # removed from a replica set stop trying to send heartbeats to the replica set. We keep
+ # them alive for now to prevent a large amount of log lines from failed heartbeats.
+ # TODO(SERVER-65145): It may be possible to improve the inject_tenant_prefix override to
+ # prevent spurious errors caused by shutting down the replica set the shell originally
+ # connects to.
+ self._retired_donors = []
+
+ def pids(self):
+ """:return: pids owned by this fixture if any."""
+ out = []
+ with self.__lock:
+ for fixture in self.fixtures:
+ out.extend(fixture.pids())
+ for retired_donor in self._retired_donors:
+ out.extend(retired_donor.pids())
+ if not out:
+ self.logger.debug('No fixtures when gathering pids.')
+ return out
+
+ def setup(self):
+ """Set up the replica sets."""
+ # Don't take the lock because we don't expect setup to be called while the
+ # ContinuousShardSplit hook is running, which is the only thing that can modify
+ # self.fixtures. We don't want to take the lock because it would be held while starting
+ # mongod instances, which is prone to hanging and could cause other functions which take
+ # the lock to hang.
+ for fixture in self.fixtures:
+ fixture.setup()
+
+ def await_ready(self):
+ """Block until the fixture can be used for testing."""
+ # Don't take the lock because we don't expect await_ready to be called while the
+ # ContinuousShardSplit hook is running, which is the only thing that can modify
+ # self.fixtures. We don't want to take the lock because it would be held while waiting for
+ # the donor to initiate which may take a long time.
+ for fixture in self.fixtures:
+ fixture.await_ready()
+
+ def _do_teardown(self, mode=None):
+ """Shut down the replica sets."""
+ self.logger.info("Stopping all replica sets...")
+
+ running_at_start = self.is_running()
+ if not running_at_start:
+ self.logger.warning("Donor replica set expected to be running, but wasn't.")
+
+ teardown_handler = interface.FixtureTeardownHandler(self.logger)
+
+ # Don't take the lock because we don't expect teardown to be called while the
+ # ContinuousShardSplit hook is running, which is the only thing that can modify
+ # self.fixtures. Tearing down may take a long time, so taking the lock during that process
+ # might result in hangs in other functions which need to take the lock.
+ for retired_donor in self._retired_donors:
+ teardown_handler.teardown(retired_donor, f"replica set '{retired_donor.replset_name}'",
+ mode=mode)
+
+ for fixture in reversed(self.fixtures):
+ type_name = f"replica set '{fixture.replset_name}'" if isinstance(
+ fixture, replicaset.ReplicaSetFixture) else f"standalone on port {fixture.port}"
+ teardown_handler.teardown(fixture, type_name, mode=mode)
+
+ # Remove the recipient nodes outright now that they have been torn down
+ self.fixtures = [self.get_donor_rs()]
+
+ # Remove the retired donors, if we restart the active donor it will have no connections
+ # to retired donors and new tests will only connect to the active donor.
+ self._retired_donors = []
+
+ if teardown_handler.was_successful():
+ self.logger.info("Successfully stopped donor replica set and all standalone nodes.")
+ else:
+ self.logger.error("Stopping the fixture failed.")
+ raise self.fixturelib.ServerFailure(teardown_handler.get_error_message())
+
+ def is_running(self):
+ """Return true if all replica sets are still operating."""
+ # This method is most importantly used in between test runs in job.py to determine if a
+ # fixture has crashed between test invocations. We return the `is_running` status of the
+ # donor here, instead of all fixtures, some of which may not have been started yet.
+ return self.get_donor_rs().is_running()
+
+ def get_internal_connection_string(self):
+ """Return the internal connection string to the replica set that currently owns the data."""
+ donor_rs = self.get_donor_rs()
+ if not donor_rs:
+ raise ValueError("Must call setup() before calling get_internal_connection_string()")
+ return donor_rs.get_internal_connection_string()
+
+ def get_driver_connection_url(self):
+ """Return the driver connection URL to the replica set that currently owns the data."""
+ donor_rs = self.get_donor_rs()
+ if not donor_rs:
+ raise ValueError("Must call setup() before calling get_driver_connection_url()")
+ return donor_rs.get_driver_connection_url()
+
+ def get_node_info(self):
+ """Return a list of dicts of NodeInfo objects."""
+ output = []
+ with self.__lock:
+ for fixture in self.fixtures:
+ output += fixture.get_node_info()
+ for retired_donor in self._retired_donors:
+ output += retired_donor.get_node_info()
+ return output
+
+ def get_independent_clusters(self):
+ """Return the replica sets involved in the shard split."""
+ with self.__lock:
+ return self.fixtures.copy() + self._retired_donors.copy()
+
+ def get_donor_rs(self):
+ """Return the donor replica set."""
+ with self.__lock:
+ donor_rs = next(iter(self.fixtures), None)
+ if donor_rs and not isinstance(donor_rs, replicaset.ReplicaSetFixture):
+ raise ValueError("Invalid configuration, donor_rs is not a ReplicaSetFixture")
+ return donor_rs
+
+ def get_recipient_nodes(self):
+ """Return the recipient nodes for the current split operation."""
+ with self.__lock:
+ return self.fixtures[1:]
+
+ def add_recipient_nodes(self, recipient_set_name, recipient_tag_name=None):
+ """Build recipient nodes, and reconfig them into the donor as non-voting members."""
+ recipient_tag_name = recipient_tag_name or "recipientNode"
+
+ self.logger.info(
+ f"Adding {self.num_nodes_per_replica_set} recipient nodes to donor replica set.")
+
+ with self.__lock:
+ self._port_index ^= 1 # Toggle the set of mongod ports between index 0 and 1
+ for i in range(self.num_nodes_per_replica_set):
+ mongod_logger = self.fixturelib.new_fixture_node_logger(
+ "MongoDFixture", self.job_num, f"{recipient_set_name}:node{i}")
+
+ mongod_options = self.common_mongod_options.copy()
+ # Even though these nodes are not starting in a replica set, we structure their
+ # files on disk as if they were already part of the new recipient set. This makes
+ # logging and cleanup easier.
+ mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, recipient_set_name,
+ "node{}".format(i))
+ mongod_options["set_parameters"] = mongod_options.get(
+ "set_parameters", self.fixturelib.make_historic({})).copy()
+ mongod_options["serverless"] = True
+ mongod_port = self._ports[self._port_index][i]
+ self.fixtures.append(
+ self.fixturelib.make_fixture(
+ "MongoDFixture", mongod_logger, self.job_num, mongod_options=mongod_options,
+ dbpath_prefix=self.dbpath_prefix, preserve_dbpath=self.preserve_dbpath,
+ port=mongod_port))
+
+ recipient_nodes = self.get_recipient_nodes()
+ for recipient_node in recipient_nodes:
+ recipient_node.setup()
+ recipient_node.await_ready()
+
+ # Reconfig the donor to add the recipient nodes as non-voting members
+ donor_client = self.get_donor_rs().get_primary().mongo_client()
+ interface.authenticate(donor_client, self.auth_options)
+
+ repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"]
+ repl_members = repl_config["members"]
+ for recipient_node in recipient_nodes:
+ repl_members.append({
+ "host": recipient_node.get_internal_connection_string(), "votes": 0, "priority": 0,
+ "tags": {recipient_tag_name: str(ObjectId())}
+ })
+
+ # Re-index all members from 0
+ for idx, member in enumerate(repl_members):
+ member["_id"] = idx
+
+ # Prepare the new config
+ repl_config["version"] = repl_config["version"] + 1
+ repl_config["members"] = repl_members
+
+ self.logger.info(
+ f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}")
+ donor_client.admin.command(
+ {"replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000})
+
+ # Wait for recipient nodes to become secondaries
+ self._await_recipient_nodes()
+
+ def _await_recipient_nodes(self):
+ """Wait for recipient nodes to become available."""
+ recipient_nodes = self.get_recipient_nodes()
+ for recipient_node in recipient_nodes:
+ client = recipient_node.mongo_client(read_preference=pymongo.ReadPreference.SECONDARY)
+ while True:
+ self.logger.info(
+ f"Waiting for secondary on port {recipient_node.port} to become available.")
+ try:
+ is_secondary = client.admin.command("isMaster")["secondary"]
+ if is_secondary:
+ break
+ except pymongo.errors.OperationFailure as err:
+ if err.code != ShardSplitFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE:
+ raise
+ time.sleep(0.1) # Wait a little bit before trying again.
+ self.logger.info(f"Secondary on port {recipient_node.port} is now available.")
+
+ def replace_donor_with_recipient(self, recipient_set_name):
+ """Replace the current donor with the newly initiated recipient."""
+ self.logger.info("Replacing donor replica set with recipient replica set.")
+
+ retired_donor_rs = self.get_donor_rs()
+ self.logger.info(f"Retiring old donor replica set '{retired_donor_rs.replset_name}'.")
+ with self.__lock:
+ self._retired_donors.append(retired_donor_rs)
+
+ self.logger.info(
+ f"Making new donor replica set '{recipient_set_name}' from existing recipient nodes.")
+ mongod_options = self.common_mongod_options.copy()
+ mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, recipient_set_name)
+ mongod_options["serverless"] = True
+ new_donor_rs = self.fixturelib.make_fixture(
+ "ReplicaSetFixture", self.logger, self.job_num, mongod_options=mongod_options,
+ preserve_dbpath=self.preserve_dbpath, num_nodes=self.num_nodes_per_replica_set,
+ auth_options=self.auth_options, replset_config_options=self.replset_config_options,
+ mixed_bin_versions=self.mixed_bin_versions,
+ replicaset_logging_prefix=recipient_set_name, all_nodes_electable=True,
+ replset_name=recipient_set_name, existing_nodes=self.get_recipient_nodes())
+
+ new_donor_rs.get_primary() # Awaits an election of a primary
+
+ self.logger.info("Replacing internal fixtures with new donor replica set.")
+ with self.__lock:
+ self.fixtures = [new_donor_rs]
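The ContinuousShardSplit hook below calls remove_recipient_nodes() when a split aborts, but that helper is not defined in this file as added here. A hedged sketch of what it would presumably do, modeled on the teardown and fixture-trimming logic in _do_teardown above (an assumption, not the actual implementation; the real helper would presumably also reconfig the donor to drop the recipient members):

    def remove_recipient_nodes(self):
        """Tear down and discard the recipient nodes after an aborted split (sketch; assumed)."""
        for node in self.get_recipient_nodes():
            node.teardown()
        with self.__lock:
            # Keep only the donor, mirroring how _do_teardown trims self.fixtures.
            self.fixtures = self.fixtures[:1]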
diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py
new file mode 100644
index 00000000000..d9e8e307d56
--- /dev/null
+++ b/buildscripts/resmokelib/testing/hooks/shard_split.py
@@ -0,0 +1,486 @@
+"""Test hook that runs shard splits continuously."""
+
+import copy
+import random
+import threading
+import time
+import uuid
+
+import bson
+import pymongo.errors
+
+from buildscripts.resmokelib import errors
+from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
+from buildscripts.resmokelib.testing.fixtures import shard_split
+from buildscripts.resmokelib.testing.fixtures.replicaset import ReplicaSetFixture
+from buildscripts.resmokelib.testing.hooks import interface
+
+
+class ContinuousShardSplit(interface.Hook): # pylint: disable=too-many-instance-attributes
+ """Starts a shard split thread at the beginning of each test."""
+
+ DESCRIPTION = ("Continuous shard split operations")
+
+ IS_BACKGROUND = True
+ AWAIT_REPL_TIMEOUT_MINS = ReplicaSetFixture.AWAIT_REPL_TIMEOUT_MINS
+
+ def __init__(self, hook_logger, fixture, shell_options):
+ """Initialize the ContinuousShardSplit.
+
+ Args:
+ hook_logger: the logger instance for this hook.
+ fixture: the target ShardSplitFixture containing the donor replica set.
+ shell_options: contains the global_vars which contains TestData.tenantIds to be used for
+ shard splits.
+
+ """
+ interface.Hook.__init__(self, hook_logger, fixture, ContinuousShardSplit.DESCRIPTION)
+
+ if not isinstance(fixture, shard_split.ShardSplitFixture):
+ raise ValueError("The ContinuousShardSplit hook requires a ShardSplitFixture")
+ self._shard_split_fixture = fixture
+ self._shell_options = copy.deepcopy(shell_options)
+ self._shard_split_thread = None
+
+ def before_suite(self, test_report):
+ """Before suite."""
+ if not self._shard_split_fixture:
+ raise ValueError("No ShardSplitFixture to run shard splits on")
+ self.logger.info("Starting the shard split thread.")
+ self._shard_split_thread = _ShardSplitThread(self.logger, self._shard_split_fixture,
+ self._shell_options, test_report)
+ self._shard_split_thread.start()
+
+ def after_suite(self, test_report, teardown_flag=None):
+ """After suite."""
+ self.logger.info("Stopping the shard split thread.")
+ self._shard_split_thread.stop()
+ self.logger.info("Stopped the shard split thread.")
+
+ def before_test(self, test, test_report):
+ """Before test."""
+ self.logger.info("Resuming the shard split thread.")
+ self._shard_split_thread.resume(test)
+
+ def after_test(self, test, test_report):
+ """After test."""
+ self.logger.info("Pausing the shard split thread.")
+ self._shard_split_thread.pause()
+ self.logger.info("Paused the shard split thread.")
+
+
+class ShardSplitLifeCycle(object):
+ """Class for managing the various states of the shard split thread.
+
+ The job thread alternates between calling mark_test_started() and mark_test_finished(). The
+ shard split thread is allowed to perform splits at any point between these two calls.
+ Note that the job thread synchronizes with the shard split thread outside the context of
+ this object to know it isn't in the process of running a split.
+ """
+
+ _TEST_STARTED_STATE = "start"
+ _TEST_FINISHED_STATE = "finished"
+
+ def __init__(self):
+ """Initialize the ShardSplitLifeCycle instance."""
+ self.__lock = threading.Lock()
+ self.__cond = threading.Condition(self.__lock)
+
+ self.test_num = 0
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__should_stop = False
+
+ def mark_test_started(self):
+ """Signal to the shard split thread that a new test has started.
+
+ This function should be called during before_test(). Calling it causes the
+ wait_for_shard_split_permitted() function to no longer block and to instead return
+ true.
+ """
+ with self.__lock:
+ self.test_num += 1
+ self.__test_state = self._TEST_STARTED_STATE
+ self.__cond.notify_all()
+
+ def mark_test_finished(self):
+ """Signal to the shard split thread that the current test has finished.
+
+ This function should be called during after_test(). Calling it causes the
+ wait_for_shard_split_permitted() function to block until mark_test_started() is called
+ again.
+ """
+ with self.__lock:
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__cond.notify_all()
+
+ def stop(self):
+ """Signal to the shard split thread that it should exit.
+
+ This function should be called during after_suite(). Calling it causes the
+ wait_for_shard_split_permitted() function to no longer block and to instead return
+ false.
+ """
+ with self.__lock:
+ self.__should_stop = True
+ self.__cond.notify_all()
+
+ def wait_for_shard_split_permitted(self):
+ """Block until splits are permitted, or until stop() is called."""
+ with self.__lock:
+ while not self.__should_stop:
+ if self.__test_state == self._TEST_STARTED_STATE:
+ return True
+
+ self.__cond.wait()
+
+ return False
+
+ def wait_for_shard_split_interval(self, timeout):
+ """Block for 'timeout' seconds, or until stop() is called."""
+ with self.__lock:
+ self.__cond.wait(timeout)
+
+ def poll_for_idle_request(self): # noqa: D205,D400
+ """Return true if the shard split thread should continue running splits, or false
+ if it should temporarily stop running splits.
+ """
+ with self.__lock:
+ return self.__test_state == self._TEST_FINISHED_STATE
+
+
+class _ShardSplitOptions:
+ def __init__( # pylint: disable=too-many-arguments
+ self, logger, shard_split_fixture, tenant_ids, recipient_tag_name, recipient_set_name):
+ self.logger = logger
+ self.migration_id = uuid.uuid4()
+ self.shard_split_fixture = shard_split_fixture
+ self.tenant_ids = tenant_ids
+ self.recipient_tag_name = recipient_tag_name
+ self.recipient_set_name = recipient_set_name
+
+ def get_migration_id_as_binary(self):
+ """Return the migration id as BSON Binary."""
+ return bson.Binary(self.migration_id.bytes, 4)
+
+ def get_donor_rs(self):
+ """Return the current donor for the split fixture."""
+ return self.shard_split_fixture.get_donor_rs()
+
+ def get_donor_name(self):
+ """Return the replica set name for the donor."""
+ return self.get_donor_rs().replset_name
+
+ def get_donor_primary(self):
+ """Return a connection to the donor primary."""
+ return self.get_donor_rs().get_primary(timeout_secs=self.shard_split_fixture.AWAIT_REPL_TIMEOUT_MINS * 60)
+
+ def get_donor_nodes(self):
+ """Return the nodes for the current shard split fixture donor."""
+ return self.get_donor_rs().nodes
+
+ def get_recipient_nodes(self):
+ """Return the recipient nodes for the shard split fixture."""
+ return self.shard_split_fixture.get_recipient_nodes()
+
+ def __str__(self):
+ opts = {
+ "migration_id": self.migration_id, "tenant_ids": self.tenant_ids,
+ "donor": self.get_donor_name(), "recipientSetName": self.recipient_set_name,
+ "recipientTagName": self.recipient_tag_name
+ }
+ return str(opts)
+
+
+class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance-attributes
+ THREAD_NAME = "ShardSplitThread"
+
+ WAIT_SECS_RANGES = [[0.05, 0.1], [0.1, 0.5], [1, 5], [5, 15]]
+ POLL_INTERVAL_SECS = 0.1
+
+ NO_SUCH_MIGRATION_ERR_CODE = 327
+ INTERNAL_ERR_CODE = 1
+
+ def __init__(self, logger, shard_split_fixture, shell_options, test_report):
+ """Initialize _ShardSplitThread."""
+ threading.Thread.__init__(self, name=self.THREAD_NAME)
+ self.daemon = True
+ self.logger = logger
+ self._shard_split_fixture = shard_split_fixture
+ self._tenant_ids = shell_options["global_vars"]["TestData"]["tenantIds"]
+ self._auth_options = shell_options["global_vars"]["TestData"]["authOptions"]
+ self._test = None
+ self._test_report = test_report
+ self._shell_options = shell_options
+
+ self.__lifecycle = ShardSplitLifeCycle()
+ # Event set when the thread has been stopped using the 'stop()' method.
+ self._is_stopped_evt = threading.Event()
+ # Event set when the thread is not performing shard splits.
+ self._is_idle_evt = threading.Event()
+ self._is_idle_evt.set()
+
+ def run(self):
+ """Execute the thread."""
+ if not self._shard_split_fixture:
+ self.logger.warning("No ShardSplitFixture to run shard splits on.")
+ return
+
+ split_count = 0
+
+ try:
+ while True:
+ self._is_idle_evt.set()
+
+ permitted = self.__lifecycle.wait_for_shard_split_permitted()
+ if not permitted:
+ break
+
+ if split_count >= 1: # TODO(SERVER-65042): Remove this check and run multiple splits
+ time.sleep(self.POLL_INTERVAL_SECS)
+ continue
+
+ self._is_idle_evt.clear()
+
+ split_opts = self._create_split_opts(split_count)
+
+ # Set up the donor for a split
+ self._shard_split_fixture.add_recipient_nodes(split_opts.recipient_set_name)
+
+ # Briefly wait to let the test run before starting the split operation, so that
+ # the first split is more likely to have data to migrate.
+ wait_secs = random.uniform(
+ *self.WAIT_SECS_RANGES[split_count % len(self.WAIT_SECS_RANGES)])
+ self.logger.info(f"Waiting for {wait_secs} seconds before starting split.")
+ self.__lifecycle.wait_for_shard_split_interval(wait_secs)
+
+ self.logger.info(f"Starting shard split: {str(split_opts)}.")
+ start_time = time.time()
+ is_committed = self._run_shard_split(split_opts)
+ end_time = time.time()
+
+ split_count += 1
+ self.logger.info(
+ f"Completed shard split {str(split_opts)} in {(end_time - start_time) * 1000} ms."
+ )
+
+ # set up the fixture for the next split operation
+ if is_committed:
+ self._shard_split_fixture.replace_donor_with_recipient(
+ split_opts.recipient_set_name)
+ else:
+ self._shard_split_fixture.remove_recipient_nodes()
+
+ found_idle_request = self.__lifecycle.poll_for_idle_request()
+ if found_idle_request:
+ continue
+ except Exception: # pylint: disable=W0703
+ # Proactively log the exception when it happens so it will be flushed immediately.
+ self.logger.exception("Shard split thread threw exception")
+ # The event should be signaled whenever the thread is not performing shard splits.
+ self._is_idle_evt.set()
+
+ def stop(self):
+ """Stop the thread when the suite finishes."""
+ self.__lifecycle.stop()
+ self._is_stopped_evt.set()
+ # Unpause to allow the thread to finish.
+ self.resume(self._test)
+ self.join()
+
+ def pause(self):
+ """Pause the thread after test."""
+ self.__lifecycle.mark_test_finished()
+
+ # Wait until we are no longer executing splits.
+ self._is_idle_evt.wait()
+ # Check if the thread is alive in case it has thrown an exception while running.
+ self._check_thread()
+
+ # Check that the fixture is still running.
+ if not self._shard_split_fixture.is_running():
+ raise errors.ServerFailure(
+ f"ShardSplitFixture with pids {self._shard_split_fixture.pids()} expected to be running in"
+ " ContinuousShardSplit, but wasn't.")
+
+ def resume(self, test):
+ """Resume the thread before test."""
+ self._test = test
+ self.__lifecycle.mark_test_started()
+
+ def _wait(self, timeout):
+ """Wait until stop or timeout."""
+ self._is_stopped_evt.wait(timeout)
+
+ def short_name(self):
+ """Return the name of the thread."""
+ return self.THREAD_NAME
+
+ def _check_thread(self):
+ """Throw an error if the thread is not running."""
+ if not self.is_alive():
+ msg = "Shard split thread is not running."
+ self.logger.error(msg)
+ raise errors.ServerFailure(msg)
+
+ def _is_fail_point_abort_reason(self, abort_reason):
+ return abort_reason["code"] == self.INTERNAL_ERR_CODE and abort_reason[
+ "errmsg"] == "simulate a shard split error"
+
+ def _create_split_opts(self, split_count):
+ recipient_set_name = f"rs{split_count+1}"
+ recipient_tag_name = "recipientNode"
+ return _ShardSplitOptions(self.logger, self._shard_split_fixture, self._tenant_ids,
+ recipient_tag_name, recipient_set_name)
+
+ def _create_client(self, node):
+ return fixture_interface.authenticate(node.mongo_client(), self._auth_options)
+
+ def _run_shard_split(self, split_opts): # noqa: D205,D400
+ try:
+ donor_client = self._create_client(split_opts.get_donor_rs())
+ res = self._commit_shard_split(donor_client, split_opts)
+ is_committed = res["state"] == "committed"
+
+ self._forget_shard_split(donor_client, split_opts)
+ self._wait_for_garbage_collection(split_opts)
+
+ if not res["ok"]:
+ raise errors.ServerFailure(
+ f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' failed: {str(res)}")
+
+ if is_committed:
+ return True
+
+ abort_reason = res["abortReason"]
+ if self._is_fail_point_abort_reason(abort_reason):
+ self.logger.info(
+ f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' aborted due to failpoint: {str(res)}.")
+ return False
+ raise errors.ServerFailure(
+ f"Shard split '{str(split_opts.migration_id)}' with donor replica set "
+ f"'{split_opts.get_donor_name()}' aborted due to an error: {str(res)}")
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ f"Error running shard split '{split_opts.migration_id}' with donor primary on "
+ f"replica set '{split_opts.get_donor_name()}'.")
+ raise
+
+ def _commit_shard_split(self, donor_client, split_opts): # noqa: D205,D400
+ self.logger.info(f"Starting shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}'.")
+
+ while True:
+ try:
+ res = donor_client.admin.command({
+ "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(),
+ "tenantIds": split_opts.tenant_ids,
+ "recipientTagName": split_opts.recipient_tag_name, "recipientSetName":
+ split_opts.recipient_set_name
+ }, bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
+
+ if res["state"] == "committed":
+ self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' has committed.")
+ return res
+ if res["state"] == "aborted":
+ self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' has aborted: {str(res)}.")
+ return res
+ if not res["ok"]:
+ self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' has failed: {str(res)}.")
+ return res
+ except pymongo.errors.ConnectionFailure:
+ self.logger.info(
+ f"Retrying shard split '{split_opts.migration_id}' against replica set "
+ f"'{split_opts.get_donor_name()}'.")
+
+ def _forget_shard_split(self, donor_client, split_opts):
+ self.logger.info(f"Forgetting shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}'.")
+
+ while True:
+ try:
+ donor_client.admin.command(
+ {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()},
+ bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
+ return
+ except pymongo.errors.ConnectionFailure:
+ self.logger.info(
+ f"Retrying forget shard split '{split_opts.migration_id}' against replica "
+ f"set '{split_opts.get_donor_name()}'.")
+ continue
+ except pymongo.errors.OperationFailure as err:
+ if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
+ raise
+
+ # The fixture was restarted.
+ self.logger.info(
+ f"Could not find shard split '{split_opts.migration_id}' on donor primary on "
+ f"replica set '{split_opts.get_donor_name()}': {str(err)}.")
+ return
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ f"Error forgetting shard split '{split_opts.migration_id}' on donor primary on "
+ f"replica set '{split_opts.get_donor_name()}'.")
+ raise
+
+ def _wait_for_garbage_collection(self, split_opts): # noqa: D205,D400
+ try:
+ donor_nodes = split_opts.get_donor_nodes()
+ for donor_node in donor_nodes:
+ self.logger.info(
+ f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected on donor node on port {donor_node.port} of replica set '{split_opts.get_donor_name()}'."
+ )
+
+ donor_node_client = self._create_client(donor_node)
+ while True:
+ try:
+ res = donor_node_client.config.command({
+ "count": "tenantSplitDonors",
+ "query": {"tenantIds": split_opts.tenant_ids}
+ })
+ if res["n"] == 0:
+ break
+ except pymongo.errors.ConnectionFailure:
+ self.logger.info(
+ f"Retrying waiting for shard split '{split_opts.migration_id}' to be garbage collected on donor node on port {donor_node.port} of replica set '{split_opts.get_donor_name()}'."
+ )
+ continue
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ recipient_nodes = split_opts.get_recipient_nodes()
+ for recipient_node in recipient_nodes:
+ self.logger.info(
+ f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected on recipient node on port {recipient_node.port} of replica set '{split_opts.recipient_set_name}'."
+ )
+
+ recipient_node_client = self._create_client(recipient_node)
+ while True:
+ try:
+ hello = recipient_node_client.admin.command("hello")
+ self.logger.info(f"Recipient 'hello' response: {hello}")
+
+ res = recipient_node_client.config.command({
+ "count": "tenantSplitDonors",
+ "query": {"tenantIds": split_opts.tenant_ids}
+ })
+ if res["n"] == 0:
+ break
+
+ docs = recipient_node_client.config.tenantSplitDonors.find()
+ self.logger.info(f"Recipient tenantSplitDonors documents: {list(docs)}")
+ except pymongo.errors.ConnectionFailure:
+ self.logger.info(
+ f"Retrying waiting for shard split '{split_opts.migration_id}' to be garbage collected on recipient node on port {recipient_node.port} of replica set '{split_opts.recipient_set_name}'."
+ )
+ continue
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ f"Error waiting for shard split '{split_opts.migration_id}' from donor replica set '{split_opts.get_donor_name()}' to recipient replica set '{split_opts.recipient_set_name}' to be garbage collected."
+ )
+ raise
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 249c3a2e4ce..1ba31d24ad7 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -1621,6 +1621,7 @@ buildvariants:
  - name: .serverless
  - name: .updatefuzzer
  - name: secondary_reads_passthrough_gen
+  - name: .shard_split
  # Disabling as the following tests are not aware of feature flags.
  # - name: server_discovery_and_monitoring_json_test_TG
  #   distros:
@@ -2371,6 +2372,7 @@ buildvariants:
  - name: .serverless
  - name: .watchdog
  - name: .stitch
+  - name: .shard_split
  # Disabling the following tests as they are not aware of feature flags.
  # - name: .benchmarks
  # - name: unittest_shell_hang_analyzer_gen
@@ -2520,6 +2522,7 @@ buildvariants:
  - name: .serverless
  - name: .updatefuzzer
  - name: watchdog_wiredtiger
+  - name: .shard_split
  # Disabling the following tests as they are not aware of feature flags.
  # - name: .benchmarks
  # - name: server_discovery_and_monitoring_json_test_TG
diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml
index 40d62593132..ab1f86e129a 100644
--- a/etc/evergreen_yml_components/definitions.yml
+++ b/etc/evergreen_yml_components/definitions.yml
@@ -5078,6 +5078,15 @@ tasks:
    vars:
      use_large_distro: "true"
+# TODO(SERVER-65178): Add "serverless" tag to shard split passthrough when no longer feature flagged
+- <<: *gen_task_template
+  name: shard_split_jscore_passthrough_gen
+  tags: ["shard_split"]
+  commands:
+  - func: "generate resmoke tasks"
+    vars:
+      use_large_distro: "true"
+
- <<: *gen_task_template
  name: tenant_migration_jscore_passthrough_gen
  tags: ["serverless"]