author     mathisbessamdb <mathis.bessa@mongodb.com>  2022-09-15 19:38:33 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-09-15 21:54:28 +0000
commit     55f4d89a5b15ed14d930c7b06cb39e299d886406 (patch)
tree       57771e1ea25917e49b9d15bf18e60a0c8b822a35
parent     ba5ad7837a0c11ddd620b5aad9249cd26f48bf89 (diff)
download   mongo-55f4d89a5b15ed14d930c7b06cb39e299d886406.tar.gz
SERVER-66147 Add support for vanilla passthrough test for the shard merge protocol
-rw-r--r--  buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml  131
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml  2
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/shard_merge.py  193
-rw-r--r--  buildscripts/resmokelib/testing/hooks/shard_merge.py  740
-rw-r--r--  buildscripts/resmokelib/testing/hooks/tenant_migration.py  40
-rw-r--r--  etc/evergreen.yml  3
-rw-r--r--  etc/evergreen_yml_components/definitions.yml  9
-rw-r--r--  jstests/core/batched_multi_deletes.js  4
-rw-r--r--  jstests/hooks/run_check_tenant_migration_dbhash.js  16
-rw-r--r--  jstests/libs/override_methods/inject_tenant_prefix.js  39
15 files changed, 1125 insertions, 62 deletions
diff --git a/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml
new file mode 100644
index 00000000000..1aeaae20f8c
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/shard_merge_jscore_passthrough.yml
@@ -0,0 +1,131 @@
+config_variables:
+- &keyFile jstests/libs/authTestsKey
+- &keyFileData Thiskeyisonlyforrunningthesuitewithauthenticationdontuseitinanytestsdirectly
+- &authOptions
+ authenticationDatabase: admin
+ authenticationMechanism: SCRAM-SHA-256
+ password: *keyFileData
+ username: __system
+
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/core/**/*.js
+ exclude_files:
+ - jstests/core/txns/**/*.js
+ # Skip any tests that run with auth explicitly.
+ - jstests/core/*[aA]uth*.js
+
+ exclude_with_any_tags:
+ - assumes_standalone_mongod
+ # These tests run getMore commands which are not supported in the tenant migration passthrough.
+ - requires_getmore
+  # Due to background tenant migrations, operations in the main test shell are not guaranteed to
+  # be causally consistent with operations in a parallel shell. The reason is that the
+  # TenantMigrationCommitted error is only thrown when the client does a write or an atClusterTime/
+  # afterClusterTime or linearizable read. Therefore, one of the shells may not be aware that the
+  # migration has occurred and would not forward the read/write command to the right replica set.
+ - uses_parallel_shell
+  # Profile settings are stored in memory only, so they are not transferred to the recipient.
+ - requires_profiling
+  # Capped collections are banned in Serverless.
+ - requires_capped
+ # emptycapped command is blocked during tenant migration.
+ - requires_emptycapped
+ # Multi-updates that conflict with tenant migration are not retried by inject_tenant_prefix.js.
+ - requires_multi_updates
+ # Full validation can cause ongoing queries to fail. This can affect the tenant migration
+ # process.
+ - uses_full_validation
+ - tenant_migration_incompatible
+ - requires_timeseries # Tenant migration not supported
+ - requires_fastcount
+
+executor:
+ archive:
+ tests: true
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ eval: >-
+ testingReplication = true;
+ load('jstests/libs/override_methods/inject_tenant_prefix.js');
+ jsTest.authenticate(db.getMongo());
+ global_vars:
+ TestData: &TestData
+        # TODO SERVER-69034: replace tenantId with tenantIds.
+ tenantId: "tenantMigrationTenantId"
+ auth: true
+ authMechanism: SCRAM-SHA-256
+ keyFile: *keyFile
+ keyFileData: *keyFileData
+ roleGraphInvalidationIsFatal: true
+        # TODO SERVER-68643: no longer needed once only the shard merge protocol is used.
+ useLocalDBForDBCheck: true
+        # Shard merge does not support fastcount. When we call CheckTenantMigrationDBHash after
+        # each migration, we could fail if fastcount were enforced during that validation, so we
+        # skip enforcing it.
+ skipEnforceFastCountOnValidate: true
+ hooks:
+ - class: ContinuousShardMerge
+ shell_options:
+ global_vars:
+ TestData:
+ <<: *TestData
+ authOptions: *authOptions
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ shell_options:
+ global_vars:
+ TestData: *TestData
+ - class: CheckReplDBHash
+ shell_options:
+ global_vars:
+ TestData: *TestData
+ - class: ValidateCollections
+ shell_options:
+ global_vars:
+ TestData: *TestData
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardMergeFixture
+ common_mongod_options:
+ set_parameters:
+ enableTestCommands: 1
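+      # Abort ~50% of migrations before leaving the blocking state and briefly pause each one
+      # while blocking. The ContinuousShardMerge hook currently disables both failpoints before
+      # each merge (see SERVER-68643).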
+ failpoint.abortTenantMigrationBeforeLeavingBlockingState:
+ mode:
+ activationProbability: 0.5
+ failpoint.pauseTenantMigrationBeforeLeavingBlockingState:
+ mode: alwaysOn
+ data:
+ blockTimeMS: 250
+          # To avoid incorrectly routing commands in each test, the ContinuousShardMerge hook
+          # only runs donorForgetMigration against the donor of each migration when it is safe to do
+          # so. Therefore, the garbage collection delay doesn't need to be large.
+ tenantMigrationGarbageCollectionDelayMS: 1
+ ttlMonitorSleepSecs: 1
+          # Tenant migrations are not currently compatible with implicitly replicated retryable
+          # findAndModify images.
+ storeFindAndModifyImagesInSideCollection: false
+ minSnapshotHistoryWindowInSeconds: 30
+ tlsMode: allowTLS
+ tlsCAFile: jstests/libs/ca.pem
+ tlsAllowInvalidHostnames: ''
+ auth: ''
+ keyFile: *keyFile
+ syncdelay: 1
+ per_mongod_options:
+ # Each entry is for a different replica set's extra mongod options.
+ - tlsCertificateKeyFile: jstests/libs/rs0.pem
+ - tlsCertificateKeyFile: jstests/libs/rs1.pem
+ num_replica_sets: 2
+ num_nodes_per_replica_set: 3
+ use_replica_set_connection_string: true
+ auth_options: *authOptions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml
index d2361171250..92f487b9a5b 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_causally_consistent_jscore_passthrough.yml
@@ -91,8 +91,6 @@ executor:
keyFile: *keyFile
keyFileData: *keyFileData
roleGraphInvalidationIsFatal: true
- # TODO SERVER-67860: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
hooks:
- class: ContinuousTenantMigration
shell_options:
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
index 7eed50054a0..b026e57b6ac 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
@@ -62,8 +62,6 @@ executor:
keyFile: *keyFile
keyFileData: *keyFileData
roleGraphInvalidationIsFatal: true
- # TODO SERVER-66147: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
hooks:
- class: ContinuousTenantMigration
shell_options:
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml
index 85576413baf..45dba58bf3c 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_kill_primary_jscore_passthrough.yml
@@ -184,8 +184,6 @@ executor:
readPreference:
mode: "primary"
retryWrites: true
- # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
# We specify nodb so the shell used by each test will attempt to connect after loading the
# retry logic in auto_retry_on_network_error.js.
nodb: ""
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml
index cbf36bb2b23..82a680d0e0d 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_multi_stmt_txn_jscore_passthrough.yml
@@ -282,8 +282,6 @@ executor:
# Tests in this suite only read from primaries and only one node is electable, so causal
# consistency is not required to read your own writes.
causalConsistency: false
- # TODO SERVER-67860: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
hooks:
- class: ContinuousTenantMigration
shell_options:
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml
index 1911b9782b8..f3946ff0211 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_stepdown_jscore_passthrough.yml
@@ -174,8 +174,6 @@ executor:
readPreference:
mode: "primary"
retryWrites: true
- # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
# We specify nodb so the shell used by each test will attempt to connect after loading the
# retry logic in auto_retry_on_network_error.js.
nodb: ""
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml
index 12930d5debf..2aa00ffca65 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_terminate_primary_jscore_passthrough.yml
@@ -174,8 +174,6 @@ executor:
readPreference:
mode: "primary"
retryWrites: true
- # TODO SERVER-68643: remove the ignoreShardMergeFeatureFlag
- ignoreShardMergeFeatureFlag: true # overrides the featureFlagShardMerge
# We specify nodb so the shell used by each test will attempt to connect after loading the
# retry logic in auto_retry_on_network_error.js.
nodb: ""
diff --git a/buildscripts/resmokelib/testing/fixtures/shard_merge.py b/buildscripts/resmokelib/testing/fixtures/shard_merge.py
new file mode 100644
index 00000000000..28fea75cb0d
--- /dev/null
+++ b/buildscripts/resmokelib/testing/fixtures/shard_merge.py
@@ -0,0 +1,193 @@
+"""Fixture with multiple replica sets for executing JSTests against."""
+
+import os.path
+
+import buildscripts.resmokelib.testing.fixtures.interface as interface
+from buildscripts.resmokelib.testing.fixtures.fixturelib import FixtureLib
+
+
+class ShardMergeFixture(interface.MultiClusterFixture): # pylint: disable=too-many-instance-attributes
+    """Fixture which provides JSTests with a set of replica sets to run tenant migrations against."""
+
+ def __init__( # pylint: disable=too-many-arguments,too-many-locals
+ self, logger, job_num, fixturelib, common_mongod_options=None, per_mongod_options=None,
+ dbpath_prefix=None, preserve_dbpath=False, num_replica_sets=1,
+ num_nodes_per_replica_set=2, start_initial_sync_node=False,
+ write_concern_majority_journal_default=None, auth_options=None,
+ replset_config_options=None, voting_secondaries=True, all_nodes_electable=False,
+ use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None,
+ default_read_concern=None, default_write_concern=None):
+ """Initialize ShardMergeFixture with different options for the replica set processes."""
+
+ interface.MultiClusterFixture.__init__(self, logger, job_num, fixturelib,
+ dbpath_prefix=dbpath_prefix)
+
+ self.common_mongod_options = self.fixturelib.default_if_none(common_mongod_options, {})
+ self.per_mongod_options = self.fixturelib.default_if_none(per_mongod_options, {})
+ self.preserve_dbpath = preserve_dbpath
+ self.start_initial_sync_node = start_initial_sync_node
+ self.write_concern_majority_journal_default = write_concern_majority_journal_default
+ self.auth_options = auth_options
+ self.replset_config_options = self.fixturelib.default_if_none(replset_config_options, {})
+ self.voting_secondaries = voting_secondaries
+ self.all_nodes_electable = all_nodes_electable
+ self.use_replica_set_connection_string = use_replica_set_connection_string
+ self.default_read_concern = default_read_concern
+ self.default_write_concern = default_write_concern
+ self.mixed_bin_versions = self.fixturelib.default_if_none(mixed_bin_versions,
+ self.config.MIXED_BIN_VERSIONS)
+ self.mixed_bin_versions_config = self.mixed_bin_versions
+
+ # Use the values given from the command line if they exist for linear_chain and num_nodes.
+ linear_chain_option = self.fixturelib.default_if_none(self.config.LINEAR_CHAIN,
+ linear_chain)
+ self.linear_chain = linear_chain_option if linear_chain_option else linear_chain
+ self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \
+ else self.config.NUM_REPLSET_NODES
+ self.num_replica_sets = num_replica_sets if num_replica_sets else self.config.NUM_REPLSETS
+ if self.num_replica_sets < 2:
+            raise ValueError("num_replica_sets must be greater than or equal to 2")
+
+ self.replica_sets = []
+
+ if not self.replica_sets:
+ for i in range(self.num_replica_sets):
+ rs_name = f"rs{i}"
+ mongod_options = self.common_mongod_options.copy()
+ mongod_options.update(self.per_mongod_options[i])
+ mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, rs_name)
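+                # Shard merge operates on serverless replica sets, so every node in this fixture
+                # is started with the serverless flag.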
+ mongod_options["serverless"] = True
+
+ self.replica_sets.append(
+ self.fixturelib.make_fixture(
+ "ReplicaSetFixture", self.logger, self.job_num,
+ mongod_options=mongod_options, preserve_dbpath=self.preserve_dbpath,
+ num_nodes=self.num_nodes_per_replica_set, auth_options=self.auth_options,
+ replset_config_options=self.replset_config_options,
+ mixed_bin_versions=self.mixed_bin_versions,
+ replicaset_logging_prefix=rs_name,
+ use_replica_set_connection_string=self.use_replica_set_connection_string,
+ all_nodes_electable=self.all_nodes_electable, replset_name=rs_name))
+
+ # The ReplicaSetFixture for the replica set that starts out owning the data (i.e. the
+        # replica set that the driver should connect to when running commands).
+ self.replica_set_with_tenant = self.replica_sets[0]
+
+ def pids(self):
+ """:return: pids owned by this fixture if any."""
+ out = []
+ for replica_set in self.replica_sets:
+ out.extend(replica_set.pids())
+ if not out:
+ self.logger.debug('No replica sets when gathering multi replicaset fixture pids.')
+ return out
+
+ def setup(self):
+ """Set up the replica sets."""
+ for replica_set in self.replica_sets:
+ replica_set.setup()
+ self._create_shard_merge_donor_and_recipient_roles(replica_set)
+
+ def await_ready(self):
+ """Block until the fixture can be used for testing."""
+ # Wait for each of the replica sets
+ for replica_set in self.replica_sets:
+ replica_set.await_ready()
+
+ def _do_teardown(self, mode=None):
+ """Shut down the replica sets."""
+ self.logger.info("Stopping all replica sets...")
+
+ running_at_start = self.is_running()
+ if not running_at_start:
+ self.logger.warning("All replica sets were expected to be running, but weren't.")
+
+ teardown_handler = interface.FixtureTeardownHandler(self.logger)
+
+ for replica_set in self.replica_sets:
+ teardown_handler.teardown(replica_set, "replica_set", mode=mode)
+
+ if teardown_handler.was_successful():
+ self.logger.info("Successfully stopped all replica sets.")
+ else:
+ self.logger.error("Stopping the fixture failed.")
+ raise self.fixturelib.ServerFailure(teardown_handler.get_error_message())
+
+ def is_running(self):
+ """Return true if all replica sets are still operating."""
+ return all(replica_set.is_running() for replica_set in self.replica_sets)
+
+ def get_num_replsets(self):
+ """Return the number of replica sets."""
+ return self.num_replica_sets
+
+ def get_replset(self, index):
+ """Return the ReplicaSetFixture for the replica set at the given index."""
+ if not self.replica_sets:
+ raise ValueError("Must call setup() before calling get_replset")
+ return self.replica_sets[index]
+
+ def get_replsets(self):
+ """Return the ReplicaSetFixtures for all the replica sets."""
+ if not self.replica_sets:
+ raise ValueError("Must call setup() before calling get_replsets")
+ return self.replica_sets
+
+ def get_internal_connection_string(self):
+        """Return the internal connection string to the replica set that starts out owning the data."""
+ if not self.replica_sets:
+ raise ValueError("Must call setup() before calling get_internal_connection_string()")
+ return self.replica_set_with_tenant.get_internal_connection_string()
+
+ def get_driver_connection_url(self):
+        """Return the driver connection URL to the replica set that starts out owning the data."""
+ if not self.replica_set_with_tenant:
+ raise ValueError("Must call setup() before calling get_driver_connection_url")
+ return self.replica_set_with_tenant.get_driver_connection_url()
+
+ def get_node_info(self):
+ """Return a list of dicts of NodeInfo objects."""
+ output = []
+ for replica_set in self.replica_sets:
+ output += replica_set.get_node_info()
+ return output
+
+ def get_independent_clusters(self):
+ """Return the replica sets involved in the tenant migration."""
+ return self.replica_sets.copy()
+
+ def _create_shard_merge_donor_and_recipient_roles(self, rs):
+        """Create roles for the shard merge donor and recipient."""
+ primary = rs.get_primary()
+ primary_client = interface.authenticate(primary.mongo_client(), self.auth_options)
+
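+        # The donor role can run tenant migrations and read the signing keys in admin.system.keys;
+        # the recipient role has the read and metadata privileges a recipient needs while cloning
+        # tenant data from the donor.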
+ try:
+ primary_client.admin.command({
+ "createRole": "tenantMigrationDonorRole", "privileges": [{
+ "resource": {"cluster": True}, "actions": ["runTenantMigration"]
+ }, {"resource": {"db": "admin", "collection": "system.keys"}, "actions": ["find"]}],
+ "roles": []
+ })
+ except:
+ self.logger.exception(
+ "Error creating tenant migration donor role on primary on port %d of replica" +
+ " set '%s'.", primary.port, rs.replset_name)
+ raise
+
+ try:
+ primary_client.admin.command({
+ "createRole": "tenantMigrationRecipientRole",
+ "privileges": [{
+ "resource": {"cluster": True},
+ "actions": ["listDatabases", "useUUID", "advanceClusterTime"]
+ }, {"resource": {"db": "", "collection": ""}, "actions": ["listCollections"]},
+ {
+ "resource": {"anyResource": True},
+ "actions": ["dbStats", "collStats", "find", "listIndexes"]
+ }], "roles": []
+ })
+ except:
+ self.logger.exception(
+ "Error creating tenant migration recipient role on primary on port %d of replica" +
+ " set '%s'.", primary.port, rs.replset_name)
+ raise
diff --git a/buildscripts/resmokelib/testing/hooks/shard_merge.py b/buildscripts/resmokelib/testing/hooks/shard_merge.py
new file mode 100644
index 00000000000..e1711ec66d4
--- /dev/null
+++ b/buildscripts/resmokelib/testing/hooks/shard_merge.py
@@ -0,0 +1,740 @@
+"""Test hook that runs shard merges continuously."""
+
+import copy
+import random
+import re
+import threading
+import time
+import uuid
+
+import bson
+import pymongo.errors
+
+from buildscripts.resmokelib import errors
+from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
+from buildscripts.resmokelib.testing.fixtures import shard_merge
+from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
+from buildscripts.resmokelib.testing.hooks import interface
+
+
+class ContinuousShardMerge(interface.Hook): # pylint: disable=too-many-instance-attributes
+ """Starts a shard merge thread at the beginning of each test."""
+
+ DESCRIPTION = ("Continuous shard merges")
+
+ IS_BACKGROUND = True
+
+ def __init__(self, hook_logger, fixture, shell_options):
+ """Initialize the ContinuousShardMerge.
+
+ Args:
+ hook_logger: the logger instance for this hook.
+ fixture: the target ShardMergeFixture containing two replica sets.
+ shell_options: contains the global_vars which contains TestData.tenantId to be used for
+ shard merges.
+
+ """
+ interface.Hook.__init__(self, hook_logger, fixture, ContinuousShardMerge.DESCRIPTION)
+
+ if not isinstance(fixture, shard_merge.ShardMergeFixture):
+ raise ValueError("The ContinuousShardMerge hook requires a ShardMergeFixture")
+ self._shard_merge_fixture = fixture
+ self._shell_options = copy.deepcopy(shell_options)
+
+ self._shard_merge_thread = None
+
+ def before_suite(self, test_report):
+ """Before suite."""
+ if not self._shard_merge_fixture:
+ raise ValueError("No ShardMergeFixture to run migrations on")
+ self.logger.info("Starting the shard merge thread.")
+ self._shard_merge_thread = _ShardMergeThread(self.logger, self._shard_merge_fixture,
+ self._shell_options, test_report)
+ self._shard_merge_thread.start()
+
+ def after_suite(self, test_report, teardown_flag=None):
+ """After suite."""
+ self.logger.info("Stopping the shard merge thread.")
+ self._shard_merge_thread.stop()
+ self.logger.info("Stopped the shard merge thread.")
+
+ def before_test(self, test, test_report):
+ """Before test."""
+ self.logger.info("Resuming the shard merge thread.")
+ self._shard_merge_thread.resume(test)
+
+ def after_test(self, test, test_report):
+ """After test."""
+ self.logger.info("Pausing the shard merge thread.")
+ self._shard_merge_thread.pause()
+ self.logger.info("Paused the shard merge thread.")
+
+
+class ShardMergeLifeCycle(object):
+ """Class for managing the various states of the shard merge thread.
+
+ The job thread alternates between calling mark_test_started() and mark_test_finished(). The
+ shard merge thread is allowed to perform migrations at any point between these two calls.
+ Note that the job thread synchronizes with the shard merge thread outside the context of
+ this object to know it isn't in the process of running a migration.
+ """
+
+ _TEST_STARTED_STATE = "start"
+ _TEST_FINISHED_STATE = "finished"
+
+ def __init__(self):
+ """Initialize the ShardMergeLifeCycle instance."""
+ self.__lock = threading.Lock()
+ self.__cond = threading.Condition(self.__lock)
+
+ self.test_num = 0
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__should_stop = False
+
+ def mark_test_started(self):
+ """Signal to the shard merge thread that a new test has started.
+
+ This function should be called during before_test(). Calling it causes the
+ wait_for_shard_merge_permitted() function to no longer block and to instead return
+ true.
+ """
+ with self.__lock:
+ self.test_num += 1
+ self.__test_state = self._TEST_STARTED_STATE
+ self.__cond.notify_all()
+
+ def mark_test_finished(self):
+ """Signal to the shard merge thread that the current test has finished.
+
+ This function should be called during after_test(). Calling it causes the
+ wait_for_shard_merge_permitted() function to block until mark_test_started() is called
+ again.
+ """
+ with self.__lock:
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__cond.notify_all()
+
+ def is_test_finished(self):
+ """Return true if the current test has finished."""
+ with self.__lock:
+ return self.__test_state == self._TEST_FINISHED_STATE
+
+ def stop(self):
+ """Signal to the shard merge thread that it should exit.
+
+ This function should be called during after_suite(). Calling it causes the
+ wait_for_shard_merge_permitted() function to no longer block and to instead return
+ false.
+ """
+ with self.__lock:
+ self.__should_stop = True
+ self.__cond.notify_all()
+
+ def wait_for_shard_merge_permitted(self):
+ """Block until migrations are permitted, or until stop() is called.
+
+ Return true if migrations are permitted, and false if migrations are not permitted.
+ """
+ with self.__lock:
+ while not self.__should_stop:
+ if self.__test_state == self._TEST_STARTED_STATE:
+ return True
+
+ self.__cond.wait()
+
+ return False
+
+ def wait_for_tenant_migration_interval(self, timeout):
+ """Block for 'timeout' seconds, or until stop() is called."""
+ with self.__lock:
+ self.__cond.wait(timeout)
+
+ def poll_for_idle_request(self): # noqa: D205,D400
+ """Return true if the shard merge thread should continue running migrations, or false
+ if it should temporarily stop running migrations.
+ """
+ with self.__lock:
+ return self.__test_state == self._TEST_FINISHED_STATE
+
+
+def get_certificate_and_private_key(pem_file_path): # noqa: D205,D400
+ """Return a dictionary containing the certificate and private key extracted from the given pem
+ file.
+ """
+ lines = open(pem_file_path, 'rt').read()
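+    # Extract the first CERTIFICATE and PRIVATE KEY blocks from the PEM file contents.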
+ certificate = re.findall(
+ re.compile("(-*BEGIN CERTIFICATE-*\n(.*\n)*-*END CERTIFICATE-*\n)", re.MULTILINE),
+ lines)[0][0]
+ private_key = re.findall(
+ re.compile("(-*BEGIN PRIVATE KEY-*\n(.*\n)*-*END PRIVATE KEY-*\n)", re.MULTILINE),
+ lines)[0][0]
+ return {"certificate": certificate, "privateKey": private_key}
+
+
+def get_primary(rs, logger, max_tries=5): # noqa: D205,D400
+    """Return the primary from a replica set. Retries up to 'max_tries' times if it fails to get
+ the primary within the time limit.
+ """
+ num_tries = 0
+ while num_tries < max_tries:
+ num_tries += 1
+ try:
+ return rs.get_primary()
+ except errors.ServerFailure:
+ logger.info(
+ "Timed out while waiting for a primary for replica set '%s' on try %d." +
+ " Retrying." if num_tries < max_tries else "", rs.replset_name, num_tries)
+
+
+class _ShardMergeOptions: # pylint:disable=too-many-instance-attributes
+ def __init__( # pylint: disable=too-many-arguments
+ self, donor_rs, recipient_rs, tenant_id, read_preference, logger, donor_rs_index,
+ recipient_rs_index):
+ self.donor_rs = donor_rs
+ self.recipient_rs = recipient_rs
+ self.migration_id = uuid.uuid4()
+ self.tenant_id = tenant_id
+ self.read_preference = read_preference
+ self.logger = logger
+ self.donor_rs_index = donor_rs_index
+ self.recipient_rs_index = recipient_rs_index
+
+ def get_donor_name(self):
+ """Return the replica set name for the donor."""
+ return self.donor_rs.replset_name
+
+ def get_recipient_name(self):
+ """Return the replica set name for the recipient."""
+ return self.recipient_rs.replset_name
+
+ def get_donor_primary(self):
+ """Return a connection to the donor primary."""
+ return get_primary(self.donor_rs, self.logger)
+
+ def get_recipient_primary(self):
+ """Return a connection to the recipient primary."""
+ return get_primary(self.recipient_rs, self.logger)
+
+ def get_donor_nodes(self):
+ """Return a list of connections to donor replica set nodes."""
+ return self.donor_rs.nodes
+
+ def get_recipient_nodes(self):
+ """Return a list of connections to recipient replica set nodes."""
+ return self.recipient_rs.nodes
+
+ def __str__(self):
+ opts = {
+ "donor": self.get_donor_name(), "recipient": self.get_recipient_name(),
+ "migration_id": self.migration_id, "tenant_id": self.tenant_id,
+ "read_preference": self.read_preference
+ }
+ return str(opts)
+
+
+class _ShardMergeThread(threading.Thread): # pylint: disable=too-many-instance-attributes
+ THREAD_NAME = "ShardMergeThread"
+
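+    # Ranges (in seconds) from which the delay before each migration is sampled; successive
+    # migrations cycle through the ranges so both short and long waits are exercised.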
+ WAIT_SECS_RANGES = [[0.05, 0.1], [0.1, 0.5], [1, 5], [5, 15]]
+ POLL_INTERVAL_SECS = 0.1
+ WAIT_PENDING_IDENT_SECS = 0.3
+
+ NO_SUCH_MIGRATION_ERR_CODE = 327
+ INTERNAL_ERR_CODE = 1
+ INVALID_SYNC_SOURCE_ERR_CODE = 119
+ FAIL_TO_PARSE_ERR_CODE = 9
+ NO_SUCH_KEY_ERR_CODE = 4
+
+ def __init__(self, logger, shard_merge_fixture, shell_options, test_report):
+ """Initialize _ShardMergeThread."""
+ threading.Thread.__init__(self, name=self.THREAD_NAME)
+ self.daemon = True
+ self.logger = logger
+ self._shard_merge_fixture = shard_merge_fixture
+        # TODO SERVER-69034: replace tenantId with tenantIds
+ self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"]
+ self._auth_options = shell_options["global_vars"]["TestData"]["authOptions"]
+ self._test = None
+ self._test_report = test_report
+ self._shell_options = shell_options
+
+ self.__lifecycle = ShardMergeLifeCycle()
+ # Event set when the thread has been stopped using the 'stop()' method.
+ self._is_stopped_evt = threading.Event()
+ # Event set when the thread is not performing migrations.
+ self._is_idle_evt = threading.Event()
+ self._is_idle_evt.set()
+
+ def run(self):
+ """Execute the thread."""
+ if not self._shard_merge_fixture:
+ self.logger.warning("No ShardMergeFixture to run migrations on.")
+ return
+
+ test_num = 0
+ migration_num = 0
+ donor_rs_index = 0
+
+ try:
+ while True:
+ self._is_idle_evt.set()
+
+ permitted = self.__lifecycle.wait_for_shard_merge_permitted()
+ if not permitted:
+ break
+
+ self._is_idle_evt.clear()
+
+ if self.__lifecycle.test_num > test_num:
+ # Reset donor_rs_index to 0 since the shell always routes all requests to rs0
+ # at the start of a test.
+ test_num = self.__lifecycle.test_num
+ donor_rs_index = 0
+
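+                # Pick the next replica set, in round-robin order, as the recipient for this merge.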
+ recipient_rs_index = (
+ donor_rs_index + 1) % self._shard_merge_fixture.get_num_replsets()
+ migration_opts = self._create_migration_opts(donor_rs_index, recipient_rs_index)
+
+ # Briefly wait to let the test run before starting the shard merge, so that
+ # the first migration is more likely to have data to migrate.
+ wait_secs = random.uniform(
+ *self.WAIT_SECS_RANGES[migration_num % len(self.WAIT_SECS_RANGES)])
+ self.logger.info("Waiting for %.3f seconds before starting migration.", wait_secs)
+ self.__lifecycle.wait_for_tenant_migration_interval(wait_secs)
+
+ self.logger.info("Starting shard merge: %s.", str(migration_opts))
+ start_time = time.time()
+ is_committed = self._run_migration(migration_opts)
+ end_time = time.time()
+ self.logger.info("Completed shard merge in %0d ms: %s.",
+ (end_time - start_time) * 1000, str(migration_opts))
+
+ found_idle_request = self.__lifecycle.poll_for_idle_request()
+ if found_idle_request:
+ continue
+
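+                # After a committed merge the recipient owns the data, so it becomes the donor for
+                # the next migration.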
+ if is_committed:
+ donor_rs_index = recipient_rs_index
+ migration_num += 1
+ except Exception: # pylint: disable=W0703
+ # Proactively log the exception when it happens so it will be flushed immediately.
+ self.logger.exception("Shard merge thread threw exception")
+ # The event should be signaled whenever the thread is not performing migrations.
+ self._is_idle_evt.set()
+
+ def stop(self):
+ """Stop the thread when the suite finishes."""
+ self.__lifecycle.stop()
+ self._is_stopped_evt.set()
+ # Unpause to allow the thread to finish.
+ self.resume(self._test)
+ self.join()
+
+ def pause(self):
+ """Pause the thread after test."""
+ self.__lifecycle.mark_test_finished()
+
+ # Wait until we are no longer executing migrations.
+ self._is_idle_evt.wait()
+ # Check if the thread is alive in case it has thrown an exception while running.
+ self._check_thread()
+
+ # Check that the fixture is still running.
+ if not self._shard_merge_fixture.is_running():
+ raise errors.ServerFailure("ShardMergeFixture with pids {} expected to be running in"
+ " ContinuousShardMerge, but wasn't".format(
+ self._shard_merge_fixture.pids()))
+
+ def resume(self, test):
+ """Resume the thread before test."""
+ self._test = test
+ self.__lifecycle.mark_test_started()
+
+ def _wait(self, timeout):
+ """Wait until stop or timeout."""
+ self._is_stopped_evt.wait(timeout)
+
+ def short_name(self):
+ """Return the name of the thread."""
+ return self.THREAD_NAME
+
+ def _check_thread(self):
+ """Throw an error if the thread is not running."""
+ if not self.is_alive():
+ msg = "Shard merge thread is not running."
+ self.logger.error(msg)
+ raise errors.ServerFailure(msg)
+
+ def _is_fail_point_abort_reason(self, abort_reason):
+ return abort_reason["code"] == self.INTERNAL_ERR_CODE and abort_reason[
+ "errmsg"] == "simulate a shard merge error"
+
+ def _create_migration_opts(self, donor_rs_index, recipient_rs_index):
+ donor_rs = self._shard_merge_fixture.get_replset(donor_rs_index)
+ recipient_rs = self._shard_merge_fixture.get_replset(recipient_rs_index)
+ read_preference = {"mode": "primary"}
+ return _ShardMergeOptions(donor_rs, recipient_rs, self._tenant_id, read_preference,
+ self.logger, donor_rs_index, recipient_rs_index)
+
+ def _create_client(self, node):
+ return fixture_interface.authenticate(node.mongo_client(), self._auth_options)
+
+ def _check_tenant_migration_dbhash(self, migration_opts):
+ # Set the donor connection string, recipient connection string, and migration uuid string
+ # for the shard merge dbhash check script.
+ self._shell_options[
+ "global_vars"]["TestData"]["donorConnectionString"] = migration_opts.get_donor_primary(
+ ).get_internal_connection_string()
+ self._shell_options["global_vars"]["TestData"][
+ "recipientConnectionString"] = migration_opts.get_recipient_primary(
+ ).get_internal_connection_string()
+ self._shell_options["global_vars"]["TestData"][
+ "migrationIdString"] = migration_opts.migration_id.__str__()
+
+ dbhash_test_case = dbhash_tenant_migration.CheckTenantMigrationDBHash(
+ self.logger, self._shard_merge_fixture, self._shell_options)
+ dbhash_test_case.before_suite(self._test_report)
+ dbhash_test_case.before_test(self._test, self._test_report)
+ dbhash_test_case.after_test(self._test, self._test_report)
+ dbhash_test_case.after_suite(self._test_report)
+
+ def _run_migration(self, migration_opts): # noqa: D205,D400
+ """Run a shard merge based on 'migration_opts', wait for the migration decision and
+ garbage collection. Return true if the migration commits and false otherwise.
+ """
+ try:
+            # Clean up any orphaned tenant databases on the recipient to allow the next migration to start.
+ self._drop_tenant_databases(migration_opts.recipient_rs)
+ res = self._start_and_wait_for_migration(migration_opts)
+ is_committed = res["state"] == "committed"
+
+            # Garbage collect the migration prior to throwing an error to avoid a migration conflict
+            # in the next test.
+ if is_committed:
+ # Once we have committed a migration, run a dbhash check before rerouting commands.
+ self._check_tenant_migration_dbhash(migration_opts)
+
+ # If the migration committed, to avoid routing commands incorrectly, wait for the
+ # donor/proxy to reroute at least one command before doing garbage collection. Stop
+ # waiting when the test finishes.
+ self._wait_for_reroute_or_test_completion(migration_opts)
+ self._forget_migration(migration_opts)
+ self._wait_for_migration_garbage_collection(migration_opts)
+
+ if not res["ok"]:
+ raise errors.ServerFailure("Shard merge '" + str(migration_opts.migration_id) +
+ "' with donor replica set '" +
+ migration_opts.get_donor_name() + "' failed: " +
+ str(res))
+
+ if is_committed:
+ return True
+
+ abort_reason = res["abortReason"]
+ if self._is_fail_point_abort_reason(abort_reason):
+ self.logger.info(
+ "Shard merge '%s' with donor replica set '%s' aborted due to failpoint: " +
+ "%s.", migration_opts.migration_id, migration_opts.get_donor_name(), str(res))
+ return False
+ raise errors.ServerFailure(
+ "Shard merge '" + str(migration_opts.migration_id) + "' with donor replica set '" +
+ migration_opts.get_donor_name() + "' aborted due to an error: " + str(res))
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ "Error running shard merge '%s' with donor primary on replica set '%s'.",
+ migration_opts.migration_id, migration_opts.get_donor_name())
+ raise
+
+ def _override_abort_failpoint_shard_merge(self, donor_primary): # noqa: D205,D400
+        """Disable the pause/abort TenantMigrationBeforeLeavingBlockingState failpoints so the shard
+        merge does not abort, since aborting is currently not supported. Only use this method for shard merge.
+ """
+ while True:
+ try:
+ donor_primary_client = self._create_client(donor_primary)
+ donor_primary_client.admin.command(
+ bson.SON([("configureFailPoint",
+ "pauseTenantMigrationBeforeLeavingBlockingState"), ("mode", "off")]))
+ donor_primary_client.admin.command(
+ bson.SON([("configureFailPoint",
+ "abortTenantMigrationBeforeLeavingBlockingState"), ("mode", "off")]))
+ return
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ self.logger.info(
+ "Retrying connection to donor primary in order to disable abort failpoint for shard merge."
+ )
+ continue
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ def _start_and_wait_for_migration(self, migration_opts): # noqa: D205,D400
+ """Run donorStartMigration to start a shard merge based on 'migration_opts', wait for
+ the migration decision and return the last response for donorStartMigration.
+ """
+ cmd_obj = {
+ "donorStartMigration":
+ 1, "migrationId":
+ bson.Binary(migration_opts.migration_id.bytes, 4), "recipientConnectionString":
+ migration_opts.recipient_rs.get_driver_connection_url(), "readPreference":
+ migration_opts.read_preference,
+ "donorCertificateForRecipient":
+ get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
+ "recipientCertificateForDonor":
+ get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
+ "protocol":
+ "shard merge"
+ }
+ donor_primary = migration_opts.get_donor_primary()
+        # TODO(SERVER-68643) We will no longer need to override the failpoint once milestone 3 is done
+ # for shard merge.
+ self._override_abort_failpoint_shard_merge(donor_primary)
+        # For the shard merge protocol we need to wait for the idents to be removed before starting
+        # a new shard merge. Otherwise, due to the two-phase drop, the stored files will be marked
+        # for deletion but not deleted fast enough, and we would end up moving a file that still
+        # exists.
+ while self._get_pending_drop_idents(migration_opts.recipient_rs_index) > 0:
+ time.sleep(self.WAIT_PENDING_IDENT_SECS)
+        # Some tests also drop collections, which we need to wait on before starting a new migration.
+ while self._get_pending_drop_idents(migration_opts.donor_rs_index) > 0:
+ time.sleep(self.WAIT_PENDING_IDENT_SECS)
+
+ self.logger.info(
+ "Starting shard merge '%s' on donor primary on port %d of replica set '%s'.",
+ migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+
+ while True:
+ try:
+ # Keep polling the migration state until the migration completes.
+ donor_primary_client = self._create_client(donor_primary)
+ res = donor_primary_client.admin.command(
+ cmd_obj,
+ bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ donor_primary = migration_opts.get_donor_primary()
+ self.logger.info(
+ "Retrying shard merge '%s' against donor primary on port %d of" +
+ " replica set '%s'.", migration_opts.migration_id, donor_primary.port,
+ migration_opts.get_donor_name())
+ continue
+ if res["state"] == "committed":
+ self.logger.info(
+ "Shard merge '%s' with donor primary on port %d of replica set '%s'" +
+ " has committed.", migration_opts.migration_id, donor_primary.port,
+ migration_opts.get_donor_name())
+ return res
+ if res["state"] == "aborted":
+ self.logger.info(
+ "Shard merge '%s' with donor primary on port %d of replica set '%s'" +
+ " has aborted: %s.", migration_opts.migration_id, donor_primary.port,
+ migration_opts.get_donor_name(), str(res))
+ return res
+ if not res["ok"]:
+ self.logger.info(
+ "Shard merge '%s' with donor primary on port %d of replica set '%s'" +
+ " has failed: %s.", migration_opts.migration_id, donor_primary.port,
+ migration_opts.get_donor_name(), str(res))
+ return res
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ def _forget_migration(self, migration_opts):
+        """Run donorForgetMigration to garbage collect the shard merge denoted by 'migration_opts'."""
+ self.logger.info("Forgetting shard merge: %s.", str(migration_opts))
+
+ cmd_obj = {
+ "donorForgetMigration": 1, "migrationId": bson.Binary(migration_opts.migration_id.bytes,
+ 4)
+ }
+ donor_primary = migration_opts.get_donor_primary()
+
+ self.logger.info(
+ "Forgetting shard merge '%s' on donor primary on port %d of replica set '%s'.",
+ migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+
+ while True:
+ try:
+ donor_primary_client = self._create_client(donor_primary)
+ donor_primary_client.admin.command(
+ cmd_obj,
+ bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
+ return
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ donor_primary = migration_opts.get_donor_primary()
+ self.logger.info(
+ "Retrying forgetting shard merge '%s' against donor primary on port %d of " +
+ "replica set '%s'.", migration_opts.migration_id, donor_primary.port,
+ migration_opts.get_donor_name())
+ continue
+ except pymongo.errors.OperationFailure as err:
+ if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
+ raise
+ # The fixture was restarted.
+ self.logger.info(
+ "Could not find shard merge '%s' on donor primary on" +
+ " port %d of replica set '%s': %s.", migration_opts.migration_id,
+ donor_primary.port, migration_opts.get_donor_name(), str(err))
+ return
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ "Error forgetting shard merge '%s' on donor primary on" +
+ " port %d of replica set '%s'.", migration_opts.migration_id,
+ donor_primary.port, migration_opts.get_donor_name())
+ raise
+
+ def _wait_for_migration_garbage_collection(self, migration_opts): # noqa: D205,D400
+        """Wait until the persisted state for the shard merge denoted by 'migration_opts' has been
+ garbage collected on both the donor and recipient.
+ """
+ try:
+ donor_nodes = migration_opts.get_donor_nodes()
+ for donor_node in donor_nodes:
+ self.logger.info(
+ "Waiting for shard merge '%s' to be garbage collected on donor node on " +
+ "port %d of replica set '%s'.", migration_opts.migration_id, donor_node.port,
+ migration_opts.get_donor_name())
+
+ while True:
+ try:
+ donor_node_client = self._create_client(donor_node)
+ res = donor_node_client.config.command({
+ "count": "tenantMigrationDonors",
+ "query": {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)}
+ })
+ if res["n"] == 0:
+ break
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ # Ignore NotMasterErrors because it's possible to fail with
+ # InterruptedDueToReplStateChange if the donor primary steps down or shuts
+ # down during the garbage collection check.
+ self.logger.info(
+ "Retrying waiting for shard merge '%s' to be garbage collected on" +
+ " donor node on port %d of replica set '%s'.",
+ migration_opts.migration_id, donor_node.port,
+ migration_opts.get_donor_name())
+ continue
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ recipient_nodes = migration_opts.get_recipient_nodes()
+ for recipient_node in recipient_nodes:
+ self.logger.info(
+ "Waiting for shard merge '%s' to be garbage collected on recipient node on" +
+ " port %d of replica set '%s'.", migration_opts.migration_id,
+ recipient_node.port, migration_opts.get_recipient_name())
+
+ while True:
+ try:
+ recipient_node_client = self._create_client(recipient_node)
+ res = recipient_node_client.config.command({
+ "count": "tenantMigrationRecipients",
+ "query": {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)}
+ })
+ if res["n"] == 0:
+ break
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ # Ignore NotMasterErrors because it's possible to fail with
+ # InterruptedDueToReplStateChange if the recipient primary steps down or
+ # shuts down during the garbage collection check.
+ self.logger.info(
+ "Retrying waiting for shard merge '%s' to be garbage collected on" +
+ " recipient node on port %d of replica set '%s'.",
+ migration_opts.migration_id, recipient_node.port,
+ migration_opts.get_recipient_name())
+ continue
+ time.sleep(self.POLL_INTERVAL_SECS)
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+                "Error waiting for shard merge '%s' from donor replica set '%s'" +
+ " to recipient replica set '%s' to be garbage collected.",
+ migration_opts.migration_id, migration_opts.get_donor_name(),
+ migration_opts.get_recipient_name())
+ raise
+
+ def _wait_for_reroute_or_test_completion(self, migration_opts):
+ start_time = time.time()
+ donor_primary = migration_opts.get_donor_primary()
+
+ self.logger.info(
+ "Waiting for donor primary on port %d of replica set '%s' for shard merge '%s' " +
+ "to reroute at least one conflicting command. Stop waiting when the test finishes.",
+ donor_primary.port, migration_opts.get_donor_name(), migration_opts.migration_id)
+
+ while not self.__lifecycle.is_test_finished():
+ try:
+ donor_primary_client = self._create_client(donor_primary)
+ doc = donor_primary_client["local"]["rerouted"].find_one(
+ {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)})
+ if doc is not None:
+ return
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
+ donor_primary = migration_opts.get_donor_primary()
+ self.logger.info(
+ "Retrying waiting for donor primary on port '%d' of replica set '%s' for " +
+ "shard merge '%s' to reroute at least one conflicting command.",
+ donor_primary.port, migration_opts.get_donor_name(),
+ migration_opts.migration_id)
+ continue
+ except pymongo.errors.PyMongoError:
+ end_time = time.time()
+ self.logger.exception(
+ "Error running find command on donor primary on port %d of replica set '%s' " +
+ "after waiting for reroute for %0d ms", donor_primary.port,
+ migration_opts.get_donor_name(), (end_time - start_time) * 1000)
+ raise
+
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ def _drop_tenant_databases(self, rs):
+ self.logger.info("Dropping tenant databases from replica set '%s'.", rs.replset_name)
+
+ primary = get_primary(rs, self.logger)
+ self.logger.info("Running dropDatabase commands against primary on port %d.", primary.port)
+
+ while True:
+ try:
+ primary_client = self._create_client(primary)
+ res = primary_client.admin.command({"listDatabases": 1})
+ for database in res["databases"]:
+ db_name = database["name"]
+ if db_name.startswith(self._tenant_id + "_"):
+ primary_client.drop_database(db_name)
+ return
+ # We retry on all write concern errors because we assume the only reason waiting for
+ # write concern should fail is because of a failover.
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError,
+ pymongo.errors.WriteConcernError) as err:
+ primary = get_primary(rs, self.logger)
+ self.logger.info(
+ "Retrying dropDatabase commands against primary on port %d after error %s.",
+ primary.port, str(err))
+ continue
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ "Error dropping databases for tenant id '%s' on primary on" +
+                    " port %d of replica set '%s'.", self._tenant_id,
+ primary.port, rs.replset_name)
+ raise
+
+ def _get_pending_drop_idents(self, replica_set_index): # noqa: D205,D400
+ """Returns the number of pending idents to be dropped. This is necessary for the shard
+ merge protocol since we need to wait for the idents to be dropped before starting a new
+ shard merge.
+ """
+ primary = self._shard_merge_fixture.get_replset(replica_set_index).get_primary()
+ pending_drop_idents = None
+ while True:
+ try:
+ client = self._create_client(primary)
+ server_status = client.admin.command({"serverStatus": 1})
+ pending_drop_idents = server_status["storageEngine"]["dropPendingIdents"]
+ break
+ except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError,
+ pymongo.errors.WriteConcernError) as err:
+ self.logger.info(
+ "Retrying getting dropPendingIdents against primary on port %d after error %s.",
+ primary.port, str(err))
+ continue
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+                    "Error creating client while waiting for pending drop idents on" +
+ " primary on port %d.", primary.port)
+ raise
+
+ return pending_drop_idents
diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
index 3e34c5be857..0ec5d69897b 100644
--- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
@@ -251,7 +251,6 @@ class _TenantMigrationThread(threading.Thread):
self._test = None
self._test_report = test_report
self._shell_options = shell_options
- self._use_shard_merge_protocol = False
self.__lifecycle = TenantMigrationLifeCycle()
# Event set when the thread has been stopped using the 'stop()' method.
@@ -442,39 +441,6 @@ class _TenantMigrationThread(threading.Thread):
migration_opts.migration_id, migration_opts.get_donor_name())
raise
- def _is_shard_merge_enabled(self, donor_primary): # noqa: D205,D400
- """Check if the shard merge feature flag is enabled. Returns true if both the shard merge
- feature flag is set to true and that ignoreShardMergeFeatureFlag is set to false.
- """
- shard_merge_feature_enabled = False
- while True:
- try:
- primary_client = self._create_client(donor_primary)
- shard_merge_flag_doc = primary_client.admin.command(
- {"getParameter": 1, "featureFlagShardMerge": 1})
- fcv_doc = primary_client.admin.command(
- {"getParameter": 1, "featureCompatibilityVersion": 1})
- flag_doc_is_shard_merge = shard_merge_flag_doc["featureFlagShardMerge"].get("value")
- if not flag_doc_is_shard_merge:
- return False
- shard_merge_flag_version = shard_merge_flag_doc["featureFlagShardMerge"].get(
- "version")
- fcv_version = fcv_doc["featureCompatibilityVersion"].get("version")
- shard_merge_feature_enabled = (float(fcv_version) >=
- float(shard_merge_flag_version))
- break
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotMasterError):
- self.logger.info("Retrying connection to primary for shard merge state doc check.")
- continue
- time.sleep(self.POLL_INTERVAL_SECS)
-
- if not shard_merge_feature_enabled:
- return False
-
- ignore_shard_merge_feature_flag = self._shell_options["global_vars"]["TestData"].get(
- "ignoreShardMergeFeatureFlag")
- return not ignore_shard_merge_feature_flag
-
def _start_and_wait_for_migration(self, migration_opts): # noqa: D205,D400
"""Run donorStartMigration to start a tenant migration based on 'migration_opts', wait for
the migration decision and return the last response for donorStartMigration.
@@ -496,12 +462,6 @@ class _TenantMigrationThread(threading.Thread):
get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
}
donor_primary = migration_opts.get_donor_primary()
- is_shard_merge_enabled = self._is_shard_merge_enabled(donor_primary)
- if is_shard_merge_enabled:
- self._override_abort_failpoints(self._tenant_migration_fixture.common_mongod_options)
- cmd_obj["protocol"] = "shard merge"
- self._use_shard_merge_protocol = True
- self.logger.info("Using shard merge protocol for tenant migration.")
self.logger.info(
"Starting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 24b3c5ddb20..6233140f3f7 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -1486,6 +1486,7 @@ buildvariants:
- name: .updatefuzzer
- name: secondary_reads_passthrough_gen
- name: .shard_split
+ - name: .shard_merge
# Disabling as the following tests are not aware of feature flags.
# - name: server_discovery_and_monitoring_json_test_TG
# distros:
@@ -2279,6 +2280,7 @@ buildvariants:
- name: .watchdog
- name: .stitch
- name: .shard_split
+ - name: .shard_merge
# Disabling the following tests as they are not aware of feature flags.
# - name: .benchmarks - building benchmarks is also disabled
# - name: unittest_shell_hang_analyzer_gen
@@ -2440,6 +2442,7 @@ buildvariants:
- name: .updatefuzzer
- name: watchdog_wiredtiger
- name: .shard_split
+ - name: .shard_merge
# Disabling the following tests as they are not aware of feature flags.
# - name: .benchmarks - building benchmarks is also disabled
# - name: server_discovery_and_monitoring_json_test_TG
diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml
index a531a316ba5..78e35f57860 100644
--- a/etc/evergreen_yml_components/definitions.yml
+++ b/etc/evergreen_yml_components/definitions.yml
@@ -5402,6 +5402,15 @@ tasks:
use_large_distro: "true"
fallback_num_sub_suites: 10
+# TODO(SERVER-57896): Add "serverless" tag to shard merge passthrough when no longer feature flagged
+- <<: *gen_task_template
+ name: shard_merge_jscore_passthrough_gen
+ tags: ["shard_merge"]
+ commands:
+ - func: "generate resmoke tasks"
+ vars:
+ use_large_distro: "true"
+
- <<: *gen_task_template
name: tenant_migration_jscore_passthrough_gen
tags: ["serverless"]
diff --git a/jstests/core/batched_multi_deletes.js b/jstests/core/batched_multi_deletes.js
index 308d518cedd..7b79916b631 100644
--- a/jstests/core/batched_multi_deletes.js
+++ b/jstests/core/batched_multi_deletes.js
@@ -26,7 +26,7 @@ function populateAndMassDelete(queryPredicate) {
coll.drop();
assert.commandWorked(coll.insertMany([...Array(collCount).keys()].map(x => ({_id: x, a: x}))));
- assert.eq(collCount, coll.count());
+ assert.eq(collCount, coll.countDocuments({}));
// Verify the delete will involve the BATCHED_DELETE stage.
const expl = testDB.runCommand({
@@ -46,7 +46,7 @@ function populateAndMassDelete(queryPredicate) {
}
// Execute and verify the deletion.
- assert.eq(collCount, coll.count());
+ assert.eq(collCount, coll.countDocuments({}));
assert.commandWorked(coll.deleteMany(queryPredicate));
assert.eq(null, coll.findOne());
}
diff --git a/jstests/hooks/run_check_tenant_migration_dbhash.js b/jstests/hooks/run_check_tenant_migration_dbhash.js
index b35058b563c..b34e975c015 100644
--- a/jstests/hooks/run_check_tenant_migration_dbhash.js
+++ b/jstests/hooks/run_check_tenant_migration_dbhash.js
@@ -8,12 +8,15 @@ load("jstests/replsets/libs/tenant_migration_util.js");
const excludedDBs = ["testTenantMigration"];
const testDBName = "testTenantMigration";
const dbhashCollName = "dbhashCheck";
+const localDBName = "local";
const tenantId = TestData.tenantId;
const migrationId = UUID(TestData.migrationIdString);
let donorRst;
let recipientRst;
let donorDB;
+// For shard merge, we need to use the local DB, which is not blocked by tenant access blockers.
+let primaryLocalDB;
while (true) {
try {
donorRst = new ReplSetTest(TestData.donorConnectionString);
@@ -24,6 +27,9 @@ while (true) {
// failovers, but we run in a session to keep the code simple.
donorDB =
new Mongo(donorRst.getURL()).startSession({retryWrites: true}).getDatabase(testDBName);
+ primaryLocalDB =
+ new Mongo(donorRst.getURL()).startSession({retryWrites: true}).getDatabase(localDBName);
+
break;
} catch (e) {
if (!TenantMigrationUtil.checkIfRetryableErrorForTenantDbHashCheck(e)) {
@@ -42,6 +48,12 @@ if (TestData.tenantIds) {
}
// Mark that we have completed the dbhash check.
-assert.commandWorked(donorDB.runCommand(
- {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: "majority"}}));
+// useLocalDBForDBCheck is used for shard merge since we use the local DB for validation.
+if (TestData.useLocalDBForDBCheck) {
+ assert.commandWorked(primaryLocalDB.runCommand(
+ {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: 1}}));
+} else {
+ assert.commandWorked(donorDB.runCommand(
+ {insert: dbhashCollName, documents: [{_id: migrationId}], writeConcern: {w: "majority"}}));
+}
})();
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 2d46f138291..40ad4484c49 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -18,6 +18,10 @@ const originalCloseMethod = Mongo.prototype.close;
// multiple internal routing connections for the lifetime of the test execution.
const initialConn = db.getMongo();
+const testTenantMigrationDB = "testTenantMigration";
+// For shard merge, we need to use the local DB, which is not blocked by tenant access blockers.
+const localDB = "local";
+
/**
* Asserts that the provided connection is an internal routing connection, not the top-level proxy
* connection. The proxy connection also has an internal routing connection, so it is excluded from
@@ -169,6 +173,20 @@ function removeTenantIdFromString(string) {
}
/**
+ * @returns Whether we are currently running a shard merge passthrough.
+ */
+function isShardMergePassthrough(conn) {
+ const flagDoc = assert.commandWorked(
+ originalRunCommand.apply(conn, ["admin", {getParameter: 1, featureFlagShardMerge: 1}, 0]));
+    const fcvDoc = assert.commandWorked(originalRunCommand.apply(
+        conn, ["admin", {getParameter: 1, featureCompatibilityVersion: 1}, 0]));
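+    // A feature flag only counts as enabled when the FCV is at least the flag's version, hence the
+    // compareBinVersions() check below.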
+ return flagDoc.hasOwnProperty("featureFlagShardMerge") && flagDoc.featureFlagShardMerge.value &&
+ MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version,
+ flagDoc.featureFlagShardMerge.version) >= 0 &&
+ TestData.useLocalDBForDBCheck;
+}
+
+/**
* Prepends a tenant prefix to all database name and namespace fields in the provided object, where
* applicable.
*/
@@ -436,8 +454,15 @@ function convertServerConnectionStringToURI(input) {
*/
function getOperationStateDocument(conn) {
const collection = isShardSplitPassthrough() ? "shardSplitDonors" : "tenantMigrationDonors";
- const filter =
- isShardSplitPassthrough() ? {tenantIds: TestData.tenantIds} : {tenantId: TestData.tenantId};
+ let filter = {tenantId: TestData.tenantId};
+ if (isShardSplitPassthrough()) {
+ filter = {tenantIds: TestData.tenantIds};
+ } else if (isShardMergePassthrough(conn)) {
+        // TODO (SERVER-68643) Checking for shard merge will no longer be required once shard merge
+        // is the only protocol left.
+ filter = {};
+ }
+
const findRes = assert.commandWorked(
originalRunCommand.apply(conn, ["config", {find: collection, filter}, 0]));
const docs = findRes.cursor.firstBatch;
@@ -455,15 +480,16 @@ function getOperationStateDocument(conn) {
/**
* Marks the outgoing tenant migration or shard split operation as having caused the shell to
- * reroute commands by inserting a document for it into the testTenantMigration.rerouted collection.
+ * reroute commands by inserting a document for it into the testTenantMigration.rerouted collection
+ * or the local.rerouted collection for the shard merge protocol.
*/
function recordRerouteDueToTenantMigration(conn, migrationStateDoc) {
assertRoutingConnection(conn);
-
+ const dbToCheck = TestData.useLocalDBForDBCheck ? localDB : testTenantMigrationDB;
while (true) {
try {
const res = originalRunCommand.apply(conn, [
- "testTenantMigration",
+ dbToCheck,
{
insert: "rerouted",
documents: [{_id: migrationStateDoc._id}],
@@ -639,9 +665,10 @@ function runCommandRetryOnTenantMigrationErrors(
// After getting a TenantMigrationCommitted error, wait for the python test fixture
// to do a dbhash check on the donor and recipient primaries before we retry the
// command on the recipient.
+ const dbToCheck = TestData.useLocalDBForDBCheck ? localDB : testTenantMigrationDB;
assert.soon(() => {
let findRes = assert.commandWorked(originalRunCommand.apply(donorConnection, [
- "testTenantMigration",
+ dbToCheck,
{
find: "dbhashCheck",
filter: {_id: migrationStateDoc._id},