author     Matt Broadstone <mbroadst@mongodb.com>            2023-04-06 19:15:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-04-06 23:48:17 +0000
commit     579a14858a79058e9ed6adeac98274b3423a4d4f (patch)
tree       451ba121c44c6d891ad438b6cf9db14cf09acd84 /buildscripts/resmokelib
parent     5758aa24889d38935af4ce54444ec18910fdfa45 (diff)
SERVER-73129 Use driver retryability in tenant migration hook
Diffstat (limited to 'buildscripts/resmokelib')
6 files changed, 351 insertions, 414 deletions
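
The change applies one pattern throughout: each hand-rolled `while True` / `except ConnectionFailure` loop around a server command is replaced by a call to the shared helper `with_naive_retry`, added in the new file buildscripts/resmokelib/testing/retry.py (shown at the end of the diff). A minimal before/after sketch of that pattern, for illustration only and not part of the commit (assumes a connected `pymongo` client and the repository root on the Python path):

    import pymongo
    from buildscripts.resmokelib.testing.retry import with_naive_retry

    def get_repl_config_old(client):
        # Before: each call site carried its own open-ended retry loop,
        # and only network errors (ConnectionFailure) were retried.
        while True:
            try:
                return client.admin.command({"replSetGetConfig": 1})["config"]
            except pymongo.errors.ConnectionFailure:
                continue

    def get_repl_config_new(client):
        # After: the retry policy (error-code allow list, "RetryableWriteError"
        # label, overall timeout) lives in one shared helper.
        return with_naive_retry(lambda: client.admin.command({"replSetGetConfig": 1}))["config"]
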
diff --git a/buildscripts/resmokelib/testing/fixtures/fixturelib.py b/buildscripts/resmokelib/testing/fixtures/fixturelib.py
index e2ff39d983c..ba08735fe42 100644
--- a/buildscripts/resmokelib/testing/fixtures/fixturelib.py
+++ b/buildscripts/resmokelib/testing/fixtures/fixturelib.py
@@ -10,6 +10,7 @@
 from buildscripts.resmokelib.core import network
 from buildscripts.resmokelib.utils.dictionary import merge_dicts
 from buildscripts.resmokelib.utils.history import make_historic as _make_historic
 from buildscripts.resmokelib.testing.fixtures import _builder
+from buildscripts.resmokelib.testing.retry import with_naive_retry
 
 
 class FixtureLib:
diff --git a/buildscripts/resmokelib/testing/fixtures/shard_split.py b/buildscripts/resmokelib/testing/fixtures/shard_split.py
index 6a19db5d8f1..5be3e390efa 100644
--- a/buildscripts/resmokelib/testing/fixtures/shard_split.py
+++ b/buildscripts/resmokelib/testing/fixtures/shard_split.py
@@ -9,6 +9,7 @@
 import pymongo
 from bson.objectid import ObjectId
 
 import buildscripts.resmokelib.testing.fixtures.interface as interface
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
 
 
 def _is_replica_set_fixture(fixture):
@@ -200,10 +201,11 @@ class ShardSplitFixture(interface.MultiClusterFixture):
         return self.fixtures[1:]
 
     def _create_client(self, fixture, **kwargs):
-        return fixture.mongo_client(
-            username=self.auth_options["username"], password=self.auth_options["password"],
-            authSource=self.auth_options["authenticationDatabase"],
-            authMechanism=self.auth_options["authenticationMechanism"], **kwargs)
+        return fixture.mongo_client(username=self.auth_options["username"],
+                                    password=self.auth_options["password"],
+                                    authSource=self.auth_options["authenticationDatabase"],
+                                    authMechanism=self.auth_options["authenticationMechanism"],
+                                    uuidRepresentation='standard', **kwargs)
 
     def add_recipient_nodes(self, recipient_set_name, recipient_tag_name=None):
         """Build recipient nodes, and reconfig them into the donor as non-voting members."""
@@ -245,63 +247,64 @@ class ShardSplitFixture(interface.MultiClusterFixture):
 
         # Reconfig the donor to add the recipient nodes as non-voting members
         donor_client = self._create_client(self.get_donor_rs())
-        while True:
-            try:
-                repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"]
-                repl_members = repl_config["members"]
-
-                for recipient_node in recipient_nodes:
-                    # It is possible for the reconfig below to fail with a retryable error code like
-                    # 'InterruptedDueToReplStateChange'. In these cases, we need to run the reconfig
-                    # again, but some or all of the recipient nodes might have already been added to
-                    # the member list. Only add recipient nodes which have not yet been added on a
-                    # retry.
-                    recipient_host = recipient_node.get_internal_connection_string()
-                    recipient_entry = {
-                        "host": recipient_host, "votes": 0, "priority": 0, "hidden": True,
-                        "tags": {recipient_tag_name: str(ObjectId())}
-                    }
-                    member_exists = False
-                    for index, member in enumerate(repl_members):
-                        if member["host"] == recipient_host:
-                            repl_members[index] = recipient_entry
-                            member_exists = True
-
-                    if not member_exists:
-                        repl_members.append(recipient_entry)
-
-                # Re-index all members from 0
-                for idx, member in enumerate(repl_members):
-                    member["_id"] = idx
-
-                # Prepare the new config
-                repl_config["version"] = repl_config["version"] + 1
-                repl_config["members"] = repl_members
+        repl_config = with_naive_retry(lambda: donor_client.admin.command({"replSetGetConfig": 1})[
+            "config"])
+        repl_members = repl_config["members"]
 
-                self.logger.info(
-                    f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}"
-                )
-                donor_client.admin.command({
-                    "replSetReconfig": repl_config,
-                    "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
-                })
-
-                # Wait for recipient nodes to become secondaries
-                self._await_recipient_nodes()
-                return
-            except pymongo.errors.ConnectionFailure as err:
-                self.logger.info(
-                    f"Retrying adding recipient nodes on replica set '{donor_rs_name}' after error: {str(err)}."
-                )
-                continue
+        for recipient_node in recipient_nodes:
+            # It is possible for the reconfig below to fail with a retryable error code like
+            # 'InterruptedDueToReplStateChange'. In these cases, we need to run the reconfig
+            # again, but some or all of the recipient nodes might have already been added to
+            # the member list. Only add recipient nodes which have not yet been added on a
+            # retry.
+            recipient_host = recipient_node.get_internal_connection_string()
+            recipient_entry = {
+                "host": recipient_host, "votes": 0, "priority": 0, "hidden": True,
+                "tags": {recipient_tag_name: str(ObjectId())}
+            }
+            member_exists = False
+            for index, member in enumerate(repl_members):
+                if member["host"] == recipient_host:
+                    repl_members[index] = recipient_entry
+                    member_exists = True
+
+            if not member_exists:
+                repl_members.append(recipient_entry)
+
+        # Re-index all members from 0
+        for idx, member in enumerate(repl_members):
+            member["_id"] = idx
+
+        # Prepare the new config
+        repl_config["version"] = repl_config["version"] + 1
+        repl_config["members"] = repl_members
 
-    def _await_recipient_nodes(self):
+        self.logger.info(
+            f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}")
+        with_naive_retry(lambda: donor_client.admin.command({
+            "replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
+        }))
+
+        # Wait for recipient nodes to become secondaries
+        self._await_recipient_nodes()
+
+    def _await_recipient_nodes(self, timeout_secs=None):
         """Wait for recipient nodes to become available."""
+        if timeout_secs is None:
+            timeout_secs = self.AWAIT_REPL_TIMEOUT_MINS * 60
+
+        start = time.time()
         recipient_nodes = self.get_recipient_nodes()
         for recipient_node in recipient_nodes:
             recipient_client = self._create_client(recipient_node,
                                                    read_preference=pymongo.ReadPreference.SECONDARY)
             while True:
+                now = time.time()
+                if (now - start) >= timeout_secs:
+                    msg = f"Timed out while waiting for secondary on port {recipient_node.port} to become available."
+                    self.logger.error(msg)
+                    raise self.fixturelib.ServerFailure(msg)
+
                 self.logger.info(
                     f"Waiting for secondary on port {recipient_node.port} to become available.")
                 try:
@@ -311,8 +314,6 @@ class ShardSplitFixture(interface.MultiClusterFixture):
                 except pymongo.errors.OperationFailure as err:
                     if err.code != ShardSplitFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE:
                         raise
-                except pymongo.errors.ConnectionFailure:
-                    pass
                 time.sleep(0.1)  # Wait a little bit before trying again.
 
             self.logger.info(f"Secondary on port {recipient_node.port} is now available.")
@@ -333,39 +334,30 @@ class ShardSplitFixture(interface.MultiClusterFixture):
         self.fixtures = [donor_rs]
 
         donor_client = self._create_client(self.get_donor_rs())
-        while True:
-            try:
-                repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"]
-                repl_members = [
-                    member for member in repl_config["members"]
-                    if not 'tags' in member or not recipient_tag_name in member["tags"]
-                ]
-
-                # Re-index all members from 0
-                for idx, member in enumerate(repl_members):
-                    member["_id"] = idx
-
-                # Prepare the new config
-                repl_config["version"] = repl_config["version"] + 1
-                repl_config["members"] = repl_members
-
-                # It's possible that the recipient config has been removed in a previous remove attempt.
-                if "recipientConfig" in repl_config:
-                    del repl_config["recipientConfig"]
+        repl_config = with_naive_retry(lambda: donor_client.admin.command({"replSetGetConfig": 1})[
+            "config"])
+        repl_members = [
+            member for member in repl_config["members"]
+            if not 'tags' in member or not recipient_tag_name in member["tags"]
+        ]
 
-                self.logger.info(
-                    f"Reconfiguring donor '{donor_rs_name}' to remove recipient nodes: {repl_config}"
-                )
-                donor_client.admin.command({
-                    "replSetReconfig": repl_config,
-                    "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
-                })
-                break
-            except pymongo.errors.ConnectionFailure as err:
-                self.logger.info(
-                    f"Retrying removing recipient nodes from donor '{donor_rs_name}' after error: {str(err)}."
-                )
-                continue
+        # Re-index all members from 0
+        for idx, member in enumerate(repl_members):
+            member["_id"] = idx
+
+        # Prepare the new config
+        repl_config["version"] = repl_config["version"] + 1
+        repl_config["members"] = repl_members
+
+        # It's possible that the recipient config has been removed in a previous remove attempt.
+ if "recipientConfig" in repl_config: + del repl_config["recipientConfig"] + + self.logger.info( + f"Reconfiguring donor '{donor_rs_name}' to remove recipient nodes: {repl_config}") + with_naive_retry(lambda: donor_client.admin.command({ + "replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000 + })) self.logger.info("Tearing down recipient nodes and removing data directories.") for recipient_node in reversed(recipient_nodes): diff --git a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py index 3a45424a66a..5e4ede264f9 100644 --- a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py +++ b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py @@ -3,12 +3,15 @@ import os.path import buildscripts.resmokelib.testing.fixtures.interface as interface -from buildscripts.resmokelib.testing.fixtures.fixturelib import FixtureLib +from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry class TenantMigrationFixture(interface.MultiClusterFixture): """Fixture which provides JSTests with a set of replica sets to run tenant migration against.""" + AWAIT_REPL_TIMEOUT_MINS = 5 + AWAIT_REPL_TIMEOUT_FOREVER_MINS = 24 * 60 + def __init__(self, logger, job_num, fixturelib, common_mongod_options=None, per_mongod_options=None, dbpath_prefix=None, preserve_dbpath=False, num_replica_sets=1, num_nodes_per_replica_set=2, start_initial_sync_node=False, @@ -155,18 +158,25 @@ class TenantMigrationFixture(interface.MultiClusterFixture): """Return the replica sets involved in the tenant migration.""" return self.replica_sets.copy() + def _create_client(self, fixture, **kwargs): + return fixture.mongo_client(username=self.auth_options["username"], + password=self.auth_options["password"], + authSource=self.auth_options["authenticationDatabase"], + authMechanism=self.auth_options["authenticationMechanism"], + uuidRepresentation='standard', **kwargs) + def _create_tenant_migration_donor_and_recipient_roles(self, rs): """Create a role for tenant migration donor and recipient.""" primary = rs.get_primary() - primary_client = interface.build_client(primary, self.auth_options) + primary_client = self._create_client(primary) try: - primary_client.admin.command({ + with_naive_retry(lambda: primary_client.admin.command({ "createRole": "tenantMigrationDonorRole", "privileges": [{ "resource": {"cluster": True}, "actions": ["runTenantMigration"] }, {"resource": {"db": "admin", "collection": "system.keys"}, "actions": ["find"]}], "roles": [] - }) + })) except: self.logger.exception( "Error creating tenant migration donor role on primary on port %d of replica" + @@ -174,7 +184,7 @@ class TenantMigrationFixture(interface.MultiClusterFixture): raise try: - primary_client.admin.command({ + with_naive_retry(lambda: primary_client.admin.command({ "createRole": "tenantMigrationRecipientRole", "privileges": [{ "resource": {"cluster": True}, @@ -184,7 +194,7 @@ class TenantMigrationFixture(interface.MultiClusterFixture): "resource": {"anyResource": True}, "actions": ["dbStats", "collStats", "find", "listIndexes"] }], "roles": [] - }) + })) except: self.logger.exception( "Error creating tenant migration recipient role on primary on port %d of replica" + diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py index 0d85e5fcdb9..876f35c6fd0 100644 --- a/buildscripts/resmokelib/testing/hooks/shard_split.py +++ 
@@ -6,14 +6,14 @@
 import threading
 import time
 import uuid
 
-import bson
-import pymongo.errors
-
+from bson.binary import Binary, UUID_SUBTYPE
 from bson.objectid import ObjectId
+from pymongo.errors import OperationFailure, PyMongoError
 
 from buildscripts.resmokelib import errors
 from buildscripts.resmokelib.testing.fixtures import shard_split
 from buildscripts.resmokelib.testing.fixtures.replicaset import ReplicaSetFixture
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
 from buildscripts.resmokelib.testing.hooks import interface
 from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
@@ -167,7 +167,7 @@ class _ShardSplitOptions:
 
     def get_migration_id_as_binary(self):
         """Return the migration id as BSON Binary."""
-        return bson.Binary(self.migration_id.bytes, 4)
+        return Binary(self.migration_id.bytes, UUID_SUBTYPE)
 
     def get_donor_rs(self):
         """Return the current donor for the split fixture."""
@@ -339,35 +339,33 @@ class _ShardSplitThread(threading.Thread):
                                   recipient_tag_name, recipient_set_name)
 
     def _create_client(self, fixture, **kwargs):
-        return fixture.mongo_client(
-            username=self._auth_options["username"], password=self._auth_options["password"],
-            authSource=self._auth_options["authenticationDatabase"],
-            authMechanism=self._auth_options["authenticationMechanism"], **kwargs)
+        return fixture.mongo_client(username=self._auth_options["username"],
+                                    password=self._auth_options["password"],
+                                    authSource=self._auth_options["authenticationDatabase"],
+                                    authMechanism=self._auth_options["authenticationMechanism"],
+                                    uuidRepresentation='standard', **kwargs)
 
     def _get_recipient_primary(self, split_opts, timeout_secs=None):
         if timeout_secs is None:
             timeout_secs = self._shard_split_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
         nodes = split_opts.get_recipient_nodes()
-        start = time.time()
         clients = {}
-        while True:
+        start = time.monotonic()
+        while time.monotonic() - start < timeout_secs:
             for node in nodes:
-                now = time.time()
-                if (now - start) >= timeout_secs:
-                    msg = f"Timed out while waiting for a primary on replica set '{split_opts.recipient_set_name}'."
-                    self.logger.error(msg)
-                    raise errors.ServerFailure(msg)
-
-                try:
-                    if node.port not in clients:
-                        clients[node.port] = self._create_client(node)
-
-                    client = clients[node.port]
-                    is_master = client.admin.command("isMaster")["ismaster"]
-                    if is_master:
-                        return node
-                except pymongo.errors.ConnectionFailure:
-                    continue
+                if node.port not in clients:
+                    clients[node.port] = self._create_client(node)
+
+                client = clients[node.port]
+                is_master = client.admin.command("isMaster")["ismaster"]
+                if is_master:
+                    return node
+
+            time.sleep(self.POLL_INTERVAL_SECS)
+
+        raise errors.ServerFailure(
+            f"Timed out while waiting for a primary on replica set '{split_opts.recipient_set_name}'."
+        )
 
     def _check_split_dbhash(self, split_opts):
         # Set the donor connection string, recipient connection string, and migration uuid string
@@ -410,85 +408,71 @@ class _ShardSplitThread(threading.Thread):
         self.logger.info(f"Committing shard split '{split_opts.migration_id}' on replica set "
                          f"'{split_opts.get_donor_name()}'.")
 
-        while True:
-            try:
-                donor_client.admin.command({
-                    "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(),
-                    "tenantIds": split_opts.tenant_ids,
-                    "recipientTagName": split_opts.recipient_tag_name, "recipientSetName":
-                        split_opts.recipient_set_name
-                }, bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
-
-                self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
-                                 f"'{split_opts.get_donor_name()}' has committed.")
-                return True
-            except pymongo.errors.OperationFailure as err:
-                if not self._is_fail_point_err(err):
-                    # This is an unexpected abort, raise it for debugging.
-                    raise
+        try:
+            with_naive_retry(lambda: donor_client.admin.command({
+                "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(),
+                "tenantIds": split_opts.tenant_ids, "recipientTagName":
+                    split_opts.recipient_tag_name, "recipientSetName": split_opts.recipient_set_name
+            }))
+
+            self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
+                             f"'{split_opts.get_donor_name()}' has committed.")
+            return True
+        except OperationFailure as err:
+            if not self._is_fail_point_err(err):
+                # This is an unexpected abort, raise it for debugging.
+                raise
 
-                self.logger.info(
-                    f"Shard split '{split_opts.migration_id}' on replica set "
-                    f"'{split_opts.get_donor_name()}' has aborted due to failpoint: {str(err)}.")
-                return False
-            except pymongo.errors.ConnectionFailure:
-                self.logger.info(
-                    f"Retrying shard split '{split_opts.migration_id}' against replica set "
-                    f"'{split_opts.get_donor_name()}'.")
+            self.logger.info(
+                f"Shard split '{split_opts.migration_id}' on replica set "
+                f"'{split_opts.get_donor_name()}' has aborted due to failpoint: {str(err)}.")
+            return False
 
     def _forget_shard_split(self, donor_client, split_opts):
         self.logger.info(f"Forgetting shard split '{split_opts.migration_id}' on replica set "
                          f"'{split_opts.get_donor_name()}'.")
 
-        while True:
-            try:
-                donor_client.admin.command(
-                    {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()},
-                    bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
-                return
-            except pymongo.errors.ConnectionFailure:
-                self.logger.info(
-                    f"Retrying forget shard split '{split_opts.migration_id}' against replica "
-                    f"set '{split_opts.get_donor_name()}'.")
-                continue
-            except pymongo.errors.OperationFailure as err:
-                if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
-                    raise
-
-                self.logger.info(f"Could not find shard split '{split_opts.migration_id}' on "
-                                 f"replica set '{split_opts.get_donor_name()}': {str(err)}.")
-                return
-            except pymongo.errors.PyMongoError:
-                self.logger.exception(
-                    f"Error forgetting shard split '{split_opts.migration_id}' on "
-                    f"replica set '{split_opts.get_donor_name()}'.")
-                raise
+        try:
+            with_naive_retry(lambda: donor_client.admin.command(
+                {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()}))
+            return
+        except OperationFailure as err:
+            if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
+                raise
+            self.logger.info(f"Could not find shard split '{split_opts.migration_id}' on "
+                             f"replica set '{split_opts.get_donor_name()}': {str(err)}.")
+        except PyMongoError:
self.logger.exception(f"Error forgetting shard split '{split_opts.migration_id}' on " + f"replica set '{split_opts.get_donor_name()}'.") + raise + def _wait_for_garbage_collection(self, split_opts, is_committed): # noqa: D205,D400 + timeout_secs = self._shard_split_fixture.AWAIT_REPL_TIMEOUT_MINS * 60 + + def wait_for_gc_on_node(node, rs_name): + self.logger.info( + f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected " + f"on donor node on port {node.port} of replica set " + f"'{rs_name}'.") + + node_client = self._create_client(node) + + start = time.monotonic() + while time.monotonic() - start < timeout_secs: + res = with_naive_retry(lambda: node_client.config.command( + {"count": "shardSplitDonors", "query": {"tenantIds": split_opts.tenant_ids}})) + if res["n"] == 0: + return + time.sleep(self.POLL_INTERVAL_SECS) + + raise errors.ServerFailure( + f"Timed out while waiting for garbage collection for node on port {node.port}.") + try: donor_nodes = split_opts.get_donor_nodes() for donor_node in donor_nodes: - self.logger.info( - f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected " - f"on donor node on port {donor_node.port} of replica set " - f"'{split_opts.get_donor_name()}'.") - - donor_node_client = self._create_client(donor_node) - while True: - try: - res = donor_node_client.config.command({ - "count": "shardSplitDonors", - "query": {"tenantIds": split_opts.tenant_ids} - }) - if res["n"] == 0: - break - except pymongo.errors.ConnectionFailure: - self.logger.info( - f"Retrying waiting for shard split '{split_opts.migration_id}' to be " - f"garbage collected on donor node on port {donor_node.port} of " - f"replica set '{split_opts.get_donor_name()}'.") - continue - time.sleep(self.POLL_INTERVAL_SECS) + wait_for_gc_on_node(donor_node, split_opts.get_donor_name()) # If a shard split operation is aborted then the recipient is expected to be torn down, # we should not expect the state document will be garbage collected. 
@@ -497,29 +481,8 @@ class _ShardSplitThread(threading.Thread):
 
             recipient_nodes = split_opts.get_recipient_nodes()
             for recipient_node in recipient_nodes:
-                self.logger.info(
-                    f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected "
-                    f"on recipient node on port {recipient_node.port} of replica set "
-                    f"'{split_opts.recipient_set_name}'.")
-
-                recipient_node_client = self._create_client(recipient_node)
-                while True:
-                    try:
-                        res = recipient_node_client.config.command({
-                            "count": "shardSplitDonors",
-                            "query": {"tenantIds": split_opts.tenant_ids}
-                        })
-                        if res["n"] == 0:
-                            break
-                    except pymongo.errors.ConnectionFailure:
-                        self.logger.info(
-                            f"Retrying waiting for shard split '{split_opts.migration_id}' to be "
-                            f"garbage collected on recipient node on port {recipient_node.port} of "
-                            f"replica set '{split_opts.recipient_set_name}'.")
-                        continue
-                    time.sleep(self.POLL_INTERVAL_SECS)
-
-        except pymongo.errors.PyMongoError:
+                wait_for_gc_on_node(recipient_node, split_opts.recipient_set_name)
+        except PyMongoError:
             self.logger.exception(
                 f"Error waiting for shard split '{split_opts.migration_id}' from donor replica set "
                 f"'{split_opts.get_donor_name()} to recipient replica set "
@@ -527,13 +490,12 @@ class _ShardSplitThread(threading.Thread):
             raise
 
     def _wait_for_reroute_or_test_completion(self, donor_client, split_opts):
-        start_time = time.time()
-
         self.logger.info(
             f"Waiting for shard split '{split_opts.migration_id}' on replica set "
             f"'{split_opts.get_donor_name()}' to reroute at least one conflicting command. "
             f"Stop waiting when the test finishes.")
 
+        start_time = time.monotonic()
         while not self.__lifecycle.is_test_finished():
             try:
                 # We are reusing the infrastructure originally developed for tenant migrations,
@@ -543,13 +505,8 @@ class _ShardSplitThread(threading.Thread):
                     {"_id": split_opts.get_migration_id_as_binary()})
                 if doc is not None:
                     return
-            except pymongo.errors.ConnectionFailure:
-                self.logger.info(
-                    f"Retrying waiting for shard split '{split_opts.migration_id}' on replica set "
-                    f"'{split_opts.get_donor_name()}' to reroute at least one conflicting command.")
-                continue
-            except pymongo.errors.PyMongoError:
-                end_time = time.time()
+            except PyMongoError:
+                end_time = time.monotonic()
                 self.logger.exception(
                     f"Error running find command on replica set '{split_opts.get_donor_name()}' "
                     f"after waiting for reroute for {(end_time - start_time) * 1000} ms")
diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
index 19d527418dc..6b805fa137e 100644
--- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
@@ -7,12 +7,12 @@
 import threading
 import time
 import uuid
 
-import bson
-import pymongo.errors
+from bson.binary import Binary, UUID_SUBTYPE
+from pymongo.errors import OperationFailure, PyMongoError
 
 from buildscripts.resmokelib import errors
-from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
 from buildscripts.resmokelib.testing.fixtures import tenant_migration
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
 from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
 from buildscripts.resmokelib.testing.hooks import interface
@@ -371,8 +371,12 @@ class _TenantMigrationThread(threading.Thread):
         return _TenantMigrationOptions(donor_rs, recipient_rs, self._tenant_id, read_preference,
                                        self.logger)
 
-    def _create_client(self, node):
-        return fixture_interface.build_client(node, self._auth_options)
+    def _create_client(self, fixture, **kwargs):
+        return fixture.mongo_client(username=self._auth_options["username"],
+                                    password=self._auth_options["password"],
+                                    authSource=self._auth_options["authenticationDatabase"],
+                                    authMechanism=self._auth_options["authenticationMechanism"],
+                                    uuidRepresentation='standard', **kwargs)
 
     def _check_tenant_migration_dbhash(self, migration_opts):
         # Set the donor connection string, recipient connection string, and migration uuid string
@@ -399,8 +403,10 @@ class _TenantMigrationThread(threading.Thread):
         """
         try:
             # Clean up any orphaned tenant databases on the recipient allow next migration to start.
-            self._drop_tenant_databases(migration_opts.recipient_rs)
-            res = self._start_and_wait_for_migration(migration_opts)
+            self._drop_tenant_databases_on_recipient(migration_opts)
+
+            donor_client = self._create_client(migration_opts.donor_rs)
+            res = self._start_and_wait_for_migration(donor_client, migration_opts)
             is_committed = res["state"] == "committed"
 
             # Garbage collect the migration prior to throwing error to avoid migration conflict
@@ -412,8 +418,8 @@ class _TenantMigrationThread(threading.Thread):
             # If the migration committed, to avoid routing commands incorrectly, wait for the
             # donor/proxy to reroute at least one command before doing garbage collection. Stop
             # waiting when the test finishes.
-            self._wait_for_reroute_or_test_completion(migration_opts)
-            self._forget_migration(migration_opts)
+            self._wait_for_reroute_or_test_completion(donor_client, migration_opts)
+            self._forget_migration(donor_client, migration_opts)
             self._wait_for_migration_garbage_collection(migration_opts)
 
             if not res["ok"]:
@@ -435,177 +441,113 @@ class _TenantMigrationThread(threading.Thread):
                                    "' with donor replica set '" + migration_opts.get_donor_name() +
                                    "' aborted due to an error: " + str(res))
-        except pymongo.errors.PyMongoError:
+        except PyMongoError:
             self.logger.exception(
                 "Error running tenant migration '%s' with donor primary on replica set '%s'.",
                 migration_opts.migration_id, migration_opts.get_donor_name())
             raise
 
-    def _start_and_wait_for_migration(self, migration_opts):  # noqa: D205,D400
+    def _start_and_wait_for_migration(self, donor_client, migration_opts):  # noqa: D205,D400
         """Run donorStartMigration to start a tenant migration based on 'migration_opts', wait for
         the migration decision and return the last response for donorStartMigration.
         """
-        cmd_obj = {
-            "donorStartMigration":
-                1,
-            "migrationId":
-                bson.Binary(migration_opts.migration_id.bytes, 4),
-            "recipientConnectionString":
-                migration_opts.recipient_rs.get_driver_connection_url(),
-            "tenantId":
-                migration_opts.tenant_id,
-            "readPreference":
-                migration_opts.read_preference,
-            "donorCertificateForRecipient":
-                get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
-            "recipientCertificateForDonor":
-                get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
-        }
-        donor_primary = migration_opts.get_donor_primary()
 
         self.logger.info(
-            "Starting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
-            migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+            f"Starting tenant migration '{migration_opts.migration_id}' on replica set "
+            f"'{migration_opts.get_donor_name()}'.")
 
         while True:
-            try:
-                # Keep polling the migration state until the migration completes.
-                donor_primary_client = self._create_client(donor_primary)
-                res = donor_primary_client.admin.command(
-                    cmd_obj,
-                    bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
-            except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
-                donor_primary = migration_opts.get_donor_primary()
-                self.logger.info(
-                    "Retrying tenant migration '%s' against donor primary on port %d of replica " +
-                    "set '%s'.", migration_opts.migration_id, donor_primary.port,
-                    migration_opts.get_donor_name())
-                continue
+            # Keep polling the migration state until the migration completes.
+            res = with_naive_retry(lambda: donor_client.admin.command({
+                "donorStartMigration":
+                    1,
+                "migrationId":
+                    Binary(migration_opts.migration_id.bytes, UUID_SUBTYPE),
+                "recipientConnectionString":
+                    migration_opts.recipient_rs.get_driver_connection_url(),
+                "tenantId":
+                    migration_opts.tenant_id,
+                "readPreference":
+                    migration_opts.read_preference,
+                "donorCertificateForRecipient":
+                    get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
+                "recipientCertificateForDonor":
+                    get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
+            }))
+
             if res["state"] == "committed":
-                self.logger.info(
-                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
-                    "committed.", migration_opts.migration_id, donor_primary.port,
-                    migration_opts.get_donor_name())
+                self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+                                 f"'{migration_opts.get_donor_name()}' has committed.")
                 return res
             if res["state"] == "aborted":
-                self.logger.info(
-                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
-                    "aborted: %s.", migration_opts.migration_id, donor_primary.port,
-                    migration_opts.get_donor_name(), str(res))
+                self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+                                 f"'{migration_opts.get_donor_name()}' has aborted: '{str(res)}'.")
                 return res
             if not res["ok"]:
-                self.logger.info(
-                    "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
-                    "failed: %s.", migration_opts.migration_id, donor_primary.port,
-                    migration_opts.get_donor_name(), str(res))
+                self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+                                 f"'{migration_opts.get_donor_name()}' has failed: '{str(res)}'.")
                 return res
+
             time.sleep(self.POLL_INTERVAL_SECS)
 
-    def _forget_migration(self, migration_opts):
+    def _forget_migration(self, donor_client, migration_opts):
         """Run donorForgetMigration to garbage collection the tenant migration denoted by migration_opts'."""
-        self.logger.info("Forgetting tenant migration: %s.", str(migration_opts))
-
-        cmd_obj = {
-            "donorForgetMigration": 1, "migrationId": bson.Binary(migration_opts.migration_id.bytes,
-                                                                  4)
-        }
-        donor_primary = migration_opts.get_donor_primary()
-
         self.logger.info(
-            "Forgetting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
-            migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+            f"Forgetting tenant migration '{migration_opts.migration_id}' on replica set "
+            f"'{migration_opts.get_donor_name()}'.")
 
-        while True:
-            try:
-                donor_primary_client = self._create_client(donor_primary)
-                donor_primary_client.admin.command(
-                    cmd_obj,
-                    bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
-                return
-            except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
-                donor_primary = migration_opts.get_donor_primary()
-                self.logger.info(
-                    "Retrying forgetting tenant migration '%s' against donor primary on port %d of "
-                    + "replica set '%s'.", migration_opts.migration_id, donor_primary.port,
-                    migration_opts.get_donor_name())
-                continue
-            except pymongo.errors.OperationFailure as err:
-                if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
-                    raise
-                # The fixture was restarted.
-                self.logger.info(
-                    "Could not find tenant migration '%s' on donor primary on" +
-                    " port %d of replica set '%s': %s.", migration_opts.migration_id,
-                    donor_primary.port, migration_opts.get_donor_name(), str(err))
-                return
-            except pymongo.errors.PyMongoError:
-                self.logger.exception(
-                    "Error forgetting tenant migration '%s' on donor primary on" +
-                    " port %d of replica set '%s'.", migration_opts.migration_id,
-                    donor_primary.port, migration_opts.get_donor_name())
+        try:
+            with_naive_retry(lambda: donor_client.admin.command({
+                "donorForgetMigration": 1, "migrationId": Binary(migration_opts.migration_id.bytes,
+                                                                 UUID_SUBTYPE)
+            }))
+        except OperationFailure as err:
+            if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
                 raise
+            self.logger.info(f"Could not find tenant migration '{migration_opts.migration_id}' on "
+                             f"replica set '{migration_opts.get_donor_name()}': {str(err)}.")
+        except PyMongoError:
+            self.logger.exception(
+                f"Error forgetting tenant migration '{migration_opts.migration_id}' on "
+                f"replica set '{migration_opts.get_donor_name()}'.")
+            raise
 
     def _wait_for_migration_garbage_collection(self, migration_opts):  # noqa: D205,D400
         """Wait until the persisted state for tenant migration denoted by 'migration_opts' has been
         garbage collected on both the donor and recipient.
         """
+        timeout_secs = self._tenant_migration_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
+
+        def wait_for_gc_on_node(node, rs_name, collection_name):
+            self.logger.info(
+                "Waiting for tenant migration '%s' to be garbage collected on donor node on " +
+                "port %d of replica set '%s'.", migration_opts.migration_id, node.port, rs_name)
+
+            node_client = self._create_client(node)
+
+            start = time.monotonic()
+            while time.monotonic() - start < timeout_secs:
+                res = with_naive_retry(lambda: node_client.config.command(
+                    {"count": collection_name, "query": {"tenantId": migration_opts.tenant_id}}))
+                if res["n"] == 0:
+                    return
+                time.sleep(self.POLL_INTERVAL_SECS)
+
+            raise errors.ServerFailure(
+                f"Timed out while waiting for garbage collection of node on port {node.port}.")
+
         try:
             donor_nodes = migration_opts.get_donor_nodes()
             for donor_node in donor_nodes:
-                self.logger.info(
-                    "Waiting for tenant migration '%s' to be garbage collected on donor node on " +
-                    "port %d of replica set '%s'.", migration_opts.migration_id, donor_node.port,
-                    migration_opts.get_donor_name())
-
-                while True:
-                    try:
-                        donor_node_client = self._create_client(donor_node)
-                        res = donor_node_client.config.command({
-                            "count": "tenantMigrationDonors",
-                            "query": {"tenantId": migration_opts.tenant_id}
-                        })
-                        if res["n"] == 0:
-                            break
-                    except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
-                        # Ignore NotPrimaryErrors because it's possible to fail with
-                        # InterruptedDueToReplStateChange if the donor primary steps down or shuts
-                        # down during the garbage collection check.
-                        self.logger.info(
-                            "Retrying waiting for tenant migration '%s' to be garbage collected on"
-                            + " donor node on port %d of replica set '%s'.",
-                            migration_opts.migration_id, donor_node.port,
-                            migration_opts.get_donor_name())
-                        continue
-                    time.sleep(self.POLL_INTERVAL_SECS)
+                wait_for_gc_on_node(donor_node, migration_opts.get_donor_name(),
+                                    "tenantMigrationDonors")
 
             recipient_nodes = migration_opts.get_recipient_nodes()
             for recipient_node in recipient_nodes:
-                self.logger.info(
-                    "Waiting for tenant migration '%s' to be garbage collected on recipient node on"
-                    + " port %d of replica set '%s'.", migration_opts.migration_id,
-                    recipient_node.port, migration_opts.get_recipient_name())
-
-                while True:
-                    try:
-                        recipient_node_client = self._create_client(recipient_node)
-                        res = recipient_node_client.config.command({
-                            "count": "tenantMigrationRecipients",
-                            "query": {"tenantId": migration_opts.tenant_id}
-                        })
-                        if res["n"] == 0:
-                            break
-                    except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
-                        # Ignore NotPrimaryErrors because it's possible to fail with
-                        # InterruptedDueToReplStateChange if the recipient primary steps down or
-                        # shuts down during the garbage collection check.
-                        self.logger.info(
-                            "Retrying waiting for tenant migration '%s' to be garbage collected on"
-                            + " recipient node on port %d of replica set '%s'.",
-                            migration_opts.migration_id, recipient_node.port,
-                            migration_opts.get_recipient_name())
-                        continue
-                    time.sleep(self.POLL_INTERVAL_SECS)
-        except pymongo.errors.PyMongoError:
+                wait_for_gc_on_node(recipient_node, migration_opts.get_recipient_name(),
+                                    "tenantMigrationRecipients")
+        except PyMongoError:
             self.logger.exception(
                 "Error waiting for tenant migration '%s' from donor replica set '%s" +
                 " to recipient replica set '%s' to be garbage collected.",
@@ -613,67 +555,44 @@ class _TenantMigrationThread(threading.Thread):
                 migration_opts.get_recipient_name())
             raise
 
-    def _wait_for_reroute_or_test_completion(self, migration_opts):
-        start_time = time.time()
-        donor_primary = migration_opts.get_donor_primary()
-
+    def _wait_for_reroute_or_test_completion(self, donor_client, migration_opts):
         self.logger.info(
-            "Waiting for donor primary on port %d of replica set '%s' for tenant migration '%s' " +
-            "to reroute at least one conflicting command. Stop waiting when the test finishes.",
-            donor_primary.port, migration_opts.get_donor_name(), migration_opts.migration_id)
+            f"Waiting for tenant migration '{migration_opts.migration_id}' on replica set "
+            f"'{migration_opts.get_donor_name()}' to reroute at least one conflicting command. "
" + f"Stop waiting when the test finishes.") + start_time = time.time() while not self.__lifecycle.is_test_finished(): try: - donor_primary_client = self._create_client(donor_primary) - doc = donor_primary_client["testTenantMigration"]["rerouted"].find_one( - {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)}) + doc = donor_client["testTenantMigration"]["rerouted"].find_one( + {"_id": Binary(migration_opts.migration_id.bytes, UUID_SUBTYPE)}) if doc is not None: return - except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError): - donor_primary = migration_opts.get_donor_primary() - self.logger.info( - "Retrying waiting for donor primary on port '%d' of replica set '%s' for " + - "tenant migration '%s' to reroute at least one conflicting command.", - donor_primary.port, migration_opts.get_donor_name(), - migration_opts.migration_id) - continue - except pymongo.errors.PyMongoError: + except PyMongoError: end_time = time.time() self.logger.exception( - "Error running find command on donor primary on port %d of replica set '%s' " + - "after waiting for reroute for %0d ms", donor_primary.port, - migration_opts.get_donor_name(), (end_time - start_time) * 1000) + f"Error running find command on replica set '{migration_opts.get_donor_name()}' " + f"after waiting for reroute for {(end_time - start_time) * 1000} ms") raise time.sleep(self.POLL_INTERVAL_SECS) - def _drop_tenant_databases(self, rs): - self.logger.info("Dropping tenant databases from replica set '%s'.", rs.replset_name) - - primary = get_primary(rs, self.logger) - self.logger.info("Running dropDatabase commands against primary on port %d.", primary.port) + def _drop_tenant_databases_on_recipient(self, migration_opts): + self.logger.info( + f"Dropping tenant databases from replica set '{migration_opts.get_recipient_name()}'.") + recipient_client = self._create_client(migration_opts.recipient_rs) - while True: - try: - primary_client = self._create_client(primary) - res = primary_client.admin.command({"listDatabases": 1}) - for database in res["databases"]: - db_name = database["name"] - if db_name.startswith(self._tenant_id + "_"): - primary_client.drop_database(db_name) - return - # We retry on all write concern errors because we assume the only reason waiting for - # write concern should fail is because of a failover. - except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError, - pymongo.errors.WriteConcernError) as err: - primary = get_primary(rs, self.logger) - self.logger.info( - "Retrying dropDatabase commands against primary on port %d after error %s.", - primary.port, str(err)) - continue - except pymongo.errors.PyMongoError: - self.logger.exception( - "Error dropping databases for tenant id '%s' on primary on" + - " port %d of replica set '%s' to be garbage collection.", self._tenant_id, - primary.port, rs.replset_name) - raise + try: + self.logger.info( + f"Running dropDatabase commands against replica set '{migration_opts.get_recipient_name()}'" + ) + res = with_naive_retry(lambda: recipient_client.admin.command({"listDatabases": 1})) + for database in res["databases"]: + db_name = database["name"] + if db_name.startswith(self._tenant_id + "_"): + recipient_client.drop_database(db_name) + except PyMongoError as err: + self.logger.exception( + f"Error dropping databases for tenant '{self._tenant_id}' on replica set '{migration_opts.get_recipient_name()}': '{str(err)}'." 
+            )
+            raise
diff --git a/buildscripts/resmokelib/testing/retry.py b/buildscripts/resmokelib/testing/retry.py
new file mode 100644
index 00000000000..3b5287c4b61
--- /dev/null
+++ b/buildscripts/resmokelib/testing/retry.py
@@ -0,0 +1,58 @@
+import time
+from pymongo.errors import PyMongoError, ConnectionFailure, OperationFailure, ExecutionTimeout
+
+# TODO(DRIVERS-1401): Use error labels instead of checking against an allow list of error codes.
+retryable_codes = [
+    # From the SDAM spec, the "node is shutting down" codes.
+    11600,  # InterruptedAtShutdown
+    91,  # ShutdownInProgress
+    # From the SDAM spec, the "not primary" and "node is recovering" error codes.
+    10058,  # LegacyNotPrimary <=3.2 "not primary" error code
+    10107,  # NotWritablePrimary
+    13435,  # NotPrimaryNoSecondaryOk
+    11602,  # InterruptedDueToReplStateChange
+    13436,  # NotPrimaryOrSecondary
+    189,  # PrimarySteppedDown
+    # From the retryable reads/writes spec.
+    7,  # HostNotFound
+    6,  # HostUnreachable
+    89,  # NetworkTimeout
+    9001,  # SocketException
+    262,  # ExceededTimeLimit
+]
+
+
+def is_retryable_error(exc):
+    if isinstance(exc, ConnectionFailure):
+        return True
+    if exc.has_error_label("RetryableWriteError"):
+        return True
+    if isinstance(exc, OperationFailure) and exc.code in retryable_codes:
+        return True
+    return False
+
+
+def with_naive_retry(func, timeout=100):
+    """
+    Retry execution of a provided function naively for up to `timeout` seconds.
+
+    This method is only suitable for reads or other idempotent operations. It is not suitable for
+    retrying non-idempotent operations (most writes).
+
+    :param func: The function to execute
+    :param timeout: The maximum amount of time to retry execution
+    """
+
+    last_exc = None
+    start = time.monotonic()
+    while time.monotonic() - start < timeout:
+        try:
+            return func()
+        except PyMongoError as exc:
+            last_exc = exc
+            if not is_retryable_error(exc):
+                raise
+        time.sleep(0.1)
+
+    raise ExecutionTimeout(
+        f"Operation exceeded time limit after {timeout} seconds, last error: {last_exc}")
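
For reference, a usage sketch of the new helper outside resmoke. The host/port and collection names below are hypothetical, and it assumes a reachable replica-set member plus the repository root on the Python path:

    from pymongo import MongoClient
    from buildscripts.resmokelib.testing.retry import with_naive_retry

    client = MongoClient("localhost:20000")  # hypothetical test-fixture port

    # Idempotent read: retried on ConnectionFailure, on errors carrying the
    # "RetryableWriteError" label, and on the allow-listed "not primary" /
    # "node is recovering" codes; any other failure is raised immediately.
    status = with_naive_retry(lambda: client.admin.command({"replSetGetStatus": 1}))

    # If retryable errors persist past the timeout (default 100 seconds),
    # ExecutionTimeout is raised with the last underlying error attached.
    doc = with_naive_retry(lambda: client.test.coll.find_one({}), timeout=30)
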