diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2020-10-06 18:29:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-12 22:26:05 +0000 |
commit | d7f220af7070c6c3ef510d0ebe65172d90f9f73f (patch) | |
tree | 697316e86650e1b0bba86acb659290e16d022263 | |
parent | 46835741b3abaeb10aea8b3c1ef67616f95115b0 (diff) | |
download | mongo-d7f220af7070c6c3ef510d0ebe65172d90f9f73f.tar.gz |
SERVER-50493 Make tenant_migration_jscore_passthrough simulate a complete migration between two real replica sets that aborts
5 files changed, 199 insertions, 46 deletions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml index 0dc9bb17995..27f3a8341d3 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml @@ -90,12 +90,11 @@ executor: # clear the in-memory and persisted state for each migration. n: 1 fixture: - class: ReplicaSetFixture + class: TenantMigrationFixture mongod_options: set_parameters: enableTestCommands: 1 enableTenantMigrations: true - failpoint.skipSendingRecipientSyncDataCommand: - mode: alwaysOn - num_nodes: 3 + num_replica_sets: 2 + num_nodes_per_replica_set: 3 use_replica_set_connection_string: true diff --git a/buildscripts/resmokelib/config.py b/buildscripts/resmokelib/config.py index b5d2dc591a8..05b4ea6ca83 100644 --- a/buildscripts/resmokelib/config.py +++ b/buildscripts/resmokelib/config.py @@ -422,6 +422,9 @@ MIXED_BIN_VERSIONS = None # Specifies the number of replica set members in a ReplicaSetFixture. NUM_REPLSET_NODES = None +# Specifies the number of replica sets in a MultiReplicaSetFixture. +NUM_REPLSETS = None + # Specifies the number of shards in a ShardedClusterFixture. NUM_SHARDS = None diff --git a/buildscripts/resmokelib/testing/fixtures/replicaset.py b/buildscripts/resmokelib/testing/fixtures/replicaset.py index 85c4366fe4b..93dd66c3fca 100644 --- a/buildscripts/resmokelib/testing/fixtures/replicaset.py +++ b/buildscripts/resmokelib/testing/fixtures/replicaset.py @@ -33,7 +33,7 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst auth_options=None, replset_config_options=None, voting_secondaries=True, all_nodes_electable=False, use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None, default_read_concern=None, default_write_concern=None, - shard_logging_prefix=None): + shard_logging_prefix=None, replicaset_logging_prefix=None): """Initialize ReplicaSetFixture.""" interface.ReplFixture.__init__(self, logger, job_num, dbpath_prefix=dbpath_prefix) @@ -53,6 +53,7 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst config.MIXED_BIN_VERSIONS) self.mixed_bin_versions_config = self.mixed_bin_versions self.shard_logging_prefix = shard_logging_prefix + self.replicaset_logging_prefix = replicaset_logging_prefix # Use the values given from the command line if they exist for linear_chain and num_nodes. linear_chain_option = utils.default_if_none(config.LINEAR_CHAIN, linear_chain) @@ -623,6 +624,9 @@ class ReplicaSetFixture(interface.ReplFixture): # pylint: disable=too-many-inst return logging.loggers.new_fixture_node_logger("ShardedClusterFixture", self.job_num, node_name) + if self.replicaset_logging_prefix is not None: + node_name = f"{self.replicaset_logging_prefix}:{node_name}" + return logging.loggers.new_fixture_node_logger(self.__class__.__name__, self.job_num, node_name) diff --git a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py new file mode 100644 index 00000000000..c1fce354893 --- /dev/null +++ b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py @@ -0,0 +1,146 @@ +"""Fixture with multiple replica sets for executing JSTests against.""" + +import os.path + +from buildscripts.resmokelib import config +from buildscripts.resmokelib import errors +from buildscripts.resmokelib import logging +from buildscripts.resmokelib import utils +from buildscripts.resmokelib.testing.fixtures import interface +from buildscripts.resmokelib.testing.fixtures import replicaset + + +class TenantMigrationFixture(interface.Fixture): # pylint: disable=too-many-instance-attributes + """Fixture which provides JSTests with a set of replica sets to run tenant migration against.""" + + def __init__( # pylint: disable=too-many-arguments,too-many-locals + self, logger, job_num, mongod_options=None, dbpath_prefix=None, preserve_dbpath=False, + num_replica_sets=1, num_nodes_per_replica_set=2, start_initial_sync_node=False, + write_concern_majority_journal_default=None, auth_options=None, + replset_config_options=None, voting_secondaries=True, all_nodes_electable=False, + use_replica_set_connection_string=None, linear_chain=False, mixed_bin_versions=None, + default_read_concern=None, default_write_concern=None): + """Initialize TenantMigrationFixture with different options for the replica set processes.""" + + interface.Fixture.__init__(self, logger, job_num, dbpath_prefix=dbpath_prefix) + + self.mongod_options = utils.default_if_none(mongod_options, {}) + self.preserve_dbpath = preserve_dbpath + self.start_initial_sync_node = start_initial_sync_node + self.write_concern_majority_journal_default = write_concern_majority_journal_default + self.auth_options = auth_options + self.replset_config_options = utils.default_if_none(replset_config_options, {}) + self.voting_secondaries = voting_secondaries + self.all_nodes_electable = all_nodes_electable + self.use_replica_set_connection_string = use_replica_set_connection_string + self.default_read_concern = default_read_concern + self.default_write_concern = default_write_concern + self.mixed_bin_versions = utils.default_if_none(mixed_bin_versions, + config.MIXED_BIN_VERSIONS) + self.mixed_bin_versions_config = self.mixed_bin_versions + + # Use the values given from the command line if they exist for linear_chain and num_nodes. + linear_chain_option = utils.default_if_none(config.LINEAR_CHAIN, linear_chain) + self.linear_chain = linear_chain_option if linear_chain_option else linear_chain + self.num_nodes_per_replica_set = num_nodes_per_replica_set if num_nodes_per_replica_set \ + else config.NUM_REPLSET_NODES + self.num_replica_sets = num_replica_sets if num_replica_sets else config.NUM_REPLSETS + if self.num_replica_sets < 2: + raise ValueError("num_replica_sets must be greater or equal to 2") + + self.replica_sets = [] + + # The ReplicaSetFixture for the replica set that starts out owning the data (i.e. the + # replica set that driver should connect to when running commands). + self.replica_set_with_tenant = None + + def pids(self): + """:return: pids owned by this fixture if any.""" + out = [] + for replica_set in self.replica_sets: + out.extend(replica_set.pids()) + if not out: + self.logger.debug('No replica sets when gathering multi replicaset fixture pids.') + return out + + def setup(self): + """Set up the replica sets.""" + if not self.replica_sets: + for i in range(self.num_replica_sets): + rs_name = f"rs{i}" + mongod_options = self.mongod_options.copy() + mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, rs_name) + mongod_options["replSet"] = rs_name + + self.replica_sets.append( + replicaset.ReplicaSetFixture( + self.logger, self.job_num, mongod_options=mongod_options, + preserve_dbpath=self.preserve_dbpath, + num_nodes=self.num_nodes_per_replica_set, auth_options=self.auth_options, + replset_config_options=self.replset_config_options, + mixed_bin_versions=self.mixed_bin_versions, + replicaset_logging_prefix=rs_name, + use_replica_set_connection_string=self.use_replica_set_connection_string)) + + self.replica_set_with_tenant = self.replica_sets[0] + + # Start up each of the replica sets + for replica_set in self.replica_sets: + replica_set.setup() + + def await_ready(self): + """Block until the fixture can be used for testing.""" + # Wait for each of the replica sets + for replica_set in self.replica_sets: + replica_set.await_ready() + + def _do_teardown(self, mode=None): + """Shut down the replica sets.""" + self.logger.info("Stopping all replica sets...") + + running_at_start = self.is_running() + if not running_at_start: + self.logger.warning("All replica sets were expected to be running, but weren't.") + + teardown_handler = interface.FixtureTeardownHandler(self.logger) + + for replica_set in self.replica_sets: + teardown_handler.teardown(replica_set, "replica_set", mode=mode) + + if teardown_handler.was_successful(): + self.logger.info("Successfully stopped all replica sets.") + else: + self.logger.error("Stopping the fixture failed.") + raise errors.ServerFailure(teardown_handler.get_error_message()) + + def is_running(self): + """Return true if all replica sets are still operating.""" + return all(replica_set.is_running() for replica_set in self.replica_sets) + + def get_num_replica_sets(self): + """Return the number of replica sets.""" + return self.num_replica_sets + + def get_replset(self, index): + """Return the ReplicaSetFixture for the replica set at the given index.""" + if not self.replica_sets: + raise ValueError("Must call setup() before calling get_replset") + return self.replica_sets[index] + + def get_internal_connection_string(self): + """Return the internal connection string to the replica set that currently starts out owning the data.""" + if not self.replica_sets: + raise ValueError("Must call setup() before calling get_internal_connection_string()") + return self.replica_set_with_tenant.get_internal_connection_string() + + def get_driver_connection_url(self): + """Return the driver connection URL to the replica set that currently starts out owning the data.""" + if not self.replica_set_with_tenant: + raise ValueError("Must call setup() before calling get_driver_connection_url") + return self.replica_set_with_tenant.get_driver_connection_url() + + def get_node_info(self): + """Return a list of dicts of NodeInfo objects.""" + output = [] + for replica_set in self.replica_sets: + output += replica_set.get_node_info() diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py index 28e38192a61..8c974110b9e 100644 --- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py +++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py @@ -11,7 +11,7 @@ import pymongo.errors from buildscripts.resmokelib import errors from buildscripts.resmokelib import utils from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface -from buildscripts.resmokelib.testing.fixtures import replicaset +from buildscripts.resmokelib.testing.fixtures import tenant_migration from buildscripts.resmokelib.testing.hooks import interface @@ -25,17 +25,21 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins Args: hook_logger: the logger instance for this hook. - fixture: the target replica set fixture. + fixture: the target TenantMigrationFixture containing two replica sets. shell_options: contains the global_vars which contains TestData.dbPrefix to be used for tenant migrations. """ interface.Hook.__init__(self, hook_logger, fixture, ContinuousTenantMigration.DESCRIPTION) - self._fixture = fixture self._db_prefix = shell_options["global_vars"]["TestData"]["dbPrefix"] + if not isinstance(fixture, tenant_migration.TenantMigrationFixture) or \ + fixture.get_num_replica_sets() != 2: + raise ValueError( + "The ContinuousTenantMigration hook requires a TenantMigrationFixture with two replica sets" + ) + self._tenant_migration_fixture = fixture - self._rs_fixtures = [] self._tenant_migration_thread = None def before_suite(self, test_report): @@ -43,8 +47,8 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins # TODO (SERVER-50496): Make the hook start the migration thread once here instead of inside # before_test and make it run migrations continuously back and forth between the two replica # sets. - if not self._rs_fixtures: - self._add_fixture(self._fixture) + if not self._tenant_migration_fixture: + raise ValueError("No replica set pair to run migrations on") def after_suite(self, test_report): """After suite.""" @@ -53,8 +57,8 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins def before_test(self, test, test_report): """Before test.""" self.logger.info("Starting the migration thread.") - self._tenant_migration_thread = _TenantMigrationThread(self.logger, self._rs_fixtures, - self._db_prefix) + self._tenant_migration_thread = _TenantMigrationThread( + self.logger, self._tenant_migration_fixture, self._db_prefix) self._tenant_migration_thread.start() def after_test(self, test, test_report): @@ -63,38 +67,35 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins self._tenant_migration_thread.stop() self.logger.info("migration thread stopped.") - def _add_fixture(self, fixture): - if isinstance(fixture, replicaset.ReplicaSetFixture): - self._rs_fixtures.append(fixture) - class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-instance-attributes - MAX_SLEEP_SECS = 0.1 - MAX_BLOCK_TIME_MILLISECS = 5 * 1000 + MIN_START_MIGRATION_DELAY_SECS = 0.1 + MAX_START_MIGRATION_DELAY_SECS = 0.25 + MIN_BLOCK_TIME_SECS = 1 + MAX_BLOCK_TIME_SECS = 2.5 DONOR_START_MIGRATION_POLL_INTERVAL_SECS = 0.1 - TENANT_MIGRATION_ABORTED_ERROR_CODE = 325 - def __init__(self, logger, rs_fixtures, db_prefix): + def __init__(self, logger, tenant_migration_fixture, db_prefix): """Initialize _TenantMigrationThread.""" threading.Thread.__init__(self, name="TenantMigrationThread") self.daemon = True self.logger = logger - self._rs_fixtures = rs_fixtures + self._tenant_migration_fixture = tenant_migration_fixture self._db_prefix = db_prefix self._last_exec = time.time() def run(self): """Execute the thread.""" - if not self._rs_fixtures: - self.logger.warning("No replica set on which to run migrations.") + if not self._tenant_migration_fixture: + self.logger.warning("No replica set pair to run migrations on.") return try: now = time.time() self.logger.info("Starting a tenant migration for database prefix '%s'", self._db_prefix) - self._run_migration(self._rs_fixtures[0]) + self._run_migration(self._tenant_migration_fixture) self._last_exec = time.time() self.logger.info("Completed a tenant migration in %0d ms", (self._last_exec - now) * 1000) @@ -114,12 +115,14 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst # end of each test so that each test uses its own randomly generated block time. try: donor_primary_client.admin.command( - bson.SON([("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"), - ("mode", "alwaysOn"), - ("data", - bson.SON([("blockTimeMS", - random.uniform( - 0, _TenantMigrationThread.MAX_BLOCK_TIME_MILLISECS))]))])) + bson.SON( + [("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"), + ("mode", "alwaysOn"), + ("data", + bson.SON( + [("blockTimeMS", + 1000 * random.uniform(_TenantMigrationThread.MIN_BLOCK_TIME_SECS, + _TenantMigrationThread.MAX_BLOCK_TIME_SECS))]))])) except pymongo.errors.OperationFailure as err: self.logger.exception( "Unable to enable the failpoint to make migrations abort on donor primary on port " @@ -143,24 +146,29 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst + "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name, err.args[0])) - def _run_migration(self, rs_fixture): - donor_primary = rs_fixture.get_primary() + def _run_migration(self, tenant_migration_fixture): + donor_rs = tenant_migration_fixture.get_replset(0) + recipient_rs = tenant_migration_fixture.get_replset(1) + + donor_primary = donor_rs.get_primary() donor_primary_client = donor_primary.mongo_client() - time.sleep(random.uniform(0, _TenantMigrationThread.MAX_SLEEP_SECS)) + time.sleep( + random.uniform(_TenantMigrationThread.MIN_START_MIGRATION_DELAY_SECS, + _TenantMigrationThread.MAX_START_MIGRATION_DELAY_SECS)) self.logger.info( "Starting a tenant migration with donor primary on port %d of replica set '%s'.", - donor_primary.port, rs_fixture.replset_name) + donor_primary.port, donor_rs.replset_name) cmd_obj = { "donorStartMigration": 1, "migrationId": bson.Binary(uuid.uuid4().bytes, 4), - "recipientConnectionString": "dummySet/dummyHost:1234", + "recipientConnectionString": recipient_rs.get_driver_connection_url(), "databasePrefix": self._db_prefix, "readPreference": {"mode": "primary"} } try: - self._enable_abort(donor_primary_client, donor_primary.port, rs_fixture.replset_name) + self._enable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name) while True: # Keep polling the migration state until the migration completes, otherwise we might @@ -172,17 +180,10 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst if (not res["ok"] or res["state"] == "committed" or res["state"] == "aborted"): break time.sleep(_TenantMigrationThread.DONOR_START_MIGRATION_POLL_INTERVAL_SECS) - except pymongo.errors.OperationFailure as err: - if err.code == _TenantMigrationThread.TENANT_MIGRATION_ABORTED_ERROR_CODE: - self.logger.exception( - "tenant migration with donor primary on port %d of replica set '%s' aborted.", - donor_primary.port, rs_fixture.replset_name) - return - raise except pymongo.errors.PyMongoError: self.logger.exception( "Error running tenant migration with donor primary on port %d of replica set '%s'.", - donor_primary.port, rs_fixture.replset_name) + donor_primary.port, donor_rs.replset_name) raise finally: - self._disable_abort(donor_primary_client, donor_primary.port, rs_fixture.replset_name) + self._disable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name) |