author     Matt Broadstone <mbroadst@mongodb.com>            2022-04-15 21:58:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-15 22:33:51 +0000
commit     d9f29d10916539fc25cfa03731850332866ce42c
tree       7d152f53c7793b1647653c737038f668a881bd32
parent     5f3e71f07128b9007032483210ae3b9309a2b4f6
SERVER-63093 Introduce a vanilla passthrough for shard split
 -rw-r--r--  buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml | 104
 -rw-r--r--  buildscripts/resmokelib/testing/fixtures/_builder.py                 |  13
 -rw-r--r--  buildscripts/resmokelib/testing/fixtures/shard_split.py              | 317
 -rw-r--r--  buildscripts/resmokelib/testing/hooks/shard_split.py                 | 486
 -rw-r--r--  etc/evergreen.yml                                                     |   3
 -rw-r--r--  etc/evergreen_yml_components/definitions.yml                          |   9
 6 files changed, 930 insertions(+), 2 deletions(-)
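
For local verification, the new suite should be runnable through resmoke like the other jscore passthroughs. The sketch below is illustrative only: it assumes the standard buildscripts/resmoke.py entry point and its `run --suites=<name>` interface (nothing in it is introduced by this patch), with the suite name taken from the YAML added below.

# Minimal sketch: run the new passthrough from the repository root.
# Assumes the usual resmoke entry point and its `run --suites=<name>` interface.
import subprocess

subprocess.run(
    [
        "python",
        "buildscripts/resmoke.py",
        "run",
        "--suites=shard_split_jscore_passthrough",
    ],
    check=True,  # raise if resmoke exits non-zero
)

If a full suite pass is too slow, passing an individual jstests/core file as a positional argument is the usual way to narrow the run.
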
diff --git a/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml
new file mode 100644
index 00000000000..8b360ca7e40
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/shard_split_jscore_passthrough.yml
@@ -0,0 +1,104 @@
+config_variables:
+- &keyFile jstests/libs/authTestsKey
+- &keyFileData Thiskeyisonlyforrunningthesuitewithauthenticationdontuseitinanytestsdirectly
+- &authOptions
+  authenticationDatabase: admin
+  authenticationMechanism: SCRAM-SHA-256
+  password: *keyFileData
+  username: __system
+
+test_kind: js_test
+
+selector:
+  roots:
+  - jstests/core/**/*.js
+  exclude_files:
+  - jstests/core/txns/**/*.js
+  # Skip any tests that run with auth explicitly.
+  - jstests/core/*[aA]uth*.js
+
+  exclude_with_any_tags:
+  - assumes_standalone_mongod
+  # These tests run getMore commands which are not supported in the tenant migration passthrough.
+  - requires_getmore
+  # Due to background tenant migrations, operations in the main test shell are not guaranteed to
+  # be causally consistent with operations in a parallel shell. The reason is that a
+  # TenantMigrationCommitted error is only thrown when the client does a write or an atClusterTime/
+  # afterClusterTime or linearizable read. Therefore, one of the shells may not be aware that the
+  # migration has occurred and would not forward the read/write command to the right replica set.
+  - uses_parallel_shell
+  # Profile settings are stored in-memory only so are not transferred to the recipient.
+  - requires_profiling
+  # Capped collections are banned in Serverless.
+  - requires_capped
+  # The emptycapped command is blocked during tenant migration.
+  - requires_emptycapped
+  # Multi-updates that conflict with tenant migration are not retried by inject_tenant_prefix.js.
+  - requires_multi_updates
+  # Full validation can cause ongoing queries to fail. This can affect the tenant migration
+  # process.
+  - uses_full_validation
+  - tenant_migration_incompatible
+
+executor:
+  archive:
+    tests: true
+    hooks:
+      - CheckReplDBHash
+      - CheckReplOplogs
+      - ValidateCollections
+
+  config:
+    shell_options:
+      eval: >-
+        testingReplication = true;
+        jsTest.authenticate(db.getMongo());
+      global_vars:
+        TestData: &TestData
+          tenantIds: ["tenantA", "tenantB"]
+          auth: true
+          authMechanism: SCRAM-SHA-256
+          keyFile: *keyFile
+          keyFileData: *keyFileData
+          roleGraphInvalidationIsFatal: true
+
+  hooks:
+  # Hooks are set up in the order listed here, and torn down in the same order (not as a stack).
+  # The ContinuousShardSplit hook must come before CleanEveryN so that the ShardSplitFixture's list
+  # of nodes can't be modified while CleanEveryN is running.
+  - class: ContinuousShardSplit
+    shell_options:
+      global_vars:
+        TestData:
+          <<: *TestData
+          authOptions: *authOptions
+  # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+  # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+  # validating the entire contents of the collection.
+  - class: CheckReplOplogs
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: CheckReplDBHash
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: ValidateCollections
+    shell_options:
+      global_vars:
+        TestData: *TestData
+  - class: CleanEveryN
+    n: 20
+
+  fixture:
+    class: ShardSplitFixture
+    common_mongod_options:
+      set_parameters:
+        enableTestCommands: 1
+        # To avoid routing commands in each test incorrectly, the ContinuousTenantMigration hook
+        # only runs donorForgetMigration against the donor of each migration when it is safe to do
+        # so. Therefore, the garbage collection delay doesn't need to be large.
+        shardSplitGarbageCollectionDelayMS: 1
+        ttlMonitorSleepSecs: 1
+      auth: ''
+      keyFile: *keyFile
+    num_nodes_per_replica_set: 3
+    auth_options: *authOptions
diff --git a/buildscripts/resmokelib/testing/fixtures/_builder.py b/buildscripts/resmokelib/testing/fixtures/_builder.py
index e9197d9775b..53ffc538ef4 100644
--- a/buildscripts/resmokelib/testing/fixtures/_builder.py
+++ b/buildscripts/resmokelib/testing/fixtures/_builder.py
@@ -54,7 +54,7 @@ class FixtureBuilder(ABC, metaclass=registry.make_registry_metaclass(_BUILDERS,
     REGISTERED_NAME = "Builder"
 
     @abstractmethod
-    def build_fixture(self, logger, job_num, fixturelib, *args, **kwargs):
+    def build_fixture(self, logger, job_num, fixturelib, *args, existing_nodes=None, **kwargs):
         """Abstract method to build a fixture."""
         return
 
@@ -73,7 +73,7 @@ class ReplSetBuilder(FixtureBuilder):
     latest_class = "MongoDFixture"
     multiversion_class_suffix = "_multiversion_class_suffix"
 
-    def build_fixture(self, logger, job_num, fixturelib, *args, **kwargs):  # pylint: disable=too-many-locals
+    def build_fixture(self, logger, job_num, fixturelib, *args, existing_nodes=None, **kwargs):  # pylint: disable=too-many-locals
         """Build a replica set."""
         # We hijack the mixed_bin_versions passed to the fixture.
         mixed_bin_versions = kwargs.pop("mixed_bin_versions", config.MIXED_BIN_VERSIONS)
@@ -152,6 +152,15 @@ class ReplSetBuilder(FixtureBuilder):
         replset = _FIXTURES[self.REGISTERED_NAME](logger, job_num, fixturelib, *args, **kwargs)
         replset.set_fcv(fcv)
+
+        # Don't build new nodes if existing nodes are provided.
+ if existing_nodes: + # Rename the logger to improve readability when printing node info maps + for idx, node in enumerate(existing_nodes): + node.logger = replset.get_logger_for_mongod(idx) + replset.install_mongod(node) + return replset + for node_index in range(replset.num_nodes): node = self._new_mongod(replset, node_index, executables, classes, mongod_binary_versions[node_index], is_multiversion) diff --git a/buildscripts/resmokelib/testing/fixtures/shard_split.py b/buildscripts/resmokelib/testing/fixtures/shard_split.py new file mode 100644 index 00000000000..3bb48d71162 --- /dev/null +++ b/buildscripts/resmokelib/testing/fixtures/shard_split.py @@ -0,0 +1,317 @@ +"""Fixture for testing shard split operations.""" + +import time +import os.path +import threading + +import pymongo +from bson.objectid import ObjectId + +import buildscripts.resmokelib.testing.fixtures.interface as interface +from buildscripts.resmokelib.testing.fixtures import replicaset +from buildscripts.resmokelib.core import network + + +class ShardSplitFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes + """Fixture which provides JSTests with a replica set and recipient nodes to run splits against.""" + + AWAIT_REPL_TIMEOUT_MINS = 5 + AWAIT_REPL_TIMEOUT_FOREVER_MINS = 24 * 60 + + def __init__( # pylint: disable=too-many-arguments,too-many-locals + self, + logger, + job_num, + fixturelib, + common_mongod_options=None, + per_mongod_options=None, + dbpath_prefix=None, + preserve_dbpath=False, + num_nodes_per_replica_set=2, + auth_options=None, + replset_config_options=None, + mixed_bin_versions=None, + ): + """Initialize ShardSplitFixture with different options for the replica set processes.""" + interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib, + dbpath_prefix=dbpath_prefix) + self.__lock = threading.Lock() + + self.common_mongod_options = self.fixturelib.default_if_none(common_mongod_options, {}) + self.per_mongod_options = self.fixturelib.default_if_none(per_mongod_options, {}) + self.dbpath_prefix = dbpath_prefix + self.preserve_dbpath = preserve_dbpath + self.auth_options = auth_options + self.replset_config_options = self.fixturelib.default_if_none(replset_config_options, {}) + self.mixed_bin_versions = self.fixturelib.default_if_none(mixed_bin_versions, + self.config.MIXED_BIN_VERSIONS) + self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \ + else self.config.NUM_REPLSET_NODES + + self.fixtures = [] + + # Make the initial donor replica set + donor_rs_name = "rs0" + mongod_options = self.common_mongod_options.copy() + mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, donor_rs_name) + mongod_options["serverless"] = True + + # The default `electionTimeoutMillis` on evergreen is 24hr to prevent spurious + # elections. We _want_ elections to occur after split, so reduce the value here. + # TODO(SERVER-64935): No longer required once we send replSetStepUp to recipient nodes + # when splitting them. 
+ if "settings" in self.replset_config_options: + self.replset_config_options["settings"] = self.fixturelib.default_if_none( + self.replset_config_options["settings"], {}) + self.replset_config_options["settings"]["electionTimeoutMillis"] = 5000 + else: + self.replset_config_options["settings"] = {"electionTimeoutMillis": 5000} + + self.fixtures.append( + self.fixturelib.make_fixture( + "ReplicaSetFixture", self.logger, self.job_num, mongod_options=mongod_options, + preserve_dbpath=self.preserve_dbpath, num_nodes=self.num_nodes_per_replica_set, + auth_options=self.auth_options, replset_config_options=self.replset_config_options, + mixed_bin_versions=self.mixed_bin_versions, replicaset_logging_prefix=donor_rs_name, + all_nodes_electable=True, replset_name=donor_rs_name)) + + # Ensure that all nodes are only ever run on the same deterministic set of ports, this + # makes it easier to reroute in the jstest overrides + self._port_index = 0 + self._ports = [[node.port for node in self.get_donor_rs().nodes], + [ + network.PortAllocator.next_fixture_port(self.job_num) + for _ in range(self.num_nodes_per_replica_set) + ]] + + # TODO(SERVER-41031, SERVER-36417): Stop keeping retired donors alive once nodes which are + # removed from a replica set stop trying to send heartbeats to the replica set. We keep + # them alive for now to prevent a large amount of log lines from failed heartbeats. + # TODO(SERVER-65145): It may be possible to improve the inject_tenant_prefix override to + # prevent spurious errors caused by shutting down the replica set the shell originally + # connects to. + self._retired_donors = [] + + def pids(self): + """:return: pids owned by this fixture if any.""" + out = [] + with self.__lock: + for fixture in self.fixtures: + out.extend(fixture.pids()) + for retired_donor in self._retired_donors: + out.extend(retired_donor.pids()) + if not out: + self.logger.debug('No fixtures when gathering pids.') + return out + + def setup(self): + """Set up the replica sets.""" + # Don't take the lock because we don't expect setup to be called while the + # ContinuousShardSplit hook is running, which is the only thing that can modify + # self.fixtures. We don't want to take the lock because it would be held while starting + # mongod instances, which is prone to hanging and could cause other functions which take + # the lock to hang. + for fixture in self.fixtures: + fixture.setup() + + def await_ready(self): + """Block until the fixture can be used for testing.""" + # Don't take the lock because we don't expect await_ready to be called while the + # ContinuousShardSplit hook is running, which is the only thing that can modify + # self.fixtures. We don't want to take the lock because it would be held while waiting for + # the donor to initiate which may take a long time. + for fixture in self.fixtures: + fixture.await_ready() + + def _do_teardown(self, mode=None): + """Shut down the replica sets.""" + self.logger.info("Stopping all replica sets...") + + running_at_start = self.is_running() + if not running_at_start: + self.logger.warning("Donor replica set expected to be running, but wasn't.") + + teardown_handler = interface.FixtureTeardownHandler(self.logger) + + # Don't take the lock because we don't expect teardown to be called while the + # ContinuousShardSplit hook is running, which is the only thing that can modify + # self.fixtures. Tearing down may take a long time, so taking the lock during that process + # might result in hangs in other functions which need to take the lock. 
+ for retired_donor in self._retired_donors: + teardown_handler.teardown(retired_donor, f"replica set '{retired_donor.replset_name}'", + mode=mode) + + for fixture in reversed(self.fixtures): + type_name = f"replica set '{fixture.replset_name}'" if isinstance( + fixture, replicaset.ReplicaSetFixture) else f"standalone on port {fixture.port}" + teardown_handler.teardown(fixture, type_name, mode=mode) + + # Remove the recipient nodes outright now that they have been torn down + self.fixtures = [self.get_donor_rs()] + + # Remove the retired donors, if we restart the active donor it will have no connections + # to retired donors and new tests will only connect to the active donor. + self._retired_donors = [] + + if teardown_handler.was_successful(): + self.logger.info("Successfully stopped donor replica set and all standalone nodes.") + else: + self.logger.error("Stopping the fixture failed.") + raise self.fixturelib.ServerFailure(teardown_handler.get_error_message()) + + def is_running(self): + """Return true if all replica sets are still operating.""" + # This method is most importantly used in between test runs in job.py to determine if a + # fixture has crashed between test invocations. We return the `is_running` status of the + # donor here, instead of all fixtures, some of which may not have been started yet. + return self.get_donor_rs().is_running() + + def get_internal_connection_string(self): + """Return the internal connection string to the replica set that currently starts out owning the data.""" + donor_rs = self.get_donor_rs() + if not donor_rs: + raise ValueError("Must call setup() before calling get_internal_connection_string()") + return donor_rs.get_internal_connection_string() + + def get_driver_connection_url(self): + """Return the driver connection URL to the replica set that currently starts out owning the data.""" + donor_rs = self.get_donor_rs() + if not donor_rs: + raise ValueError("Must call setup() before calling get_driver_connection_url") + return donor_rs.get_driver_connection_url() + + def get_node_info(self): + """Return a list of dicts of NodeInfo objects.""" + output = [] + with self.__lock: + for fixture in self.fixtures: + output += fixture.get_node_info() + for retired_donor in self._retired_donors: + output += retired_donor.get_node_info() + return output + + def get_independent_clusters(self): + """Return the replica sets involved in the tenant migration.""" + with self.__lock: + return self.fixtures.copy() + self._retired_donors.copy() + + def get_donor_rs(self): + """:return the donor replica set.""" + with self.__lock: + donor_rs = next(iter(self.fixtures), None) + if donor_rs and not isinstance(donor_rs, replicaset.ReplicaSetFixture): + raise ValueError("Invalid configuration, donor_rs is not a ReplicaSetFixture") + return donor_rs + + def get_recipient_nodes(self): + """:return the recipient nodes for the current split operation.""" + with self.__lock: + return self.fixtures[1:] + + def add_recipient_nodes(self, recipient_set_name, recipient_tag_name=None): + """Build recipient nodes, and reconfig them into the donor as non-voting members.""" + recipient_tag_name = recipient_tag_name or "recipientNode" + + self.logger.info( + f"Adding {self.num_nodes_per_replica_set} recipient nodes to donor replica set.") + + with self.__lock: + self._port_index ^= 1 # Toggle the set of mongod ports between index 0 and 1 + for i in range(self.num_nodes_per_replica_set): + mongod_logger = self.fixturelib.new_fixture_node_logger( + "MongoDFixture", self.job_num, 
f"{recipient_set_name}:node{i}") + + mongod_options = self.common_mongod_options.copy() + # Even though these nodes are not starting in a replica set, we structure their + # files on disk as if they were already part of the new recipient set. This makes + # logging and cleanup easier. + mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, recipient_set_name, + "node{}".format(i)) + mongod_options["set_parameters"] = mongod_options.get( + "set_parameters", self.fixturelib.make_historic({})).copy() + mongod_options["serverless"] = True + mongod_port = self._ports[self._port_index][i] + self.fixtures.append( + self.fixturelib.make_fixture( + "MongoDFixture", mongod_logger, self.job_num, mongod_options=mongod_options, + dbpath_prefix=self.dbpath_prefix, preserve_dbpath=self.preserve_dbpath, + port=mongod_port)) + + recipient_nodes = self.get_recipient_nodes() + for recipient_node in recipient_nodes: + recipient_node.setup() + recipient_node.await_ready() + + # Reconfig the donor to add the recipient nodes as non-voting members + donor_client = self.get_donor_rs().get_primary().mongo_client() + interface.authenticate(donor_client, self.auth_options) + + repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"] + repl_members = repl_config["members"] + for recipient_node in recipient_nodes: + repl_members.append({ + "host": recipient_node.get_internal_connection_string(), "votes": 0, "priority": 0, + "tags": {recipient_tag_name: str(ObjectId())} + }) + + # Re-index all members from 0 + for idx, member in enumerate(repl_members): + member["_id"] = idx + + # Prepare the new config + repl_config["version"] = repl_config["version"] + 1 + repl_config["members"] = repl_members + + self.logger.info( + f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}") + donor_client.admin.command( + {"replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000}) + + # Wait for recipient nodes to become secondaries + self._await_recipient_nodes() + + def _await_recipient_nodes(self): + """Wait for recipient nodes to become available.""" + recipient_nodes = self.get_recipient_nodes() + for recipient_node in recipient_nodes: + client = recipient_node.mongo_client(read_preference=pymongo.ReadPreference.SECONDARY) + while True: + self.logger.info( + f"Waiting for secondary on port {recipient_node.port} to become available.") + try: + is_secondary = client.admin.command("isMaster")["secondary"] + if is_secondary: + break + except pymongo.errors.OperationFailure as err: + if err.code != ShardSplitFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE: + raise + time.sleep(0.1) # Wait a little bit before trying again. 
+ self.logger.info(f"Secondary on port {recipient_node.port} is now available.") + + def replace_donor_with_recipient(self, recipient_set_name): + """Replace the current donor with the newly initiated recipient.""" + self.logger.info("Replacing donor replica set with recipient replica set.") + + retired_donor_rs = self.get_donor_rs() + self.logger.info(f"Retiring old donor replica set '{retired_donor_rs.replset_name}'.") + with self.__lock: + self._retired_donors.append(retired_donor_rs) + + self.logger.info( + f"Making new donor replica set '{recipient_set_name}' from existing recipient nodes.") + mongod_options = self.common_mongod_options.copy() + mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, recipient_set_name) + mongod_options["serverless"] = True + new_donor_rs = self.fixturelib.make_fixture( + "ReplicaSetFixture", self.logger, self.job_num, mongod_options=mongod_options, + preserve_dbpath=self.preserve_dbpath, num_nodes=self.num_nodes_per_replica_set, + auth_options=self.auth_options, replset_config_options=self.replset_config_options, + mixed_bin_versions=self.mixed_bin_versions, + replicaset_logging_prefix=recipient_set_name, all_nodes_electable=True, + replset_name=recipient_set_name, existing_nodes=self.get_recipient_nodes()) + + new_donor_rs.get_primary() # Awaits an election of a primary + + self.logger.info("Replacing internal fixtures with new donor replica set.") + with self.__lock: + self.fixtures = [new_donor_rs] diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py new file mode 100644 index 00000000000..d9e8e307d56 --- /dev/null +++ b/buildscripts/resmokelib/testing/hooks/shard_split.py @@ -0,0 +1,486 @@ +"""Test hook that runs shard splits continuously.""" + +import copy +import random +import threading +import time +import uuid + +import bson +import pymongo.errors + +from buildscripts.resmokelib import errors +from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface +from buildscripts.resmokelib.testing.fixtures import shard_split +from buildscripts.resmokelib.testing.fixtures.replicaset import ReplicaSetFixture +from buildscripts.resmokelib.testing.hooks import interface + + +class ContinuousShardSplit(interface.Hook): # pylint: disable=too-many-instance-attributes + """Starts a shard split thread at the beginning of each test.""" + + DESCRIPTION = ("Continuous shard split operations") + + IS_BACKGROUND = True + AWAIT_REPL_TIMEOUT_MINS = ReplicaSetFixture.AWAIT_REPL_TIMEOUT_MINS + + def __init__(self, hook_logger, fixture, shell_options): + """Initialize the ContinuousShardSplit. + + Args: + hook_logger: the logger instance for this hook. + fixture: the target ShardSplitFixture containing the donor replica set. + shell_options: contains the global_vars which contains TestData.tenantIds to be used for + shard splits. 
+ + """ + interface.Hook.__init__(self, hook_logger, fixture, ContinuousShardSplit.DESCRIPTION) + + if not isinstance(fixture, shard_split.ShardSplitFixture): + raise ValueError("The ContinuousShardSplit hook requires a ShardSplitFixture") + self._shard_split_fixture = fixture + self._shell_options = copy.deepcopy(shell_options) + self._shard_split_thread = None + + def before_suite(self, test_report): + """Before suite.""" + if not self._shard_split_fixture: + raise ValueError("No ShardSplitFixture to run shard splits on") + self.logger.info("Starting the shard split thread.") + self._shard_split_thread = _ShardSplitThread(self.logger, self._shard_split_fixture, + self._shell_options, test_report) + self._shard_split_thread.start() + + def after_suite(self, test_report, teardown_flag=None): + """After suite.""" + self.logger.info("Stopping the shard split thread.") + self._shard_split_thread.stop() + self.logger.info("Stopped the shard split thread.") + + def before_test(self, test, test_report): + """Before test.""" + self.logger.info("Resuming the shard split thread.") + self._shard_split_thread.resume(test) + + def after_test(self, test, test_report): + """After test.""" + self.logger.info("Pausing the shard split thread.") + self._shard_split_thread.pause() + self.logger.info("Paused the shard split thread.") + + +class ShardSplitLifeCycle(object): + """Class for managing the various states of the shard split thread. + + The job thread alternates between calling mark_test_started() and mark_test_finished(). The + shard split thread is allowed to perform splits at any point between these two calls. + Note that the job thread synchronizes with the shard split thread outside the context of + this object to know it isn't in the process of running a split. + """ + + _TEST_STARTED_STATE = "start" + _TEST_FINISHED_STATE = "finished" + + def __init__(self): + """Initialize the ShardSplitLifeCycle instance.""" + self.__lock = threading.Lock() + self.__cond = threading.Condition(self.__lock) + + self.test_num = 0 + self.__test_state = self._TEST_FINISHED_STATE + self.__should_stop = False + + def mark_test_started(self): + """Signal to the shard split thread that a new test has started. + + This function should be called during before_test(). Calling it causes the + wait_for_shard_split_permitted() function to no longer block and to instead return + true. + """ + with self.__lock: + self.test_num += 1 + self.__test_state = self._TEST_STARTED_STATE + self.__cond.notify_all() + + def mark_test_finished(self): + """Signal to the shard split thread that the current test has finished. + + This function should be called during after_test(). Calling it causes the + wait_for_shard_split_permitted() function to block until mark_test_started() is called + again. + """ + with self.__lock: + self.__test_state = self._TEST_FINISHED_STATE + self.__cond.notify_all() + + def stop(self): + """Signal to the shard split thread that it should exit. + + This function should be called during after_suite(). Calling it causes the + wait_for_shard_split_permitted() function to no longer block and to instead return + false. 
+ """ + with self.__lock: + self.__should_stop = True + self.__cond.notify_all() + + def wait_for_shard_split_permitted(self): + """Block until splits are permitted, or until stop() is called.""" + with self.__lock: + while not self.__should_stop: + if self.__test_state == self._TEST_STARTED_STATE: + return True + + self.__cond.wait() + + return False + + def wait_for_shard_split_interval(self, timeout): + """Block for 'timeout' seconds, or until stop() is called.""" + with self.__lock: + self.__cond.wait(timeout) + + def poll_for_idle_request(self): # noqa: D205,D400 + """Return true if the shard split thread should continue running splits, or false + if it should temporarily stop running splits. + """ + with self.__lock: + return self.__test_state == self._TEST_FINISHED_STATE + + +class _ShardSplitOptions: + def __init__( # pylint: disable=too-many-arguments + self, logger, shard_split_fixture, tenant_ids, recipient_tag_name, recipient_set_name): + self.logger = logger + self.migration_id = uuid.uuid4() + self.shard_split_fixture = shard_split_fixture + self.tenant_ids = tenant_ids + self.recipient_tag_name = recipient_tag_name + self.recipient_set_name = recipient_set_name + + def get_migration_id_as_binary(self): + """Return the migration id as BSON Binary.""" + return bson.Binary(self.migration_id.bytes, 4) + + def get_donor_rs(self): + """Return the current donor for the split fixture.""" + return self.shard_split_fixture.get_donor_rs() + + def get_donor_name(self): + """Return the replica set name for the donor.""" + return self.get_donor_rs().replset_name + + def get_donor_primary(self): + """Return a connection to the donor primary.""" + return self.get_donor_rs().get_primary(timeout_secs=self.AWAIT_REPL_TIMEOUT_MINS) + + def get_donor_nodes(self): + """Return the nodes for the current shard split fixture donor.""" + return self.get_donor_rs().nodes + + def get_recipient_nodes(self): + """Return the recipient nodes for the shard split fixture.""" + return self.shard_split_fixture.get_recipient_nodes() + + def __str__(self): + opts = { + "migration_id": self.migration_id, "tenant_ids": self.tenant_ids, + "donor": self.get_donor_name(), "recipientSetName": self.recipient_set_name, + "recipientTagName": self.recipient_tag_name + } + return str(opts) + + +class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance-attributes + THREAD_NAME = "ShardSplitThread" + + WAIT_SECS_RANGES = [[0.05, 0.1], [0.1, 0.5], [1, 5], [5, 15]] + POLL_INTERVAL_SECS = 0.1 + + NO_SUCH_MIGRATION_ERR_CODE = 327 + INTERNAL_ERR_CODE = 1 + + def __init__(self, logger, shard_split_fixture, shell_options, test_report): + """Initialize _ShardSplitThread.""" + threading.Thread.__init__(self, name=self.THREAD_NAME) + self.daemon = True + self.logger = logger + self._shard_split_fixture = shard_split_fixture + self._tenant_ids = shell_options["global_vars"]["TestData"]["tenantIds"] + self._auth_options = shell_options["global_vars"]["TestData"]["authOptions"] + self._test = None + self._test_report = test_report + self._shell_options = shell_options + + self.__lifecycle = ShardSplitLifeCycle() + # Event set when the thread has been stopped using the 'stop()' method. + self._is_stopped_evt = threading.Event() + # Event set when the thread is not performing shard splits. 
+ self._is_idle_evt = threading.Event() + self._is_idle_evt.set() + + def run(self): + """Execute the thread.""" + if not self._shard_split_fixture: + self.logger.warning("No ShardSplitFixture to run shard splits on.") + return + + split_count = 0 + + try: + while True: + self._is_idle_evt.set() + + permitted = self.__lifecycle.wait_for_shard_split_permitted() + if not permitted: + break + + if split_count >= 1: # TODO(SERVER-65042): Remove this check and run multiple splits + time.sleep(self.POLL_INTERVAL_SECS) + continue + + self._is_idle_evt.clear() + + split_opts = self._create_split_opts(split_count) + + # Set up the donor for a split + self._shard_split_fixture.add_recipient_nodes(split_opts.recipient_set_name) + + # Briefly wait to let the test run before starting the split operation, so that + # the first split is more likely to have data to migrate. + wait_secs = random.uniform( + *self.WAIT_SECS_RANGES[split_count % len(self.WAIT_SECS_RANGES)]) + self.logger.info(f"Waiting for {wait_secs} seconds before starting split.") + self.__lifecycle.wait_for_shard_split_interval(wait_secs) + + self.logger.info(f"Starting shard split: {str(split_opts)}.") + start_time = time.time() + is_committed = self._run_shard_split(split_opts) + end_time = time.time() + + split_count += 1 + self.logger.info( + f"Completed shard split {str(split_opts)} in {(end_time - start_time) * 1000} ms." + ) + + # set up the fixture for the next split operation + if is_committed: + self._shard_split_fixture.replace_donor_with_recipient( + split_opts.recipient_set_name) + else: + self._shard_split_fixture.remove_recipient_nodes() + + found_idle_request = self.__lifecycle.poll_for_idle_request() + if found_idle_request: + continue + except Exception: # pylint: disable=W0703 + # Proactively log the exception when it happens so it will be flushed immediately. + self.logger.exception("Shard split thread threw exception") + # The event should be signaled whenever the thread is not performing shard splits. + self._is_idle_evt.set() + + def stop(self): + """Stop the thread when the suite finishes.""" + self.__lifecycle.stop() + self._is_stopped_evt.set() + # Unpause to allow the thread to finish. + self.resume(self._test) + self.join() + + def pause(self): + """Pause the thread after test.""" + self.__lifecycle.mark_test_finished() + + # Wait until we are no longer executing splits. + self._is_idle_evt.wait() + # Check if the thread is alive in case it has thrown an exception while running. + self._check_thread() + + # Check that the fixture is still running. + if not self._shard_split_fixture.is_running(): + raise errors.ServerFailure( + f"ShardSplitFixture with pids {self._shard_split_fixture.pids()} expected to be running in" + " ContinuousShardSplit, but wasn't.") + + def resume(self, test): + """Resume the thread before test.""" + self._test = test + self.__lifecycle.mark_test_started() + + def _wait(self, timeout): + """Wait until stop or timeout.""" + self._is_stopped_evt.wait(timeout) + + def short_name(self): + """Return the name of the thread.""" + return self.THREAD_NAME + + def _check_thread(self): + """Throw an error if the thread is not running.""" + if not self.is_alive(): + msg = "Shard split thread is not running." 
+ self.logger.error(msg) + raise errors.ServerFailure(msg) + + def _is_fail_point_abort_reason(self, abort_reason): + return abort_reason["code"] == self.INTERNAL_ERR_CODE and abort_reason[ + "errmsg"] == "simulate a shard split error" + + def _create_split_opts(self, split_count): + recipient_set_name = f"rs{split_count+1}" + recipient_tag_name = "recipientNode" + return _ShardSplitOptions(self.logger, self._shard_split_fixture, self._tenant_ids, + recipient_tag_name, recipient_set_name) + + def _create_client(self, node): + return fixture_interface.authenticate(node.mongo_client(), self._auth_options) + + def _run_shard_split(self, split_opts): # noqa: D205,D400 + try: + donor_client = self._create_client(split_opts.get_donor_rs()) + res = self._commit_shard_split(donor_client, split_opts) + is_committed = res["state"] == "committed" + + self._forget_shard_split(donor_client, split_opts) + self._wait_for_garbage_collection(split_opts) + + if not res["ok"]: + raise errors.ServerFailure( + f"Shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}' failed: {str(res)}") + + if is_committed: + return True + + abort_reason = res["abortReason"] + if self._is_fail_point_abort_reason(abort_reason): + self.logger.info( + f"Shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}' aborted due to failpoint: {str(res)}.") + return False + raise errors.ServerFailure( + f"Shard split '{str(split_opts.migration_id)}' with donor replica set " + f"'{split_opts.get_donor_name()}' aborted due to an error: {str(res)}") + except pymongo.errors.PyMongoError: + self.logger.exception( + f"Error running shard split '{split_opts.migration_id}' with donor primary on " + f"replica set '{split_opts.get_donor_name()}'.") + raise + + def _commit_shard_split(self, donor_client, split_opts): # noqa: D205,D400 + self.logger.info(f"Starting shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}'.") + + while True: + try: + res = donor_client.admin.command({ + "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(), + "tenantIds": split_opts.tenant_ids, + "recipientTagName": split_opts.recipient_tag_name, "recipientSetName": + split_opts.recipient_set_name + }, bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) + + if res["state"] == "committed": + self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}' has committed.") + return res + if res["state"] == "aborted": + self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}' has aborted: {str(res)}.") + return res + if not res["ok"]: + self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}' has failed: {str(res)}.") + return res + except pymongo.errors.ConnectionFailure: + self.logger.info( + f"Retrying shard split '{split_opts.migration_id}' against replica set " + f"'{split_opts.get_donor_name()}'.") + + def _forget_shard_split(self, donor_client, split_opts): + self.logger.info(f"Forgetting shard split '{split_opts.migration_id}' on replica set " + f"'{split_opts.get_donor_name()}'.") + + while True: + try: + donor_client.admin.command( + {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()}, + bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) + return + except pymongo.errors.ConnectionFailure: + 
self.logger.info( + f"Retrying forget shard split '{split_opts.migration_id}' against replica " + f"set '{split_opts.get_donor_name()}'.") + continue + except pymongo.errors.OperationFailure as err: + if err.code != self.NO_SUCH_MIGRATION_ERR_CODE: + raise + + # The fixture was restarted. + self.logger.info( + f"Could not find shard split '{split_opts.migration_id}' on donor primary on " + f"replica set '{split_opts.get_donor_name()}': {str(err)}.") + return + except pymongo.errors.PyMongoError: + self.logger.exception( + f"Error forgetting shard split '{split_opts.migration_id}' on donor primary on " + f"replica set '{split_opts.get_donor_name()}'.") + raise + + def _wait_for_garbage_collection(self, split_opts): # noqa: D205,D400 + try: + donor_nodes = split_opts.get_donor_nodes() + for donor_node in donor_nodes: + self.logger.info( + f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected on donor node on port {donor_node.port} of replica set '{split_opts.get_donor_name()}'." + ) + + donor_node_client = self._create_client(donor_node) + while True: + try: + res = donor_node_client.config.command({ + "count": "tenantSplitDonors", + "query": {"tenantIds": split_opts.tenant_ids} + }) + if res["n"] == 0: + break + except pymongo.errors.ConnectionFailure: + self.logger.info( + f"Retrying waiting for shard split '{split_opts.migration_id}' to be garbage collected on donor node on port {donor_node.port} of replica set '{split_opts.get_donor_name()}'." + ) + continue + time.sleep(self.POLL_INTERVAL_SECS) + + recipient_nodes = split_opts.get_recipient_nodes() + for recipient_node in recipient_nodes: + self.logger.info( + f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected on recipient node on port {recipient_node.port} of replica set '{split_opts.recipient_set_name}'." + ) + + recipient_node_client = self._create_client(recipient_node) + while True: + try: + hello = recipient_node_client.admin.command("hello") + print(f"recipient hello: {hello}") + + res = recipient_node_client.config.command({ + "count": "tenantSplitDonors", + "query": {"tenantIds": split_opts.tenant_ids} + }) + if res["n"] == 0: + break + + docs = recipient_node_client.config.tenantSplitDonors.find() + print(f"recipient tenantSplitDonors docs: {list(docs)}") + except pymongo.errors.ConnectionFailure: + self.logger.info( + f"Retrying waiting for shard split '{split_opts.migration_id}' to be garbage collected on recipient node on port {recipient_node.port} of replica set '{split_opts.recipient_set_name}'." + ) + continue + time.sleep(self.POLL_INTERVAL_SECS) + + except pymongo.errors.PyMongoError: + self.logger.exception( + f"Error waiting for shard split '{split_opts.migration_id}' from donor replica set '{split_opts.get_donor_name()} to recipient replica set '{split_opts.recipient_set_name}' to be garbage collected." + ) + raise diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 249c3a2e4ce..1ba31d24ad7 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -1621,6 +1621,7 @@ buildvariants: - name: .serverless - name: .updatefuzzer - name: secondary_reads_passthrough_gen + - name: .shard_split # Disabling as the following tests are not aware of feature flags. # - name: server_discovery_and_monitoring_json_test_TG # distros: @@ -2371,6 +2372,7 @@ buildvariants: - name: .serverless - name: .watchdog - name: .stitch + - name: .shard_split # Disabling the following tests as they are not aware of feature flags. 
   # - name: .benchmarks
   # - name: unittest_shell_hang_analyzer_gen
@@ -2520,6 +2522,7 @@
   - name: .serverless
   - name: .updatefuzzer
   - name: watchdog_wiredtiger
+  - name: .shard_split
   # Disabling the following tests as they are not aware of feature flags.
   # - name: .benchmarks
   # - name: server_discovery_and_monitoring_json_test_TG
diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml
index 40d62593132..ab1f86e129a 100644
--- a/etc/evergreen_yml_components/definitions.yml
+++ b/etc/evergreen_yml_components/definitions.yml
@@ -5078,6 +5078,15 @@ tasks:
     vars:
       use_large_distro: "true"
 
+# TODO(SERVER-65178): Add "serverless" tag to shard split passthrough when no longer feature flagged
+- <<: *gen_task_template
+  name: shard_split_jscore_passthrough_gen
+  tags: ["shard_split"]
+  commands:
+  - func: "generate resmoke tasks"
+    vars:
+      use_large_distro: "true"
+
 - <<: *gen_task_template
   name: tenant_migration_jscore_passthrough_gen
   tags: ["serverless"]