From 55f4d89a5b15ed14d930c7b06cb39e299d886406 Mon Sep 17 00:00:00 2001 From: mathisbessamdb Date: Thu, 15 Sep 2022 19:38:33 +0000 Subject: SERVER-66147 add support of vanilla passthrough test for shard merge protocol --- .../suites/shard_merge_jscore_passthrough.yml | 131 ++++ ...tion_causally_consistent_jscore_passthrough.yml | 2 - .../suites/tenant_migration_jscore_passthrough.yml | 2 - ...t_migration_kill_primary_jscore_passthrough.yml | 2 - ...migration_multi_stmt_txn_jscore_passthrough.yml | 2 - ...enant_migration_stepdown_jscore_passthrough.yml | 2 - ...ration_terminate_primary_jscore_passthrough.yml | 2 - .../resmokelib/testing/fixtures/shard_merge.py | 193 ++++++ .../resmokelib/testing/hooks/shard_merge.py | 740 +++++++++++++++++++++ .../resmokelib/testing/hooks/tenant_migration.py | 40 -- etc/evergreen.yml | 3 + etc/evergreen_yml_components/definitions.yml | 9 + jstests/core/batched_multi_deletes.js | 4 +- jstests/hooks/run_check_tenant_migration_dbhash.js | 16 +- .../libs/override_methods/inject_tenant_prefix.js | 39 +- 15 files changed, 1125 insertions(+), 62 deletions(-) create mode 100644 buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml create mode 100644 buildscripts/resmokelib/testing/fixtures/shard_merge.py create mode 100644 buildscripts/resmokelib/testing/hooks/shard_merge.py diff --git a/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml new file mode 100644 index 00000000000..1aeaae20f8c --- /dev/null +++ b/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml @@ -0,0 +1,131 @@ +config_variables: +- &keyFile jstests/libs/authTestsKey +- &keyFileData Thiskeyisonlyforrunningthesuitewithauthenticationdontuseitinanytestsdirectly +- &authOptions + authenticationDatabase: admin + authenticationMechanism: SCRAM-SHA-256 + password: *keyFileData + username: __system + +test_kind: js_test + +selector: + roots: + - 
jstests/core/**/*.js + exclude_files: + - jstests/core/txns/**/*.js + # Skip any tests that run with auth explicitly. + - jstests/core/*[aA]uth*.js + + exclude_with_any_tags: + - assumes_standalone_mongod + # These tests run getMore commands which are not supported in the tenant migration passthrough. + - requires_getmore + # Due to background tenant migrations, operations in the main test shell are not guaranteed to + # be causally consistent with operations in a parallel shell. The reason is that + # TenantMigrationCommitted error is only thrown when the client does a write or an atClusterTime/ + # afterClusterTime or linearizable read. Therefore, one of the shells may not be aware that the + # migration has occurred and would not forward the read/write command to the right replica set. + - uses_parallel_shell + # Profile settings are stored in-memory only so are not transferred to the recipient. + - requires_profiling + # capped collections are banned in Serverless + - requires_capped + # emptycapped command is blocked during tenant migration. + - requires_emptycapped + # Multi-updates that conflict with tenant migration are not retried by inject_tenant_prefix.js. + - requires_multi_updates + # Full validation can cause ongoing queries to fail. This can affect the tenant migration + # process. + - uses_full_validation + - tenant_migration_incompatible + - requires_timeseries # Tenant migration not supported + - requires_fastcount + +executor: + archive: + tests: true + hooks: + - CheckReplDBHash + - CheckReplOplogs + - ValidateCollections + config: + shell_options: + eval: >- + testingReplication = true; + load('jstests/libs/override_methods/inject_tenant_prefix.js'); + jsTest.authenticate(db.getMongo()); + global_vars: + TestData: &TestData + # TODO SERVER-69034 : replace tenantId with tenantIds. 
+ tenantId: "tenantMigrationTenantId" + auth: true + authMechanism: SCRAM-SHA-256 + keyFile: *keyFile + keyFileData: *keyFileData + roleGraphInvalidationIsFatal: true + # TODO SERVER-68643 No longer needed when we only use shard merge protocol. + useLocalDBForDBCheck: true + # Shard Merge does not support fastcount. When we call CheckTenantMigrationDBHash after + # each migration we would potentially fail when enforcing using the fast count during that + # process. + skipEnforceFastCountOnValidate: true + hooks: + - class: ContinuousShardMerge + shell_options: + global_vars: + TestData: + <<: *TestData + authOptions: *authOptions + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. + - class: CheckReplOplogs + shell_options: + global_vars: + TestData: *TestData + - class: CheckReplDBHash + shell_options: + global_vars: + TestData: *TestData + - class: ValidateCollections + shell_options: + global_vars: + TestData: *TestData + - class: CleanEveryN + n: 20 + fixture: + class: ShardMergeFixture + common_mongod_options: + set_parameters: + enableTestCommands: 1 + failpoint.abortTenantMigrationBeforeLeavingBlockingState: + mode: + activationProbability: 0.5 + failpoint.pauseTenantMigrationBeforeLeavingBlockingState: + mode: alwaysOn + data: + blockTimeMS: 250 + # To avoid routing commands in each test incorrectly, the ContinuousShardMerge hook + # only runs donorForgetMigration against the donor of each migration when it is safe to do + # so. Therefore, the garbage collection delay doesn't need to be large. + tenantMigrationGarbageCollectionDelayMS: 1 + ttlMonitorSleepSecs: 1 + # Tenant migrations is not currently compatible with implicitly replicated retryable + # findAndModify images. 
+ storeFindAndModifyImagesInSideCollection: false + minSnapshotHistoryWindowInSeconds: 30 + tlsMode: allowTLS + tlsCAFile: jstests/libs/ca.pem + tlsAllowInvalidHostnames: '' + auth: '' + keyFile: *keyFile + syncdelay: 1 + per_mongod_options: + # Each entry is for a different replica set's extra mongod options. + - tlsCertificateKeyFile: jstests/libs/rs0.pem + - tlsCertificateKeyFile: jstests/libs/rs1.pem + num_replica_sets: 2 + num_nodes_per_replica_set: 3 + use_replica_set_connection_string: true + auth_options: *authOptions diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml index d2361171250..92f487b9a5b 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml @@ -91,8 +91,6 @@ executor: keyFile: *keyFile keyFileData: *keyFileData roleGraphInvalidationIsFatal: true - # TODO SERVER-67860: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge hooks: - class: ContinuousTenantMigration shell_options: diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml index 7eed50054a0..b026e57b6ac 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml @@ -62,8 +62,6 @@ executor: keyFile: *keyFile keyFileData: *keyFileData roleGraphInvalidationIsFatal: true - # TODO SERVER-66147: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge hooks: - class: ContinuousTenantMigration shell_options: diff --git 
a/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml index 85576413baf..45dba58bf3c 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml @@ -184,8 +184,6 @@ executor: readPreference: mode: "primary" retryWrites: true - # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge # We specify nodb so the shell used by each test will attempt to connect after loading the # retry logic in auto_retry_on_network_error.js. nodb: "" diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml index cbf36bb2b23..82a680d0e0d 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml @@ -282,8 +282,6 @@ executor: # Tests in this suite only read from primaries and only one node is electable, so causal # consistency is not required to read your own writes. 
causalConsistency: false - # TODO SERVER-67860: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge hooks: - class: ContinuousTenantMigration shell_options: diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml index 1911b9782b8..f3946ff0211 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml @@ -174,8 +174,6 @@ executor: readPreference: mode: "primary" retryWrites: true - # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge # We specify nodb so the shell used by each test will attempt to connect after loading the # retry logic in auto_retry_on_network_error.js. nodb: "" diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml index 12930d5debf..2aa00ffca65 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml @@ -174,8 +174,6 @@ executor: readPreference: mode: "primary" retryWrites: true - # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag - ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge # We specify nodb so the shell used by each test will attempt to connect after loading the # retry logic in auto_retry_on_network_error.js. 
nodb: "" diff --git a/buildscripts/resmokelib/testing/fixtures/shard_merge.py b/buildscripts/resmokelib/testing/fixtures/shard_merge.py new file mode 100644 index 00000000000..28fea75cb0d --- /dev/null +++ b/buildscripts/resmokelib/testing/fixtures/shard_merge.py @@ -0,0 +1,193 @@ +"""Fixture with multiple replica sets for executing JSTests against.""" + +import os.path + +import buildscripts.resmokelib.testing.fixtures.interface as interface +from buildscripts.resmokelib.testing.fixtures.fixturelib import FixtureLib + + +class ShardMergeFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes + """Fixture which provides JSTests with a set of replica sets to run tenant migration against.""" + + def __init__( # pylint: disable=too-many-arguments,too-many-locals + self, logger, job_num, fixturelib, common_mongod_options=None, per_mongod_options=None, + dbpath_prefix=None, preserve_dbpath=False, num_replica_sets=1, + num_nodes_per_replica_set=2, start_initial_sync_node=False, + write_concern_majority_journal_default=None, auth_options=None, + replset_config_options=None, voting_secondaries=True, all_nodes_electable=False, + use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None, + default_read_concern=None, default_write_concern=None): + """Initialize ShardMergeFixture with different options for the replica set processes.""" + + interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib, + dbpath_prefix=dbpath_prefix) + + self.common_mongod_options = self.fixturelib.default_if_none(common_mongod_options, {}) + self.per_mongod_options = self.fixturelib.default_if_none(per_mongod_options, {}) + self.preserve_dbpath = preserve_dbpath + self.start_initial_sync_node = start_initial_sync_node + self.write_concern_majority_journal_default = write_concern_majority_journal_default + self.auth_options = auth_options + self.replset_config_options = self.fixturelib.default_if_none(replset_config_options, {}) 
+ self.voting_secondaries = voting_secondaries + self.all_nodes_electable = all_nodes_electable + self.use_replica_set_connection_string = use_replica_set_connection_string + self.default_read_concern = default_read_concern + self.default_write_concern = default_write_concern + self.mixed_bin_versions = self.fixturelib.default_if_none(mixed_bin_versions, + self.config.MIXED_BIN_VERSIONS) + self.mixed_bin_versions_config = self.mixed_bin_versions + + # Use the values given from the command line if they exist for linear_chain and num_nodes. + linear_chain_option = self.fixturelib.default_if_none(self.config.LINEAR_CHAIN, + linear_chain) + self.linear_chain = linear_chain_option if linear_chain_option else linear_chain + self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \ + else self.config.NUM_REPLSET_NODES + self.num_replica_sets = num_replica_sets if num_replica_sets else self.config.NUM_REPLSETS + if self.num_replica_sets < 2: + raise ValueError("num_replica_sets must be greater or equal to 2") + + self.replica_sets = [] + + if not self.replica_sets: + for i in range(self.num_replica_sets): + rs_name = f"rs{i}" + mongod_options = self.common_mongod_options.copy() + mongod_options.update(self.per_mongod_options[i]) + mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, rs_name) + mongod_options["serverless"] = True + + self.replica_sets.append( + self.fixturelib.make_fixture( + "ReplicaSetFixture", self.logger, self.job_num, + mongod_options=mongod_options, preserve_dbpath=self.preserve_dbpath, + num_nodes=self.num_nodes_per_replica_set, auth_options=self.auth_options, + replset_config_options=self.replset_config_options, + mixed_bin_versions=self.mixed_bin_versions, + replicaset_logging_prefix=rs_name, + use_replica_set_connection_string=self.use_replica_set_connection_string, + all_nodes_electable=self.all_nodes_electable, replset_name=rs_name)) + + # The ReplicaSetFixture for the replica set that starts out owning 
the data (i.e. the + # replica set that driver should connect to when running commands). + self.replica_set_with_tenant = self.replica_sets[0] + + def pids(self): + """:return: pids owned by this fixture if any.""" + out = [] + for replica_set in self.replica_sets: + out.extend(replica_set.pids()) + if not out: + self.logger.debug('No replica sets when gathering multi replicaset fixture pids.') + return out + + def setup(self): + """Set up the replica sets.""" + for replica_set in self.replica_sets: + replica_set.setup() + self._create_shard_merge_donor_and_recipient_roles(replica_set) + + def await_ready(self): + """Block until the fixture can be used for testing.""" + # Wait for each of the replica sets + for replica_set in self.replica_sets: + replica_set.await_ready() + + def _do_teardown(self, mode=None): + """Shut down the replica sets.""" + self.logger.info("Stopping all replica sets...") + + running_at_start = self.is_running() + if not running_at_start: + self.logger.warning("All replica sets were expected to be running, but weren't.") + + teardown_handler = interface.FixtureTeardownHandler(self.logger) + + for replica_set in self.replica_sets: + teardown_handler.teardown(replica_set, "replica_set", mode=mode) + + if teardown_handler.was_successful(): + self.logger.info("Successfully stopped all replica sets.") + else: + self.logger.error("Stopping the fixture failed.") + raise self.fixturelib.ServerFailure(teardown_handler.get_error_message()) + + def is_running(self): + """Return true if all replica sets are still operating.""" + return all(replica_set.is_running() for replica_set in self.replica_sets) + + def get_num_replsets(self): + """Return the number of replica sets.""" + return self.num_replica_sets + + def get_replset(self, index): + """Return the ReplicaSetFixture for the replica set at the given index.""" + if not self.replica_sets: + raise ValueError("Must call setup() before calling get_replset") + return self.replica_sets[index] + + def 
get_replsets(self): + """Return the ReplicaSetFixtures for all the replica sets.""" + if not self.replica_sets: + raise ValueError("Must call setup() before calling get_replsets") + return self.replica_sets + + def get_internal_connection_string(self): + """Return the internal connection string to the replica set that currently starts out owning the data.""" + if not self.replica_sets: + raise ValueError("Must call setup() before calling get_internal_connection_string()") + return self.replica_set_with_tenant.get_internal_connection_string() + + def get_driver_connection_url(self): + """Return the driver connection URL to the replica set that currently starts out owning the data.""" + if not self.replica_set_with_tenant: + raise ValueError("Must call setup() before calling get_driver_connection_url") + return self.replica_set_with_tenant.get_driver_connection_url() + + def get_node_info(self): + """Return a list of dicts of NodeInfo objects.""" + output = [] + for replica_set in self.replica_sets: + output += replica_set.get_node_info() + return output + + def get_independent_clusters(self): + """Return the replica sets involved in the tenant migration.""" + return self.replica_sets.copy() + + def _create_shard_merge_donor_and_recipient_roles(self, rs): + """Create a role for shard merge donor and recipient.""" + primary = rs.get_primary() + primary_client = interface.authenticate(primary.mongo_client(), self.auth_options) + + try: + primary_client.admin.command({ + "createRole": "tenantMigrationDonorRole", "privileges": [{ + "resource": {"cluster": True}, "actions": ["runTenantMigration"] + }, {"resource": {"db": "admin", "collection": "system.keys"}, "actions": ["find"]}], + "roles": [] + }) + except: + self.logger.exception( + "Error creating tenant migration donor role on primary on port %d of replica" + + " set '%s'.", primary.port, rs.replset_name) + raise + + try: + primary_client.admin.command({ + "createRole": "tenantMigrationRecipientRole", + 
"privileges": [{ + "resource": {"cluster": True}, + "actions": ["listDatabases", "useUUID", "advanceClusterTime"] + }, {"resource": {"db": "", "collection": ""}, "actions": ["listCollections"]}, + { + "resource": {"anyResource": True}, + "actions": ["dbStats", "collStats", "find", "listIndexes"] + }], "roles": [] + }) + except: + self.logger.exception( + "Error creating tenant migration recipient role on primary on port %d of replica" + + " set '%s'.", primary.port, rs.replset_name) + raise diff --git a/buildscripts/resmokelib/testing/hooks/shard_merge.py b/buildscripts/resmokelib/testing/hooks/shard_merge.py new file mode 100644 index 00000000000..e1711ec66d4 --- /dev/null +++ b/buildscripts/resmokelib/testing/hooks/shard_merge.py @@ -0,0 +1,740 @@ +"""Test hook that runs shard merges continuously.""" + +import copy +import random +import re +import threading +import time +import uuid + +import bson +import pymongo.errors + +from buildscripts.resmokelib import errors +from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface +from buildscripts.resmokelib.testing.fixtures import shard_merge +from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration +from buildscripts.resmokelib.testing.hooks import interface + + +class ContinuousShardMerge(interface.Hook): # pylint: disable=too-many-instance-attributes + """Starts a shard merge thread at the beginning of each test.""" + + DESCRIPTION = ("Continuous shard merges") + + IS_BACKGROUND = True + + def __init__(self, hook_logger, fixture, shell_options): + """Initialize the ContinuousShardMerge. + + Args: + hook_logger: the logger instance for this hook. + fixture: the target ShardMergeFixture containing two replica sets. + shell_options: contains the global_vars which contains TestData.tenantId to be used for + shard merges. 
+ + """ + interface.Hook.__init__(self, hook_logger, fixture, ContinuousShardMerge.DESCRIPTION) + + if not isinstance(fixture, shard_merge.ShardMergeFixture): + raise ValueError("The ContinuousShardMerge hook requires a ShardMergeFixture") + self._shard_merge_fixture = fixture + self._shell_options = copy.deepcopy(shell_options) + + self._shard_merge_thread = None + + def before_suite(self, test_report): + """Before suite.""" + if not self._shard_merge_fixture: + raise ValueError("No ShardMergeFixture to run migrations on") + self.logger.info("Starting the shard merge thread.") + self._shard_merge_thread = _ShardMergeThread(self.logger, self._shard_merge_fixture, + self._shell_options, test_report) + self._shard_merge_thread.start() + + def after_suite(self, test_report, teardown_flag=None): + """After suite.""" + self.logger.info("Stopping the shard merge thread.") + self._shard_merge_thread.stop() + self.logger.info("Stopped the shard merge thread.") + + def before_test(self, test, test_report): + """Before test.""" + self.logger.info("Resuming the shard merge thread.") + self._shard_merge_thread.resume(test) + + def after_test(self, test, test_report): + """After test.""" + self.logger.info("Pausing the shard merge thread.") + self._shard_merge_thread.pause() + self.logger.info("Paused the shard merge thread.") + + +class ShardMergeLifeCycle(object): + """Class for managing the various states of the shard merge thread. + + The job thread alternates between calling mark_test_started() and mark_test_finished(). The + shard merge thread is allowed to perform migrations at any point between these two calls. + Note that the job thread synchronizes with the shard merge thread outside the context of + this object to know it isn't in the process of running a migration. 
+ """ + + _TEST_STARTED_STATE = "start" + _TEST_FINISHED_STATE = "finished" + + def __init__(self): + """Initialize the ShardMergeLifeCycle instance.""" + self.__lock = threading.Lock() + self.__cond = threading.Condition(self.__lock) + + self.test_num = 0 + self.__test_state = self._TEST_FINISHED_STATE + self.__should_stop = False + + def mark_test_started(self): + """Signal to the shard merge thread that a new test has started. + + This function should be called during before_test(). Calling it causes the + wait_for_shard_merge_permitted() function to no longer block and to instead return + true. + """ + with self.__lock: + self.test_num += 1 + self.__test_state = self._TEST_STARTED_STATE + self.__cond.notify_all() + + def mark_test_finished(self): + """Signal to the shard merge thread that the current test has finished. + + This function should be called during after_test(). Calling it causes the + wait_for_shard_merge_permitted() function to block until mark_test_started() is called + again. + """ + with self.__lock: + self.__test_state = self._TEST_FINISHED_STATE + self.__cond.notify_all() + + def is_test_finished(self): + """Return true if the current test has finished.""" + with self.__lock: + return self.__test_state == self._TEST_FINISHED_STATE + + def stop(self): + """Signal to the shard merge thread that it should exit. + + This function should be called during after_suite(). Calling it causes the + wait_for_shard_merge_permitted() function to no longer block and to instead return + false. + """ + with self.__lock: + self.__should_stop = True + self.__cond.notify_all() + + def wait_for_shard_merge_permitted(self): + """Block until migrations are permitted, or until stop() is called. + + Return true if migrations are permitted, and false if migrations are not permitted. 
+ """ + with self.__lock: + while not self.__should_stop: + if self.__test_state == self._TEST_STARTED_STATE: + return True + + self.__cond.wait() + + return False + + def wait_for_tenant_migration_interval(self, timeout): + """Block for 'timeout' seconds, or until stop() is called.""" + with self.__lock: + self.__cond.wait(timeout) + + def poll_for_idle_request(self): # noqa: D205,D400 + """Return true if the shard merge thread should temporarily stop running migrations, or + false if it should continue running migrations. + """ + with self.__lock: + return self.__test_state == self._TEST_FINISHED_STATE + + +def get_certificate_and_private_key(pem_file_path): # noqa: D205,D400 + """Return a dictionary containing the certificate and private key extracted from the given pem + file. + """ + lines = open(pem_file_path, 'rt').read() + certificate = re.findall( + re.compile("(-*BEGIN CERTIFICATE-*\n(.*\n)*-*END CERTIFICATE-*\n)", re.MULTILINE), + lines)[0][0] + private_key = re.findall( + re.compile("(-*BEGIN PRIVATE KEY-*\n(.*\n)*-*END PRIVATE KEY-*\n)", re.MULTILINE), + lines)[0][0] + return {"certificate": certificate, "privateKey": private_key} + + +def get_primary(rs, logger, max_tries=5): # noqa: D205,D400 + """Return the primary from a replica set. Retries up to 'max_tries' times if it fails to get + the primary within the time limit. + """ + num_tries = 0 + while num_tries < max_tries: + num_tries += 1 + try: + return rs.get_primary() + except errors.ServerFailure: + logger.info( + "Timed out while waiting for a primary for replica set '%s' on try %d." + + " Retrying." 
if num_tries < max_tries else "", rs.replset_name, num_tries) + + +class _ShardMergeOptions: # pylint:disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments + self, donor_rs, recipient_rs, tenant_id, read_preference, logger, donor_rs_index, + recipient_rs_index): + self.donor_rs = donor_rs + self.recipient_rs = recipient_rs + self.migration_id = uuid.uuid4() + self.tenant_id = tenant_id + self.read_preference = read_preference + self.logger = logger + self.donor_rs_index = donor_rs_index + self.recipient_rs_index = recipient_rs_index + + def get_donor_name(self): + """Return the replica set name for the donor.""" + return self.donor_rs.replset_name + + def get_recipient_name(self): + """Return the replica set name for the recipient.""" + return self.recipient_rs.replset_name + + def get_donor_primary(self): + """Return a connection to the donor primary.""" + return get_primary(self.donor_rs, self.logger) + + def get_recipient_primary(self): + """Return a connection to the recipient primary.""" + return get_primary(self.recipient_rs, self.logger) + + def get_donor_nodes(self): + """Return a list of connections to donor replica set nodes.""" + return self.donor_rs.nodes + + def get_recipient_nodes(self): + """Return a list of connections to recipient replica set nodes.""" + return self.recipient_rs.nodes + + def __str__(self): + opts = { + "donor": self.get_donor_name(), "recipient": self.get_recipient_name(), + "migration_id": self.migration_id, "tenant_id": self.tenant_id, + "read_preference": self.read_preference + } + return str(opts) + + +class _ShardMergeThread(threading.Thread): # pylint: disable=too-many-instance-attributes + THREAD_NAME = "ShardMergeThread" + + WAIT_SECS_RANGES = [[0.05, 0.1], [0.1, 0.5], [1, 5], [5, 15]] + POLL_INTERVAL_SECS = 0.1 + WAIT_PENDING_IDENT_SECS = 0.3 + + NO_SUCH_MIGRATION_ERR_CODE = 327 + INTERNAL_ERR_CODE = 1 + INVALID_SYNC_SOURCE_ERR_CODE = 119 + FAIL_TO_PARSE_ERR_CODE = 9 + 
NO_SUCH_KEY_ERR_CODE = 4 + + def __init__(self, logger, shard_merge_fixture, shell_options, test_report): + """Initialize _ShardMergeThread.""" + threading.Thread.__init__(self, name=self.THREAD_NAME) + self.daemon = True + self.logger = logger + self._shard_merge_fixture = shard_merge_fixture + # TODO SERVER-69034 : replace tenantId with tenantIds + self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"] + self._auth_options = shell_options["global_vars"]["TestData"]["authOptions"] + self._test = None + self._test_report = test_report + self._shell_options = shell_options + + self.__lifecycle = ShardMergeLifeCycle() + # Event set when the thread has been stopped using the 'stop()' method. + self._is_stopped_evt = threading.Event() + # Event set when the thread is not performing migrations. + self._is_idle_evt = threading.Event() + self._is_idle_evt.set() + + def run(self): + """Execute the thread.""" + if not self._shard_merge_fixture: + self.logger.warning("No ShardMergeFixture to run migrations on.") + return + + test_num = 0 + migration_num = 0 + donor_rs_index = 0 + + try: + while True: + self._is_idle_evt.set() + + permitted = self.__lifecycle.wait_for_shard_merge_permitted() + if not permitted: + break + + self._is_idle_evt.clear() + + if self.__lifecycle.test_num > test_num: + # Reset donor_rs_index to 0 since the shell always routes all requests to rs0 + # at the start of a test. + test_num = self.__lifecycle.test_num + donor_rs_index = 0 + + recipient_rs_index = ( + donor_rs_index + 1) % self._shard_merge_fixture.get_num_replsets() + migration_opts = self._create_migration_opts(donor_rs_index, recipient_rs_index) + + # Briefly wait to let the test run before starting the shard merge, so that + # the first migration is more likely to have data to migrate. 
+ wait_secs = random.uniform( + *self.WAIT_SECS_RANGES[migration_num % len(self.WAIT_SECS_RANGES)]) + self.logger.info("Waiting for %.3f seconds before starting migration.", wait_secs) + self.__lifecycle.wait_for_tenant_migration_interval(wait_secs) + + self.logger.info("Starting shard merge: %s.", str(migration_opts)) + start_time = time.time() + is_committed = self._run_migration(migration_opts) + end_time = time.time() + self.logger.info("Completed shard merge in %0d ms: %s.", + (end_time - start_time) * 1000, str(migration_opts)) + + found_idle_request = self.__lifecycle.poll_for_idle_request() + if found_idle_request: + continue + + if is_committed: + donor_rs_index = recipient_rs_index + migration_num += 1 + except Exception: # pylint: disable=W0703 + # Proactively log the exception when it happens so it will be flushed immediately. + self.logger.exception("Shard merge thread threw exception") + # The event should be signaled whenever the thread is not performing migrations. + self._is_idle_evt.set() + + def stop(self): + """Stop the thread when the suite finishes.""" + self.__lifecycle.stop() + self._is_stopped_evt.set() + # Unpause to allow the thread to finish. + self.resume(self._test) + self.join() + + def pause(self): + """Pause the thread after test.""" + self.__lifecycle.mark_test_finished() + + # Wait until we are no longer executing migrations. + self._is_idle_evt.wait() + # Check if the thread is alive in case it has thrown an exception while running. + self._check_thread() + + # Check that the fixture is still running. 
+ if not self._shard_merge_fixture.is_running(): + raise errors.ServerFailure("ShardMergeFixture with pids {} expected to be running in" + " ContinuousShardMerge, but wasn't".format( + self._shard_merge_fixture.pids())) + + def resume(self, test): + """Resume the thread before test.""" + self._test = test + self.__lifecycle.mark_test_started() + + def _wait(self, timeout): + """Wait until stop or timeout.""" + self._is_stopped_evt.wait(timeout) + + def short_name(self): + """Return the name of the thread.""" + return self.THREAD_NAME + + def _check_thread(self): + """Throw an error if the thread is not running.""" + if not self.is_alive(): + msg = "Shard merge thread is not running." + self.logger.error(msg) + raise errors.ServerFailure(msg) + + def _is_fail_point_abort_reason(self, abort_reason): + return abort_reason["code"] == self.INTERNAL_ERR_CODE and abort_reason[ + "errmsg"] == "simulate a shard merge error" + + def _create_migration_opts(self, donor_rs_index, recipient_rs_index): + donor_rs = self._shard_merge_fixture.get_replset(donor_rs_index) + recipient_rs = self._shard_merge_fixture.get_replset(recipient_rs_index) + read_preference = {"mode": "primary"} + return _ShardMergeOptions(donor_rs, recipient_rs, self._tenant_id, read_preference, + self.logger, donor_rs_index, recipient_rs_index) + + def _create_client(self, node): + return fixture_interface.authenticate(node.mongo_client(), self._auth_options) + + def _check_tenant_migration_dbhash(self, migration_opts): + # Set the donor connection string, recipient connection string, and migration uuid string + # for the shard merge dbhash check script. 
+ self._shell_options[ + "global_vars"]["TestData"]["donorConnectionString"] = migration_opts.get_donor_primary( + ).get_internal_connection_string() + self._shell_options["global_vars"]["TestData"][ + "recipientConnectionString"] = migration_opts.get_recipient_primary( + ).get_internal_connection_string() + self._shell_options["global_vars"]["TestData"][ + "migrationIdString"] = migration_opts.migration_id.__str__() + + dbhash_test_case = dbhash_tenant_migration.CheckTenantMigrationDBHash( + self.logger, self._shard_merge_fixture, self._shell_options) + dbhash_test_case.before_suite(self._test_report) + dbhash_test_case.before_test(self._test, self._test_report) + dbhash_test_case.after_test(self._test, self._test_report) + dbhash_test_case.after_suite(self._test_report) + + def _run_migration(self, migration_opts): # noqa: D205,D400 + """Run a shard merge based on 'migration_opts', wait for the migration decision and + garbage collection. Return true if the migration commits and false otherwise. + """ + try: + # Drop any orphaned tenant databases on the recipient so the next migration can start. + self._drop_tenant_databases(migration_opts.recipient_rs) + res = self._start_and_wait_for_migration(migration_opts) + is_committed = res["state"] == "committed" + + # Garbage collect the migration prior to throwing error to avoid migration conflict + # in the next test. + if is_committed: + # Once we have committed a migration, run a dbhash check before rerouting commands. + self._check_tenant_migration_dbhash(migration_opts) + + # If the migration committed, to avoid routing commands incorrectly, wait for the + # donor/proxy to reroute at least one command before doing garbage collection. Stop + # waiting when the test finishes. 
+ self._wait_for_reroute_or_test_completion(migration_opts) + self._forget_migration(migration_opts) + self._wait_for_migration_garbage_collection(migration_opts) + + if not res["ok"]: + raise errors.ServerFailure("Shard merge '" + str(migration_opts.migration_id) + + "' with donor replica set '" + + migration_opts.get_donor_name() + "' failed: " + + str(res)) + + if is_committed: + return True + + abort_reason = res["abortReason"] + if self._is_fail_point_abort_reason(abort_reason): + self.logger.info( + "Shard merge '%s' with donor replica set '%s' aborted due to failpoint: " + + "%s.", migration_opts.migration_id, migration_opts.get_donor_name(), str(res)) + return False + raise errors.ServerFailure( + "Shard merge '" + str(migration_opts.migration_id) + "' with donor replica set '" + + migration_opts.get_donor_name() + "' aborted due to an error: " + str(res)) + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error running shard merge '%s' with donor primary on replica set '%s'.", + migration_opts.migration_id, migration_opts.get_donor_name()) + raise + + def _override_abort_failpoint_shard_merge(self, donor_primary): # noqa: D205,D400 + """Override the abortTenantMigrationBeforeLeavingBlockingState failpoint so the shard merge + does not abort since it is currently not supported. Only use this method for shard merge. + """ + while True: + try: + donor_primary_client = self._create_client(donor_primary) + donor_primary_client.admin.command( + bson.SON([("configureFailPoint", + "pauseTenantMigrationBeforeLeavingBlockingState"), ("mode", "off")])) + donor_primary_client.admin.command( + bson.SON([("configureFailPoint", + "abortTenantMigrationBeforeLeavingBlockingState"), ("mode", "off")])) + return + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + self.logger.info( + "Retrying connection to donor primary in order to disable abort failpoint for shard merge." 
+ ) + continue + time.sleep(self.POLL_INTERVAL_SECS) + + def _start_and_wait_for_migration(self, migration_opts): # noqa: D205,D400 + """Run donorStartMigration to start a shard merge based on 'migration_opts', wait for + the migration decision and return the last response for donorStartMigration. + """ + cmd_obj = { + "donorStartMigration": + 1, "migrationId": + bson.Binary(migration_opts.migration_id.bytes, 4), "recipientConnectionString": + migration_opts.recipient_rs.get_driver_connection_url(), "readPreference": + migration_opts.read_preference, + "donorCertificateForRecipient": + get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"), + "recipientCertificateForDonor": + get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"), + "protocol": + "shard merge" + } + donor_primary = migration_opts.get_donor_primary() + # TODO(SERVER-68643) We no longer need to override the failpoint once milestone 3 is done + # for shard merge. + self._override_abort_failpoint_shard_merge(donor_primary) + # For shard merge protocol we need to wait for the ident to be removed before starting a + # new migration with a shard merge otherwise, due to the two phase drop, the stored + # files will be marked to be deleted but not deleted fast enough and we would end up + # moving a file that still exists. + while self._get_pending_drop_idents(migration_opts.recipient_rs_index) > 0: + time.sleep(self.WAIT_PENDING_IDENT_SECS) + # Some tests also do drops on collection which we need to wait on before doing a new migration + while self._get_pending_drop_idents(migration_opts.donor_rs_index) > 0: + time.sleep(self.WAIT_PENDING_IDENT_SECS) + + self.logger.info( + "Starting shard merge '%s' on donor primary on port %d of replica set '%s'.", + migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name()) + + while True: + try: + # Keep polling the migration state until the migration completes. 
+ donor_primary_client = self._create_client(donor_primary) + res = donor_primary_client.admin.command( + cmd_obj, + bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + donor_primary = migration_opts.get_donor_primary() + self.logger.info( + "Retrying shard merge '%s' against donor primary on port %d of" + + " replica set '%s'.", migration_opts.migration_id, donor_primary.port, + migration_opts.get_donor_name()) + continue + if res["state"] == "committed": + self.logger.info( + "Shard merge '%s' with donor primary on port %d of replica set '%s'" + + " has committed.", migration_opts.migration_id, donor_primary.port, + migration_opts.get_donor_name()) + return res + if res["state"] == "aborted": + self.logger.info( + "Shard merge '%s' with donor primary on port %d of replica set '%s'" + + " has aborted: %s.", migration_opts.migration_id, donor_primary.port, + migration_opts.get_donor_name(), str(res)) + return res + if not res["ok"]: + self.logger.info( + "Shard merge '%s' with donor primary on port %d of replica set '%s'" + + " has failed: %s.", migration_opts.migration_id, donor_primary.port, + migration_opts.get_donor_name(), str(res)) + return res + time.sleep(self.POLL_INTERVAL_SECS) + + def _forget_migration(self, migration_opts): + """Run donorForgetMigration to garbage collection the shard merge denoted by migration_opts'.""" + self.logger.info("Forgetting shard merge: %s.", str(migration_opts)) + + cmd_obj = { + "donorForgetMigration": 1, "migrationId": bson.Binary(migration_opts.migration_id.bytes, + 4) + } + donor_primary = migration_opts.get_donor_primary() + + self.logger.info( + "Forgetting shard merge '%s' on donor primary on port %d of replica set '%s'.", + migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name()) + + while True: + try: + donor_primary_client = self._create_client(donor_primary) + 
donor_primary_client.admin.command( + cmd_obj, + bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) + return + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + donor_primary = migration_opts.get_donor_primary() + self.logger.info( + "Retrying forgetting shard merge '%s' against donor primary on port %d of " + + "replica set '%s'.", migration_opts.migration_id, donor_primary.port, + migration_opts.get_donor_name()) + continue + except pymongo.errors.OperationFailure as err: + if err.code != self.NO_SUCH_MIGRATION_ERR_CODE: + raise + # The fixture was restarted. + self.logger.info( + "Could not find shard merge '%s' on donor primary on" + + " port %d of replica set '%s': %s.", migration_opts.migration_id, + donor_primary.port, migration_opts.get_donor_name(), str(err)) + return + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error forgetting shard merge '%s' on donor primary on" + + " port %d of replica set '%s'.", migration_opts.migration_id, + donor_primary.port, migration_opts.get_donor_name()) + raise + + def _wait_for_migration_garbage_collection(self, migration_opts): # noqa: D205,D400 + """Wait until the persisted state for shard merge denoted by 'migration_opts' has been + garbage collected on both the donor and recipient. 
+ """ + try: + donor_nodes = migration_opts.get_donor_nodes() + for donor_node in donor_nodes: + self.logger.info( + "Waiting for shard merge '%s' to be garbage collected on donor node on " + + "port %d of replica set '%s'.", migration_opts.migration_id, donor_node.port, + migration_opts.get_donor_name()) + + while True: + try: + donor_node_client = self._create_client(donor_node) + res = donor_node_client.config.command({ + "count": "tenantMigrationDonors", + "query": {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)} + }) + if res["n"] == 0: + break + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + # Ignore NotMasterErrors because it's possible to fail with + # InterruptedDueToReplStateChange if the donor primary steps down or shuts + # down during the garbage collection check. + self.logger.info( + "Retrying waiting for shard merge '%s' to be garbage collected on" + + " donor node on port %d of replica set '%s'.", + migration_opts.migration_id, donor_node.port, + migration_opts.get_donor_name()) + continue + time.sleep(self.POLL_INTERVAL_SECS) + + recipient_nodes = migration_opts.get_recipient_nodes() + for recipient_node in recipient_nodes: + self.logger.info( + "Waiting for shard merge '%s' to be garbage collected on recipient node on" + + " port %d of replica set '%s'.", migration_opts.migration_id, + recipient_node.port, migration_opts.get_recipient_name()) + + while True: + try: + recipient_node_client = self._create_client(recipient_node) + res = recipient_node_client.config.command({ + "count": "tenantMigrationRecipients", + "query": {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)} + }) + if res["n"] == 0: + break + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + # Ignore NotMasterErrors because it's possible to fail with + # InterruptedDueToReplStateChange if the recipient primary steps down or + # shuts down during the garbage collection check. 
+ self.logger.info( + "Retrying waiting for shard merge '%s' to be garbage collected on" + + " recipient node on port %d of replica set '%s'.", + migration_opts.migration_id, recipient_node.port, + migration_opts.get_recipient_name()) + continue + time.sleep(self.POLL_INTERVAL_SECS) + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error waiting for shard merge '%s' from donor replica set '%s" + + " to recipient replica set '%s' to be garbage collected.", + migration_opts.migration_id, migration_opts.get_donor_name(), + migration_opts.get_recipient_name()) + raise + + def _wait_for_reroute_or_test_completion(self, migration_opts): + start_time = time.time() + donor_primary = migration_opts.get_donor_primary() + + self.logger.info( + "Waiting for donor primary on port %d of replica set '%s' for shard merge '%s' " + + "to reroute at least one conflicting command. Stop waiting when the test finishes.", + donor_primary.port, migration_opts.get_donor_name(), migration_opts.migration_id) + + while not self.__lifecycle.is_test_finished(): + try: + donor_primary_client = self._create_client(donor_primary) + doc = donor_primary_client["local"]["rerouted"].find_one( + {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)}) + if doc is not None: + return + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): + donor_primary = migration_opts.get_donor_primary() + self.logger.info( + "Retrying waiting for donor primary on port '%d' of replica set '%s' for " + + "shard merge '%s' to reroute at least one conflicting command.", + donor_primary.port, migration_opts.get_donor_name(), + migration_opts.migration_id) + continue + except pymongo.errors.PyMongoError: + end_time = time.time() + self.logger.exception( + "Error running find command on donor primary on port %d of replica set '%s' " + + "after waiting for reroute for %0d ms", donor_primary.port, + migration_opts.get_donor_name(), (end_time - start_time) * 1000) + raise + + 
time.sleep(self.POLL_INTERVAL_SECS) + + def _drop_tenant_databases(self, rs): + self.logger.info("Dropping tenant databases from replica set '%s'.", rs.replset_name) + + primary = get_primary(rs, self.logger) + self.logger.info("Running dropDatabase commands against primary on port %d.", primary.port) + + while True: + try: + primary_client = self._create_client(primary) + res = primary_client.admin.command({"listDatabases": 1}) + for database in res["databases"]: + db_name = database["name"] + if db_name.startswith(self._tenant_id + "_"): + primary_client.drop_database(db_name) + return + # We retry on all write concern errors because we assume the only reason waiting for + # write concern should fail is because of a failover. + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError, + pymongo.errors.WriteConcernError) as err: + primary = get_primary(rs, self.logger) + self.logger.info( + "Retrying dropDatabase commands against primary on port %d after error %s.", + primary.port, str(err)) + continue + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error dropping databases for tenant id '%s' on primary on" + + " port %d of replica set '%s' to be garbage collection.", self._tenant_id, + primary.port, rs.replset_name) + raise + + def _get_pending_drop_idents(self, replica_set_index): # noqa: D205,D400 + """Returns the number of pending idents to be dropped. This is necessary for the shard + merge protocol since we need to wait for the idents to be dropped before starting a new + shard merge. 
+ """ + primary = self._shard_merge_fixture.get_replset(replica_set_index).get_primary() + pending_drop_idents = None + while True: + try: + client = self._create_client(primary) + server_status = client.admin.command({"serverStatus": 1}) + pending_drop_idents = server_status["storageEngine"]["dropPendingIdents"] + break + except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError, + pymongo.errors.WriteConcernError) as err: + self.logger.info( + "Retrying getting dropPendingIdents against primary on port %d after error %s.", + primary.port, str(err)) + continue + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error creating client waiting for pending drop idents on " + + " primary on port %d.", primary.port) + raise + + return pending_drop_idents diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py index 3e34c5be857..0ec5d69897b 100644 --- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py +++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py @@ -251,7 +251,6 @@ class _TenantMigrationThread(threading.Thread): self._test = None self._test_report = test_report self._shell_options = shell_options - self._use_shard_merge_protocol = False self.__lifecycle = TenantMigrationLifeCycle() # Event set when the thread has been stopped using the 'stop()' method. @@ -442,39 +441,6 @@ class _TenantMigrationThread(threading.Thread): migration_opts.migration_id, migration_opts.get_donor_name()) raise - def _is_shard_merge_enabled(self, donor_primary): # noqa: D205,D400 - """Check if the shard merge feature flag is enabled. Returns true if both the shard merge - feature flag is set to true and that ignoreShardMergeFeatureFlag is set to false. 
- """ - shard_merge_feature_enabled = False - while True: - try: - primary_client = self._create_client(donor_primary) - shard_merge_flag_doc = primary_client.admin.command( - {"getParameter": 1, "featureFlagShardMerge": 1}) - fcv_doc = primary_client.admin.command( - {"getParameter": 1, "featureCompatibilityVersion": 1}) - flag_doc_is_shard_merge = shard_merge_flag_doc["featureFlagShardMerge"].get("value") - if not flag_doc_is_shard_merge: - return False - shard_merge_flag_version = shard_merge_flag_doc["featureFlagShardMerge"].get( - "version") - fcv_version = fcv_doc["featureCompatibilityVersion"].get("version") - shard_merge_feature_enabled = (float(fcv_version) >= - float(shard_merge_flag_version)) - break - except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError): - self.logger.info("Retrying connection to primary for shard merge state doc check.") - continue - time.sleep(self.POLL_INTERVAL_SECS) - - if not shard_merge_feature_enabled: - return False - - ignore_shard_merge_feature_flag = self._shell_options["global_vars"]["TestData"].get( - "ignoreShardMergeFeatureFlag") - return not ignore_shard_merge_feature_flag - def _start_and_wait_for_migration(self, migration_opts): # noqa: D205,D400 """Run donorStartMigration to start a tenant migration based on 'migration_opts', wait for the migration decision and return the last response for donorStartMigration. 
@@ -496,12 +462,6 @@ class _TenantMigrationThread(threading.Thread): get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"), } donor_primary = migration_opts.get_donor_primary() - is_shard_merge_enabled = self._is_shard_merge_enabled(donor_primary) - if is_shard_merge_enabled: - self._override_abort_failpoints(self._tenant_migration_fixture.common_mongod_options) - cmd_obj["protocol"] = "shard merge" - self._use_shard_merge_protocol = True - self.logger.info("Using shard merge protocol for tenant migration.") self.logger.info( "Starting tenant migration '%s' on donor primary on port %d of replica set '%s'.", diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 24b3c5ddb20..6233140f3f7 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -1486,6 +1486,7 @@ buildvariants: - name: .updatefuzzer - name: secondary_reads_passthrough_gen - name: .shard_split + - name: .shard_merge # Disabling as the following tests are not aware of feature flags. # - name: server_discovery_and_monitoring_json_test_TG # distros: @@ -2279,6 +2280,7 @@ buildvariants: - name: .watchdog - name: .stitch - name: .shard_split + - name: .shard_merge # Disabling the following tests as they are not aware of feature flags. # - name: .benchmarks - building benchmarks is also disabled # - name: unittest_shell_hang_analyzer_gen @@ -2440,6 +2442,7 @@ buildvariants: - name: .updatefuzzer - name: watchdog_wiredtiger - name: .shard_split + - name: .shard_merge # Disabling the following tests as they are not aware of feature flags. 
# - name: .benchmarks - building benchmarks is also disabled # - name: server_discovery_and_monitoring_json_test_TG diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml index a531a316ba5..78e35f57860 100644 --- a/etc/evergreen_yml_components/definitions.yml +++ b/etc/evergreen_yml_components/definitions.yml @@ -5402,6 +5402,15 @@ tasks: use_large_distro: "true" fallback_num_sub_suites: 10 +# TODO(SERVER-57896): Add "serverless" tag to shard merge passthrough when no longer feature flagged +- <<: *gen_task_template + name: shard_merge_jscore_passthrough_gen + tags: ["shard_merge"] + commands: + - func: "generate resmoke tasks" + vars: + use_large_distro: "true" + - <<: *gen_task_template name: tenant_migration_jscore_passthrough_gen tags: ["serverless"] diff --git a/jstests/core/batched_multi_deletes.js b/jstests/core/batched_multi_deletes.js index 308d518cedd..7b79916b631 100644 --- a/jstests/core/batched_multi_deletes.js +++ b/jstests/core/batched_multi_deletes.js @@ -26,7 +26,7 @@ function populateAndMassDelete(queryPredicate) { coll.drop(); assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: x})))); - assert.eq(collCount, coll.count()); + assert.eq(collCount, coll.countDocuments({})); // Verify the delete will involve the BATCHED_DELETE stage. const expl = testDB.runCommand({ @@ -46,7 +46,7 @@ function populateAndMassDelete(queryPredicate) { } // Execute and verify the deletion. 
- assert.eq(collCount, coll.count()); + assert.eq(collCount, coll.countDocuments({})); assert.commandWorked(coll.deleteMany(queryPredicate)); assert.eq(null, coll.findOne()); } diff --git a/jstests/hooks/run_check_tenant_migration_dbhash.js b/jstests/hooks/run_check_tenant_migration_dbhash.js index b35058b563c..b34e975c015 100644 --- a/jstests/hooks/run_check_tenant_migration_dbhash.js +++ b/jstests/hooks/run_check_tenant_migration_dbhash.js @@ -8,12 +8,15 @@ load("jstests/replsets/libs/tenant_migration_util.js"); const excludedDBs = ["testTenantMigration"]; const testDBName = "testTenantMigration"; const dbhashCollName = "dbhashCheck"; +const localDBName = "local"; const tenantId = TestData.tenantId; const migrationId = UUID(TestData.migrationIdString); let donorRst; let recipientRst; let donorDB; +// For shard merge we need to use the local DB that is not blocked by tenant access blockers. +let primaryLocalDB; while (true) { try { donorRst = new ReplSetTest(TestData.donorConnectionString); @@ -24,6 +27,9 @@ while (true) { // failovers, but we run in a session to keep the code simple. donorDB = new Mongo(donorRst.getURL()).startSession({retryWrites: true}).getDatabase(testDBName); + primaryLocalDB = + new Mongo(donorRst.getURL()).startSession({retryWrites: true}).getDatabase(localDBName); + break; } catch (e) { if (!TenantMigrationUtil.checkIfRetryableErrorForTenantDbHashCheck(e)) { @@ -42,6 +48,12 @@ if (TestData.tenantIds) { } // Mark that we have completed the dbhash check. -assert.commandWorked(donorDB.runCommand( - {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: "majority"}})); +// useLocalDBForDbCheck is used for Shard Merge since we use the local DB for validation. 
+if (TestData.useLocalDBForDBCheck) { + assert.commandWorked(primaryLocalDB.runCommand( + {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: 1}})); +} else { + assert.commandWorked(donorDB.runCommand( + {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: "majority"}})); +} })(); diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 2d46f138291..40ad4484c49 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -18,6 +18,10 @@ const originalCloseMethod = Mongo.prototype.close; // multiple internal routing connections for the lifetime of the test execution. const initialConn = db.getMongo(); +const testTenantMigrationDB = "testTenantMigration"; +// For shard merge we need to use the local DB that is not blocked by tenant access blockers. +const localDB = "local"; + /** * Asserts that the provided connection is an internal routing connection, not the top-level proxy * connection. The proxy connection also has an internal routing connection, so it is excluded from @@ -168,6 +172,20 @@ function removeTenantIdFromString(string) { return string.replace(new RegExp(`${TestData.tenantId}_`, 'g'), ""); } +/** + * @returns Whether we are currently running a shard merge passthrough. 
+ */ +function isShardMergePassthrough(conn) { + const flagDoc = assert.commandWorked( + originalRunCommand.apply(conn, ["admin", {getParameter: 1, featureFlagShardMerge: 1}, 0])); + const fcvDoc = assert.commandWorked(assert.commandWorked(originalRunCommand.apply( + conn, ["admin", {getParameter: 1, featureCompatibilityVersion: 1}, 0]))); + return flagDoc.hasOwnProperty("featureFlagShardMerge") && flagDoc.featureFlagShardMerge.value && + MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, + flagDoc.featureFlagShardMerge.version) >= 0 && + TestData.useLocalDBForDBCheck; +} + /** * Prepends a tenant prefix to all database name and namespace fields in the provided object, where * applicable. @@ -436,8 +454,15 @@ function convertServerConnectionStringToURI(input) { */ function getOperationStateDocument(conn) { const collection = isShardSplitPassthrough() ? "shardSplitDonors" : "tenantMigrationDonors"; - const filter = - isShardSplitPassthrough() ? {tenantIds: TestData.tenantIds} : {tenantId: TestData.tenantId}; + let filter = {tenantId: TestData.tenantId}; + if (isShardSplitPassthrough()) { + filter = {tenantIds: TestData.tenantIds}; + } else if (isShardMergePassthrough(conn)) { + // TODO (SERVER-68643) No longer require to check for shard merge since shard merge will be + // the only protocol left. + filter = {}; + } + const findRes = assert.commandWorked( originalRunCommand.apply(conn, ["config", {find: collection, filter}, 0])); const docs = findRes.cursor.firstBatch; @@ -455,15 +480,16 @@ function getOperationStateDocument(conn) { /** * Marks the outgoing tenant migration or shard split operation as having caused the shell to - * reroute commands by inserting a document for it into the testTenantMigration.rerouted collection. + * reroute commands by inserting a document for it into the testTenantMigration.rerouted collection + * or local.rerouted collection for the shard merge protocol. 
*/ function recordRerouteDueToTenantMigration(conn, migrationStateDoc) { assertRoutingConnection(conn); - + const dbToCheck = TestData.useLocalDBForDBCheck ? localDB : testTenantMigrationDB; while (true) { try { const res = originalRunCommand.apply(conn, [ - "testTenantMigration", + dbToCheck, { insert: "rerouted", documents: [{_id: migrationStateDoc._id}], @@ -639,9 +665,10 @@ function runCommandRetryOnTenantMigrationErrors( // After getting a TenantMigrationCommitted error, wait for the python test fixture // to do a dbhash check on the donor and recipient primaries before we retry the // command on the recipient. + const dbToCheck = TestData.useLocalDBForDBCheck ? localDB : testTenantMigrationDB; assert.soon(() => { let findRes = assert.commandWorked(originalRunCommand.apply(donorConnection, [ - "testTenantMigration", + dbToCheck, { find: "dbhashCheck", filter: {_id: migrationStateDoc._id}, -- cgit v1.2.1