summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2020-10-06 18:29:00 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-12 22:26:05 +0000
commitd7f220af7070c6c3ef510d0ebe65172d90f9f73f (patch)
tree697316e86650e1b0bba86acb659290e16d022263
parent46835741b3abaeb10aea8b3c1ef67616f95115b0 (diff)
downloadmongo-d7f220af7070c6c3ef510d0ebe65172d90f9f73f.tar.gz
SERVER-50493 Make tenant_migration_jscore_passthrough simulate a complete migration between two real replica sets that aborts
-rw-r--r--buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml7
-rw-r--r--buildscripts/resmokelib/config.py3
-rw-r--r--buildscripts/resmokelib/testing/fixtures/replicaset.py6
-rw-r--r--buildscripts/resmokelib/testing/fixtures/tenant_migration.py146
-rw-r--r--buildscripts/resmokelib/testing/hooks/tenant_migration.py83
5 files changed, 199 insertions, 46 deletions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
index 0dc9bb17995..27f3a8341d3 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
@@ -90,12 +90,11 @@ executor:
# clear the in-memory and persisted state for each migration.
n: 1
fixture:
- class: ReplicaSetFixture
+ class: TenantMigrationFixture
mongod_options:
set_parameters:
enableTestCommands: 1
enableTenantMigrations: true
- failpoint.skipSendingRecipientSyncDataCommand:
- mode: alwaysOn
- num_nodes: 3
+ num_replica_sets: 2
+ num_nodes_per_replica_set: 3
use_replica_set_connection_string: true
diff --git a/buildscripts/resmokelib/config.py b/buildscripts/resmokelib/config.py
index b5d2dc591a8..05b4ea6ca83 100644
--- a/buildscripts/resmokelib/config.py
+++ b/buildscripts/resmokelib/config.py
@@ -422,6 +422,9 @@ MIXED_BIN_VERSIONS = None
# Specifies the number of replica set members in a ReplicaSetFixture.
NUM_REPLSET_NODES = None
+# Specifies the number of replica sets in a MultiReplicaSetFixture.
+NUM_REPLSETS = None
+
# Specifies the number of shards in a ShardedClusterFixture.
NUM_SHARDS = None
diff --git a/buildscripts/resmokelib/testing/fixtures/replicaset.py b/buildscripts/resmokelib/testing/fixtures/replicaset.py
index 85c4366fe4b..93dd66c3fca 100644
--- a/buildscripts/resmokelib/testing/fixtures/replicaset.py
+++ b/buildscripts/resmokelib/testing/fixtures/replicaset.py
@@ -33,7 +33,7 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst
auth_options=None, replset_config_options=None, voting_secondaries=True,
all_nodes_electable=False, use_replica_set_connection_string=None, linear_chain=False,
mixed_bin_versions=None, default_read_concern=None, default_write_concern=None,
- shard_logging_prefix=None):
+ shard_logging_prefix=None, replicaset_logging_prefix=None):
"""Initialize ReplicaSetFixture."""
interface.ReplFixture.__init__(self, logger, job_num, dbpath_prefix=dbpath_prefix)
@@ -53,6 +53,7 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst
config.MIXED_BIN_VERSIONS)
self.mixed_bin_versions_config = self.mixed_bin_versions
self.shard_logging_prefix = shard_logging_prefix
+ self.replicaset_logging_prefix = replicaset_logging_prefix
# Use the values given from the command line if they exist for linear_chain and num_nodes.
linear_chain_option = utils.default_if_none(config.LINEAR_CHAIN, linear_chain)
@@ -623,6 +624,9 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst
return logging.loggers.new_fixture_node_logger("ShardedClusterFixture", self.job_num,
node_name)
+ if self.replicaset_logging_prefix is not None:
+ node_name = f"{self.replicaset_logging_prefix}:{node_name}"
+
return logging.loggers.new_fixture_node_logger(self.__class__.__name__, self.job_num,
node_name)
diff --git a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py
new file mode 100644
index 00000000000..c1fce354893
--- /dev/null
+++ b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py
@@ -0,0 +1,146 @@
+"""Fixture with multiple replica sets for executing JSTests against."""
+
+import os.path
+
+from buildscripts.resmokelib import config
+from buildscripts.resmokelib import errors
+from buildscripts.resmokelib import logging
+from buildscripts.resmokelib import utils
+from buildscripts.resmokelib.testing.fixtures import interface
+from buildscripts.resmokelib.testing.fixtures import replicaset
+
+
+class TenantMigrationFixture(interface.Fixture): # pylint: disable=too-many-instance-attributes
+ """Fixture which provides JSTests with a set of replica sets to run tenant migration against."""
+
+ def __init__( # pylint: disable=too-many-arguments,too-many-locals
+ self, logger, job_num, mongod_options=None, dbpath_prefix=None, preserve_dbpath=False,
+ num_replica_sets=1, num_nodes_per_replica_set=2, start_initial_sync_node=False,
+ write_concern_majority_journal_default=None, auth_options=None,
+ replset_config_options=None, voting_secondaries=True, all_nodes_electable=False,
+ use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None,
+ default_read_concern=None, default_write_concern=None):
+ """Initialize TenantMigrationFixture with different options for the replica set processes."""
+
+ interface.Fixture.__init__(self, logger, job_num, dbpath_prefix=dbpath_prefix)
+
+ self.mongod_options = utils.default_if_none(mongod_options, {})
+ self.preserve_dbpath = preserve_dbpath
+ self.start_initial_sync_node = start_initial_sync_node
+ self.write_concern_majority_journal_default = write_concern_majority_journal_default
+ self.auth_options = auth_options
+ self.replset_config_options = utils.default_if_none(replset_config_options, {})
+ self.voting_secondaries = voting_secondaries
+ self.all_nodes_electable = all_nodes_electable
+ self.use_replica_set_connection_string = use_replica_set_connection_string
+ self.default_read_concern = default_read_concern
+ self.default_write_concern = default_write_concern
+ self.mixed_bin_versions = utils.default_if_none(mixed_bin_versions,
+ config.MIXED_BIN_VERSIONS)
+ self.mixed_bin_versions_config = self.mixed_bin_versions
+
+ # Use the values given from the command line if they exist for linear_chain and num_nodes.
+ linear_chain_option = utils.default_if_none(config.LINEAR_CHAIN, linear_chain)
+ self.linear_chain = linear_chain_option if linear_chain_option else linear_chain
+ self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \
+ else config.NUM_REPLSET_NODES
+ self.num_replica_sets = num_replica_sets if num_replica_sets else config.NUM_REPLSETS
+ if self.num_replica_sets < 2:
+ raise ValueError("num_replica_sets must be greater or equal to 2")
+
+ self.replica_sets = []
+
+ # The ReplicaSetFixture for the replica set that starts out owning the data (i.e. the
+ # replica set that driver should connect to when running commands).
+ self.replica_set_with_tenant = None
+
+ def pids(self):
+ """:return: pids owned by this fixture if any."""
+ out = []
+ for replica_set in self.replica_sets:
+ out.extend(replica_set.pids())
+ if not out:
+ self.logger.debug('No replica sets when gathering multi replicaset fixture pids.')
+ return out
+
+ def setup(self):
+ """Set up the replica sets."""
+ if not self.replica_sets:
+ for i in range(self.num_replica_sets):
+ rs_name = f"rs{i}"
+ mongod_options = self.mongod_options.copy()
+ mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, rs_name)
+ mongod_options["replSet"] = rs_name
+
+ self.replica_sets.append(
+ replicaset.ReplicaSetFixture(
+ self.logger, self.job_num, mongod_options=mongod_options,
+ preserve_dbpath=self.preserve_dbpath,
+ num_nodes=self.num_nodes_per_replica_set, auth_options=self.auth_options,
+ replset_config_options=self.replset_config_options,
+ mixed_bin_versions=self.mixed_bin_versions,
+ replicaset_logging_prefix=rs_name,
+ use_replica_set_connection_string=self.use_replica_set_connection_string))
+
+ self.replica_set_with_tenant = self.replica_sets[0]
+
+ # Start up each of the replica sets
+ for replica_set in self.replica_sets:
+ replica_set.setup()
+
+ def await_ready(self):
+ """Block until the fixture can be used for testing."""
+ # Wait for each of the replica sets
+ for replica_set in self.replica_sets:
+ replica_set.await_ready()
+
+ def _do_teardown(self, mode=None):
+ """Shut down the replica sets."""
+ self.logger.info("Stopping all replica sets...")
+
+ running_at_start = self.is_running()
+ if not running_at_start:
+ self.logger.warning("All replica sets were expected to be running, but weren't.")
+
+ teardown_handler = interface.FixtureTeardownHandler(self.logger)
+
+ for replica_set in self.replica_sets:
+ teardown_handler.teardown(replica_set, "replica_set", mode=mode)
+
+ if teardown_handler.was_successful():
+ self.logger.info("Successfully stopped all replica sets.")
+ else:
+ self.logger.error("Stopping the fixture failed.")
+ raise errors.ServerFailure(teardown_handler.get_error_message())
+
+ def is_running(self):
+ """Return true if all replica sets are still operating."""
+ return all(replica_set.is_running() for replica_set in self.replica_sets)
+
+ def get_num_replica_sets(self):
+ """Return the number of replica sets."""
+ return self.num_replica_sets
+
+ def get_replset(self, index):
+ """Return the ReplicaSetFixture for the replica set at the given index."""
+ if not self.replica_sets:
+ raise ValueError("Must call setup() before calling get_replset")
+ return self.replica_sets[index]
+
+ def get_internal_connection_string(self):
+ """Return the internal connection string to the replica set that currently starts out owning the data."""
+ if not self.replica_sets:
+ raise ValueError("Must call setup() before calling get_internal_connection_string()")
+ return self.replica_set_with_tenant.get_internal_connection_string()
+
+ def get_driver_connection_url(self):
+ """Return the driver connection URL to the replica set that currently starts out owning the data."""
+ if not self.replica_set_with_tenant:
+ raise ValueError("Must call setup() before calling get_driver_connection_url")
+ return self.replica_set_with_tenant.get_driver_connection_url()
+
+ def get_node_info(self):
+ """Return a list of dicts of NodeInfo objects."""
+ output = []
+ for replica_set in self.replica_sets:
+ output += replica_set.get_node_info()
diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
index 28e38192a61..8c974110b9e 100644
--- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
@@ -11,7 +11,7 @@ import pymongo.errors
from buildscripts.resmokelib import errors
from buildscripts.resmokelib import utils
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
-from buildscripts.resmokelib.testing.fixtures import replicaset
+from buildscripts.resmokelib.testing.fixtures import tenant_migration
from buildscripts.resmokelib.testing.hooks import interface
@@ -25,17 +25,21 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins
Args:
hook_logger: the logger instance for this hook.
- fixture: the target replica set fixture.
+ fixture: the target TenantMigrationFixture containing two replica sets.
shell_options: contains the global_vars which contains TestData.dbPrefix to be used for
tenant migrations.
"""
interface.Hook.__init__(self, hook_logger, fixture, ContinuousTenantMigration.DESCRIPTION)
- self._fixture = fixture
self._db_prefix = shell_options["global_vars"]["TestData"]["dbPrefix"]
+ if not isinstance(fixture, tenant_migration.TenantMigrationFixture) or \
+ fixture.get_num_replica_sets() != 2:
+ raise ValueError(
+ "The ContinuousTenantMigration hook requires a TenantMigrationFixture with two replica sets"
+ )
+ self._tenant_migration_fixture = fixture
- self._rs_fixtures = []
self._tenant_migration_thread = None
def before_suite(self, test_report):
@@ -43,8 +47,8 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins
# TODO (SERVER-50496): Make the hook start the migration thread once here instead of inside
# before_test and make it run migrations continuously back and forth between the two replica
# sets.
- if not self._rs_fixtures:
- self._add_fixture(self._fixture)
+ if not self._tenant_migration_fixture:
+ raise ValueError("No replica set pair to run migrations on")
def after_suite(self, test_report):
"""After suite."""
@@ -53,8 +57,8 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins
def before_test(self, test, test_report):
"""Before test."""
self.logger.info("Starting the migration thread.")
- self._tenant_migration_thread = _TenantMigrationThread(self.logger, self._rs_fixtures,
- self._db_prefix)
+ self._tenant_migration_thread = _TenantMigrationThread(
+ self.logger, self._tenant_migration_fixture, self._db_prefix)
self._tenant_migration_thread.start()
def after_test(self, test, test_report):
@@ -63,38 +67,35 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins
self._tenant_migration_thread.stop()
self.logger.info("migration thread stopped.")
- def _add_fixture(self, fixture):
- if isinstance(fixture, replicaset.ReplicaSetFixture):
- self._rs_fixtures.append(fixture)
-
class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-instance-attributes
- MAX_SLEEP_SECS = 0.1
- MAX_BLOCK_TIME_MILLISECS = 5 * 1000
+ MIN_START_MIGRATION_DELAY_SECS = 0.1
+ MAX_START_MIGRATION_DELAY_SECS = 0.25
+ MIN_BLOCK_TIME_SECS = 1
+ MAX_BLOCK_TIME_SECS = 2.5
DONOR_START_MIGRATION_POLL_INTERVAL_SECS = 0.1
- TENANT_MIGRATION_ABORTED_ERROR_CODE = 325
- def __init__(self, logger, rs_fixtures, db_prefix):
+ def __init__(self, logger, tenant_migration_fixture, db_prefix):
"""Initialize _TenantMigrationThread."""
threading.Thread.__init__(self, name="TenantMigrationThread")
self.daemon = True
self.logger = logger
- self._rs_fixtures = rs_fixtures
+ self._tenant_migration_fixture = tenant_migration_fixture
self._db_prefix = db_prefix
self._last_exec = time.time()
def run(self):
"""Execute the thread."""
- if not self._rs_fixtures:
- self.logger.warning("No replica set on which to run migrations.")
+ if not self._tenant_migration_fixture:
+ self.logger.warning("No replica set pair to run migrations on.")
return
try:
now = time.time()
self.logger.info("Starting a tenant migration for database prefix '%s'",
self._db_prefix)
- self._run_migration(self._rs_fixtures[0])
+ self._run_migration(self._tenant_migration_fixture)
self._last_exec = time.time()
self.logger.info("Completed a tenant migration in %0d ms",
(self._last_exec - now) * 1000)
@@ -114,12 +115,14 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst
# end of each test so that each test uses its own randomly generated block time.
try:
donor_primary_client.admin.command(
- bson.SON([("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"),
- ("mode", "alwaysOn"),
- ("data",
- bson.SON([("blockTimeMS",
- random.uniform(
- 0, _TenantMigrationThread.MAX_BLOCK_TIME_MILLISECS))]))]))
+ bson.SON(
+ [("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"),
+ ("mode", "alwaysOn"),
+ ("data",
+ bson.SON(
+ [("blockTimeMS",
+ 1000 * random.uniform(_TenantMigrationThread.MIN_BLOCK_TIME_SECS,
+ _TenantMigrationThread.MAX_BLOCK_TIME_SECS))]))]))
except pymongo.errors.OperationFailure as err:
self.logger.exception(
"Unable to enable the failpoint to make migrations abort on donor primary on port "
@@ -143,24 +146,29 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst
+ "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name,
err.args[0]))
- def _run_migration(self, rs_fixture):
- donor_primary = rs_fixture.get_primary()
+ def _run_migration(self, tenant_migration_fixture):
+ donor_rs = tenant_migration_fixture.get_replset(0)
+ recipient_rs = tenant_migration_fixture.get_replset(1)
+
+ donor_primary = donor_rs.get_primary()
donor_primary_client = donor_primary.mongo_client()
- time.sleep(random.uniform(0, _TenantMigrationThread.MAX_SLEEP_SECS))
+ time.sleep(
+ random.uniform(_TenantMigrationThread.MIN_START_MIGRATION_DELAY_SECS,
+ _TenantMigrationThread.MAX_START_MIGRATION_DELAY_SECS))
self.logger.info(
"Starting a tenant migration with donor primary on port %d of replica set '%s'.",
- donor_primary.port, rs_fixture.replset_name)
+ donor_primary.port, donor_rs.replset_name)
cmd_obj = {
"donorStartMigration": 1, "migrationId": bson.Binary(uuid.uuid4().bytes, 4),
- "recipientConnectionString": "dummySet/dummyHost:1234",
+ "recipientConnectionString": recipient_rs.get_driver_connection_url(),
"databasePrefix": self._db_prefix, "readPreference": {"mode": "primary"}
}
try:
- self._enable_abort(donor_primary_client, donor_primary.port, rs_fixture.replset_name)
+ self._enable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name)
while True:
# Keep polling the migration state until the migration completes, otherwise we might
@@ -172,17 +180,10 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst
if (not res["ok"] or res["state"] == "committed" or res["state"] == "aborted"):
break
time.sleep(_TenantMigrationThread.DONOR_START_MIGRATION_POLL_INTERVAL_SECS)
- except pymongo.errors.OperationFailure as err:
- if err.code == _TenantMigrationThread.TENANT_MIGRATION_ABORTED_ERROR_CODE:
- self.logger.exception(
- "tenant migration with donor primary on port %d of replica set '%s' aborted.",
- donor_primary.port, rs_fixture.replset_name)
- return
- raise
except pymongo.errors.PyMongoError:
self.logger.exception(
"Error running tenant migration with donor primary on port %d of replica set '%s'.",
- donor_primary.port, rs_fixture.replset_name)
+ donor_primary.port, donor_rs.replset_name)
raise
finally:
- self._disable_abort(donor_primary_client, donor_primary.port, rs_fixture.replset_name)
+ self._disable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name)