author     Matt Broadstone <mbroadst@mongodb.com>           2023-04-06 19:15:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-04-06 23:48:17 +0000
commit     579a14858a79058e9ed6adeac98274b3423a4d4f (patch)
tree       451ba121c44c6d891ad438b6cf9db14cf09acd84 /buildscripts/resmokelib
parent     5758aa24889d38935af4ce54444ec18910fdfa45 (diff)
SERVER-73129 Use driver retryability in tenant migration hook
Diffstat (limited to 'buildscripts/resmokelib')
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/fixturelib.py          1
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/shard_split.py       166
-rw-r--r--  buildscripts/resmokelib/testing/fixtures/tenant_migration.py   22
-rw-r--r--  buildscripts/resmokelib/testing/hooks/shard_split.py          207
-rw-r--r--  buildscripts/resmokelib/testing/hooks/tenant_migration.py     311
-rw-r--r--  buildscripts/resmokelib/testing/retry.py                        58
6 files changed, 351 insertions, 414 deletions
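
At a glance, the commit replaces the hand-rolled `while True` retry loops scattered through these fixtures and hooks with one shared helper, `with_naive_retry` (added in `buildscripts/resmokelib/testing/retry.py`, the last file in this diff), which retries only errors the driver reports as retryable. A minimal before/after sketch of the call-site change, using a hypothetical `client`:

    import pymongo.errors
    from buildscripts.resmokelib.testing.retry import with_naive_retry

    # Before: every call site hand-rolled its own loop, with no deadline.
    while True:
        try:
            repl_config = client.admin.command({"replSetGetConfig": 1})["config"]
            break
        except pymongo.errors.ConnectionFailure:
            continue

    # After: one shared helper with a deadline and a retryable-error filter.
    repl_config = with_naive_retry(
        lambda: client.admin.command({"replSetGetConfig": 1})["config"])
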
diff --git a/buildscripts/resmokelib/testing/fixtures/fixturelib.py b/buildscripts/resmokelib/testing/fixtures/fixturelib.py
index e2ff39d983c..ba08735fe42 100644
--- a/buildscripts/resmokelib/testing/fixtures/fixturelib.py
+++ b/buildscripts/resmokelib/testing/fixtures/fixturelib.py
@@ -10,6 +10,7 @@ from buildscripts.resmokelib.core import network
from buildscripts.resmokelib.utils.dictionary import merge_dicts
from buildscripts.resmokelib.utils.history import make_historic as _make_historic
from buildscripts.resmokelib.testing.fixtures import _builder
+from buildscripts.resmokelib.testing.retry import with_naive_retry
class FixtureLib:
diff --git a/buildscripts/resmokelib/testing/fixtures/shard_split.py b/buildscripts/resmokelib/testing/fixtures/shard_split.py
index 6a19db5d8f1..5be3e390efa 100644
--- a/buildscripts/resmokelib/testing/fixtures/shard_split.py
+++ b/buildscripts/resmokelib/testing/fixtures/shard_split.py
@@ -9,6 +9,7 @@ import pymongo
from bson.objectid import ObjectId
import buildscripts.resmokelib.testing.fixtures.interface as interface
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
def _is_replica_set_fixture(fixture):
@@ -200,10 +201,11 @@ class ShardSplitFixture(interface.MultiClusterFixture):
return self.fixtures[1:]
def _create_client(self, fixture, **kwargs):
- return fixture.mongo_client(
- username=self.auth_options["username"], password=self.auth_options["password"],
- authSource=self.auth_options["authenticationDatabase"],
- authMechanism=self.auth_options["authenticationMechanism"], **kwargs)
+ return fixture.mongo_client(username=self.auth_options["username"],
+ password=self.auth_options["password"],
+ authSource=self.auth_options["authenticationDatabase"],
+ authMechanism=self.auth_options["authenticationMechanism"],
+ uuidRepresentation='standard', **kwargs)
def add_recipient_nodes(self, recipient_set_name, recipient_tag_name=None):
"""Build recipient nodes, and reconfig them into the donor as non-voting members."""
@@ -245,63 +247,64 @@ class ShardSplitFixture(interface.MultiClusterFixture):
# Reconfig the donor to add the recipient nodes as non-voting members
donor_client = self._create_client(self.get_donor_rs())
- while True:
- try:
- repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"]
- repl_members = repl_config["members"]
-
- for recipient_node in recipient_nodes:
- # It is possible for the reconfig below to fail with a retryable error code like
- # 'InterruptedDueToReplStateChange'. In these cases, we need to run the reconfig
- # again, but some or all of the recipient nodes might have already been added to
- # the member list. Only add recipient nodes which have not yet been added on a
- # retry.
- recipient_host = recipient_node.get_internal_connection_string()
- recipient_entry = {
- "host": recipient_host, "votes": 0, "priority": 0, "hidden": True,
- "tags": {recipient_tag_name: str(ObjectId())}
- }
- member_exists = False
- for index, member in enumerate(repl_members):
- if member["host"] == recipient_host:
- repl_members[index] = recipient_entry
- member_exists = True
-
- if not member_exists:
- repl_members.append(recipient_entry)
-
- # Re-index all members from 0
- for idx, member in enumerate(repl_members):
- member["_id"] = idx
-
- # Prepare the new config
- repl_config["version"] = repl_config["version"] + 1
- repl_config["members"] = repl_members
+ repl_config = with_naive_retry(lambda: donor_client.admin.command({"replSetGetConfig": 1})[
+ "config"])
+ repl_members = repl_config["members"]
- self.logger.info(
- f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}"
- )
- donor_client.admin.command({
- "replSetReconfig": repl_config,
- "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
- })
-
- # Wait for recipient nodes to become secondaries
- self._await_recipient_nodes()
- return
- except pymongo.errors.ConnectionFailure as err:
- self.logger.info(
- f"Retrying adding recipient nodes on replica set '{donor_rs_name}' after error: {str(err)}."
- )
- continue
+ for recipient_node in recipient_nodes:
+ # It is possible for the reconfig below to fail with a retryable error code like
+ # 'InterruptedDueToReplStateChange'. In these cases, we need to run the reconfig
+ # again, but some or all of the recipient nodes might have already been added to
+ # the member list. Only add recipient nodes which have not yet been added on a
+ # retry.
+ recipient_host = recipient_node.get_internal_connection_string()
+ recipient_entry = {
+ "host": recipient_host, "votes": 0, "priority": 0, "hidden": True,
+ "tags": {recipient_tag_name: str(ObjectId())}
+ }
+ member_exists = False
+ for index, member in enumerate(repl_members):
+ if member["host"] == recipient_host:
+ repl_members[index] = recipient_entry
+ member_exists = True
+
+ if not member_exists:
+ repl_members.append(recipient_entry)
+
+ # Re-index all members from 0
+ for idx, member in enumerate(repl_members):
+ member["_id"] = idx
+
+ # Prepare the new config
+ repl_config["version"] = repl_config["version"] + 1
+ repl_config["members"] = repl_members
- def _await_recipient_nodes(self):
+ self.logger.info(
+ f"Reconfiguring donor replica set to add non-voting recipient nodes: {repl_config}")
+ with_naive_retry(lambda: donor_client.admin.command({
+ "replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
+ }))
+
+ # Wait for recipient nodes to become secondaries
+ self._await_recipient_nodes()
+
+ def _await_recipient_nodes(self, timeout_secs=None):
"""Wait for recipient nodes to become available."""
+ if timeout_secs is None:
+ timeout_secs = self.AWAIT_REPL_TIMEOUT_MINS * 60
+
+ start = time.time()
recipient_nodes = self.get_recipient_nodes()
for recipient_node in recipient_nodes:
recipient_client = self._create_client(recipient_node,
read_preference=pymongo.ReadPreference.SECONDARY)
while True:
+ now = time.time()
+ if (now - start) >= timeout_secs:
+ msg = f"Timed out while waiting for secondary on port {recipient_node.port} to become available."
+ self.logger.error(msg)
+ raise self.fixturelib.ServerFailure(msg)
+
self.logger.info(
f"Waiting for secondary on port {recipient_node.port} to become available.")
try:
@@ -311,8 +314,6 @@ class ShardSplitFixture(interface.MultiClusterFixture):
except pymongo.errors.OperationFailure as err:
if err.code != ShardSplitFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE:
raise
- except pymongo.errors.ConnectionFailure:
- pass
time.sleep(0.1) # Wait a little bit before trying again.
self.logger.info(f"Secondary on port {recipient_node.port} is now available.")
@@ -333,39 +334,30 @@ class ShardSplitFixture(interface.MultiClusterFixture):
self.fixtures = [donor_rs]
donor_client = self._create_client(self.get_donor_rs())
- while True:
- try:
- repl_config = donor_client.admin.command({"replSetGetConfig": 1})["config"]
- repl_members = [
- member for member in repl_config["members"]
- if not 'tags' in member or not recipient_tag_name in member["tags"]
- ]
-
- # Re-index all members from 0
- for idx, member in enumerate(repl_members):
- member["_id"] = idx
-
- # Prepare the new config
- repl_config["version"] = repl_config["version"] + 1
- repl_config["members"] = repl_members
-
- # It's possible that the recipient config has been removed in a previous remove attempt.
- if "recipientConfig" in repl_config:
- del repl_config["recipientConfig"]
+ repl_config = with_naive_retry(lambda: donor_client.admin.command({"replSetGetConfig": 1})[
+ "config"])
+ repl_members = [
+ member for member in repl_config["members"]
+ if not 'tags' in member or not recipient_tag_name in member["tags"]
+ ]
- self.logger.info(
- f"Reconfiguring donor '{donor_rs_name}' to remove recipient nodes: {repl_config}"
- )
- donor_client.admin.command({
- "replSetReconfig": repl_config,
- "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
- })
- break
- except pymongo.errors.ConnectionFailure as err:
- self.logger.info(
- f"Retrying removing recipient nodes from donor '{donor_rs_name}' after error: {str(err)}."
- )
- continue
+ # Re-index all members from 0
+ for idx, member in enumerate(repl_members):
+ member["_id"] = idx
+
+ # Prepare the new config
+ repl_config["version"] = repl_config["version"] + 1
+ repl_config["members"] = repl_members
+
+ # It's possible that the recipient config has been removed in a previous remove attempt.
+ if "recipientConfig" in repl_config:
+ del repl_config["recipientConfig"]
+
+ self.logger.info(
+ f"Reconfiguring donor '{donor_rs_name}' to remove recipient nodes: {repl_config}")
+ with_naive_retry(lambda: donor_client.admin.command({
+ "replSetReconfig": repl_config, "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000
+ }))
self.logger.info("Tearing down recipient nodes and removing data directories.")
for recipient_node in reversed(recipient_nodes):
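
One detail that keeps the simplified `add_recipient_nodes` above correct under retries: the member list is built by upserting each recipient entry by host, so re-running the sequence after a retryable failure does not append duplicates (see the comment in the hunk above). A condensed sketch of that upsert step; the function wrapper is illustrative, while the field names mirror the diff:

    def upsert_member(repl_members, recipient_entry):
        """Replace an existing entry with the same host, else append."""
        for index, member in enumerate(repl_members):
            if member["host"] == recipient_entry["host"]:
                repl_members[index] = recipient_entry
                return
        repl_members.append(recipient_entry)
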
diff --git a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py
index 3a45424a66a..5e4ede264f9 100644
--- a/buildscripts/resmokelib/testing/fixtures/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/fixtures/tenant_migration.py
@@ -3,12 +3,15 @@
import os.path
import buildscripts.resmokelib.testing.fixtures.interface as interface
-from buildscripts.resmokelib.testing.fixtures.fixturelib import FixtureLib
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
class TenantMigrationFixture(interface.MultiClusterFixture):
"""Fixture which provides JSTests with a set of replica sets to run tenant migration against."""
+ AWAIT_REPL_TIMEOUT_MINS = 5
+ AWAIT_REPL_TIMEOUT_FOREVER_MINS = 24 * 60
+
def __init__(self, logger, job_num, fixturelib, common_mongod_options=None,
per_mongod_options=None, dbpath_prefix=None, preserve_dbpath=False,
num_replica_sets=1, num_nodes_per_replica_set=2, start_initial_sync_node=False,
@@ -155,18 +158,25 @@ class TenantMigrationFixture(interface.MultiClusterFixture):
"""Return the replica sets involved in the tenant migration."""
return self.replica_sets.copy()
+ def _create_client(self, fixture, **kwargs):
+ return fixture.mongo_client(username=self.auth_options["username"],
+ password=self.auth_options["password"],
+ authSource=self.auth_options["authenticationDatabase"],
+ authMechanism=self.auth_options["authenticationMechanism"],
+ uuidRepresentation='standard', **kwargs)
+
def _create_tenant_migration_donor_and_recipient_roles(self, rs):
"""Create a role for tenant migration donor and recipient."""
primary = rs.get_primary()
- primary_client = interface.build_client(primary, self.auth_options)
+ primary_client = self._create_client(primary)
try:
- primary_client.admin.command({
+ with_naive_retry(lambda: primary_client.admin.command({
"createRole": "tenantMigrationDonorRole", "privileges": [{
"resource": {"cluster": True}, "actions": ["runTenantMigration"]
}, {"resource": {"db": "admin", "collection": "system.keys"}, "actions": ["find"]}],
"roles": []
- })
+ }))
except:
self.logger.exception(
"Error creating tenant migration donor role on primary on port %d of replica" +
@@ -174,7 +184,7 @@ class TenantMigrationFixture(interface.MultiClusterFixture):
raise
try:
- primary_client.admin.command({
+ with_naive_retry(lambda: primary_client.admin.command({
"createRole": "tenantMigrationRecipientRole",
"privileges": [{
"resource": {"cluster": True},
@@ -184,7 +194,7 @@ class TenantMigrationFixture(interface.MultiClusterFixture):
"resource": {"anyResource": True},
"actions": ["dbStats", "collStats", "find", "listIndexes"]
}], "roles": []
- })
+ }))
except:
self.logger.exception(
"Error creating tenant migration recipient role on primary on port %d of replica" +
diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py
index 0d85e5fcdb9..876f35c6fd0 100644
--- a/buildscripts/resmokelib/testing/hooks/shard_split.py
+++ b/buildscripts/resmokelib/testing/hooks/shard_split.py
@@ -6,14 +6,14 @@ import threading
import time
import uuid
-import bson
-import pymongo.errors
-
+from bson.binary import Binary, UUID_SUBTYPE
from bson.objectid import ObjectId
+from pymongo.errors import OperationFailure, PyMongoError
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures import shard_split
from buildscripts.resmokelib.testing.fixtures.replicaset import ReplicaSetFixture
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
@@ -167,7 +167,7 @@ class _ShardSplitOptions:
def get_migration_id_as_binary(self):
"""Return the migration id as BSON Binary."""
- return bson.Binary(self.migration_id.bytes, 4)
+ return Binary(self.migration_id.bytes, UUID_SUBTYPE)
def get_donor_rs(self):
"""Return the current donor for the split fixture."""
@@ -339,35 +339,33 @@ class _ShardSplitThread(threading.Thread):
recipient_tag_name, recipient_set_name)
def _create_client(self, fixture, **kwargs):
- return fixture.mongo_client(
- username=self._auth_options["username"], password=self._auth_options["password"],
- authSource=self._auth_options["authenticationDatabase"],
- authMechanism=self._auth_options["authenticationMechanism"], **kwargs)
+ return fixture.mongo_client(username=self._auth_options["username"],
+ password=self._auth_options["password"],
+ authSource=self._auth_options["authenticationDatabase"],
+ authMechanism=self._auth_options["authenticationMechanism"],
+ uuidRepresentation='standard', **kwargs)
def _get_recipient_primary(self, split_opts, timeout_secs=None):
if timeout_secs is None:
timeout_secs = self._shard_split_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
nodes = split_opts.get_recipient_nodes()
- start = time.time()
clients = {}
- while True:
+ start = time.monotonic()
+ while time.monotonic() - start < timeout_secs:
for node in nodes:
- now = time.time()
- if (now - start) >= timeout_secs:
- msg = f"Timed out while waiting for a primary on replica set '{split_opts.recipient_set_name}'."
- self.logger.error(msg)
- raise errors.ServerFailure(msg)
-
- try:
- if node.port not in clients:
- clients[node.port] = self._create_client(node)
-
- client = clients[node.port]
- is_master = client.admin.command("isMaster")["ismaster"]
- if is_master:
- return node
- except pymongo.errors.ConnectionFailure:
- continue
+ if node.port not in clients:
+ clients[node.port] = self._create_client(node)
+
+ client = clients[node.port]
+ is_master = client.admin.command("isMaster")["ismaster"]
+ if is_master:
+ return node
+
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ raise errors.ServerFailure(
+ f"Timed out while waiting for a primary on replica set '{split_opts.recipient_set_name}'."
+ )
def _check_split_dbhash(self, split_opts):
# Set the donor connection string, recipient connection string, and migration uuid string
@@ -410,85 +408,71 @@ class _ShardSplitThread(threading.Thread):
self.logger.info(f"Committing shard split '{split_opts.migration_id}' on replica set "
f"'{split_opts.get_donor_name()}'.")
- while True:
- try:
- donor_client.admin.command({
- "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(),
- "tenantIds": split_opts.tenant_ids,
- "recipientTagName": split_opts.recipient_tag_name, "recipientSetName":
- split_opts.recipient_set_name
- }, bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
-
- self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
- f"'{split_opts.get_donor_name()}' has committed.")
- return True
- except pymongo.errors.OperationFailure as err:
- if not self._is_fail_point_err(err):
- # This is an unexpected abort, raise it for debugging.
- raise
+ try:
+ with_naive_retry(lambda: donor_client.admin.command({
+ "commitShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary(),
+ "tenantIds": split_opts.tenant_ids, "recipientTagName":
+ split_opts.recipient_tag_name, "recipientSetName": split_opts.recipient_set_name
+ }))
+
+ self.logger.info(f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' has committed.")
+ return True
+ except OperationFailure as err:
+ if not self._is_fail_point_err(err):
+ # This is an unexpected abort, raise it for debugging.
+ raise
- self.logger.info(
- f"Shard split '{split_opts.migration_id}' on replica set "
- f"'{split_opts.get_donor_name()}' has aborted due to failpoint: {str(err)}.")
- return False
- except pymongo.errors.ConnectionFailure:
- self.logger.info(
- f"Retrying shard split '{split_opts.migration_id}' against replica set "
- f"'{split_opts.get_donor_name()}'.")
+ self.logger.info(
+ f"Shard split '{split_opts.migration_id}' on replica set "
+ f"'{split_opts.get_donor_name()}' has aborted due to failpoint: {str(err)}.")
+ return False
def _forget_shard_split(self, donor_client, split_opts):
self.logger.info(f"Forgetting shard split '{split_opts.migration_id}' on replica set "
f"'{split_opts.get_donor_name()}'.")
- while True:
- try:
- donor_client.admin.command(
- {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()},
- bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
- return
- except pymongo.errors.ConnectionFailure:
- self.logger.info(
- f"Retrying forget shard split '{split_opts.migration_id}' against replica "
- f"set '{split_opts.get_donor_name()}'.")
- continue
- except pymongo.errors.OperationFailure as err:
- if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
- raise
-
- self.logger.info(f"Could not find shard split '{split_opts.migration_id}' on "
- f"replica set '{split_opts.get_donor_name()}': {str(err)}.")
- return
- except pymongo.errors.PyMongoError:
- self.logger.exception(
- f"Error forgetting shard split '{split_opts.migration_id}' on "
- f"replica set '{split_opts.get_donor_name()}'.")
+ try:
+ with_naive_retry(lambda: donor_client.admin.command(
+ {"forgetShardSplit": 1, "migrationId": split_opts.get_migration_id_as_binary()}))
+ return
+ except OperationFailure as err:
+ if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
raise
+ self.logger.info(f"Could not find shard split '{split_opts.migration_id}' on "
+ f"replica set '{split_opts.get_donor_name()}': {str(err)}.")
+ except PyMongoError:
+ self.logger.exception(f"Error forgetting shard split '{split_opts.migration_id}' on "
+ f"replica set '{split_opts.get_donor_name()}'.")
+ raise
+
def _wait_for_garbage_collection(self, split_opts, is_committed): # noqa: D205,D400
+ timeout_secs = self._shard_split_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
+
+ def wait_for_gc_on_node(node, rs_name):
+ self.logger.info(
+ f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected "
+ f"on donor node on port {node.port} of replica set "
+ f"'{rs_name}'.")
+
+ node_client = self._create_client(node)
+
+ start = time.monotonic()
+ while time.monotonic() - start < timeout_secs:
+ res = with_naive_retry(lambda: node_client.config.command(
+ {"count": "shardSplitDonors", "query": {"tenantIds": split_opts.tenant_ids}}))
+ if res["n"] == 0:
+ return
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ raise errors.ServerFailure(
+ f"Timed out while waiting for garbage collection for node on port {node.port}.")
+
try:
donor_nodes = split_opts.get_donor_nodes()
for donor_node in donor_nodes:
- self.logger.info(
- f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected "
- f"on donor node on port {donor_node.port} of replica set "
- f"'{split_opts.get_donor_name()}'.")
-
- donor_node_client = self._create_client(donor_node)
- while True:
- try:
- res = donor_node_client.config.command({
- "count": "shardSplitDonors",
- "query": {"tenantIds": split_opts.tenant_ids}
- })
- if res["n"] == 0:
- break
- except pymongo.errors.ConnectionFailure:
- self.logger.info(
- f"Retrying waiting for shard split '{split_opts.migration_id}' to be "
- f"garbage collected on donor node on port {donor_node.port} of "
- f"replica set '{split_opts.get_donor_name()}'.")
- continue
- time.sleep(self.POLL_INTERVAL_SECS)
+ wait_for_gc_on_node(donor_node, split_opts.get_donor_name())
# If a shard split operation is aborted then the recipient is expected to be torn down,
# we should not expect the state document will be garbage collected.
@@ -497,29 +481,8 @@ class _ShardSplitThread(threading.Thread):
recipient_nodes = split_opts.get_recipient_nodes()
for recipient_node in recipient_nodes:
- self.logger.info(
- f"Waiting for shard split '{split_opts.migration_id}' to be garbage collected "
- f"on recipient node on port {recipient_node.port} of replica set "
- f"'{split_opts.recipient_set_name}'.")
-
- recipient_node_client = self._create_client(recipient_node)
- while True:
- try:
- res = recipient_node_client.config.command({
- "count": "shardSplitDonors",
- "query": {"tenantIds": split_opts.tenant_ids}
- })
- if res["n"] == 0:
- break
- except pymongo.errors.ConnectionFailure:
- self.logger.info(
- f"Retrying waiting for shard split '{split_opts.migration_id}' to be "
- f"garbage collected on recipient node on port {recipient_node.port} of "
- f"replica set '{split_opts.recipient_set_name}'.")
- continue
- time.sleep(self.POLL_INTERVAL_SECS)
-
- except pymongo.errors.PyMongoError:
+ wait_for_gc_on_node(recipient_node, split_opts.recipient_set_name)
+ except PyMongoError:
self.logger.exception(
f"Error waiting for shard split '{split_opts.migration_id}' from donor replica set "
f"'{split_opts.get_donor_name()} to recipient replica set "
@@ -527,13 +490,12 @@ class _ShardSplitThread(threading.Thread):
raise
def _wait_for_reroute_or_test_completion(self, donor_client, split_opts):
- start_time = time.time()
-
self.logger.info(
f"Waiting for shard split '{split_opts.migration_id}' on replica set "
f"'{split_opts.get_donor_name()}' to reroute at least one conflicting command. "
f"Stop waiting when the test finishes.")
+ start_time = time.monotonic()
while not self.__lifecycle.is_test_finished():
try:
# We are reusing the infrastructure originally developed for tenant migrations,
@@ -543,13 +505,8 @@ class _ShardSplitThread(threading.Thread):
{"_id": split_opts.get_migration_id_as_binary()})
if doc is not None:
return
- except pymongo.errors.ConnectionFailure:
- self.logger.info(
- f"Retrying waiting for shard split '{split_opts.migration_id}' on replica set "
- f"'{split_opts.get_donor_name()}' to reroute at least one conflicting command.")
- continue
- except pymongo.errors.PyMongoError:
- end_time = time.time()
+ except PyMongoError:
+ end_time = time.monotonic()
self.logger.exception(
f"Error running find command on replica set '{split_opts.get_donor_name()}' "
f"after waiting for reroute for {(end_time - start_time) * 1000} ms")
diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
index 19d527418dc..6b805fa137e 100644
--- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
@@ -7,12 +7,12 @@ import threading
import time
import uuid
-import bson
-import pymongo.errors
+from bson.binary import Binary, UUID_SUBTYPE
+from pymongo.errors import OperationFailure, PyMongoError
from buildscripts.resmokelib import errors
-from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import tenant_migration
+from buildscripts.resmokelib.testing.fixtures.fixturelib import with_naive_retry
from buildscripts.resmokelib.testing.hooks import dbhash_tenant_migration
from buildscripts.resmokelib.testing.hooks import interface
@@ -371,8 +371,12 @@ class _TenantMigrationThread(threading.Thread):
return _TenantMigrationOptions(donor_rs, recipient_rs, self._tenant_id, read_preference,
self.logger)
- def _create_client(self, node):
- return fixture_interface.build_client(node, self._auth_options)
+ def _create_client(self, fixture, **kwargs):
+ return fixture.mongo_client(username=self._auth_options["username"],
+ password=self._auth_options["password"],
+ authSource=self._auth_options["authenticationDatabase"],
+ authMechanism=self._auth_options["authenticationMechanism"],
+ uuidRepresentation='standard', **kwargs)
def _check_tenant_migration_dbhash(self, migration_opts):
# Set the donor connection string, recipient connection string, and migration uuid string
@@ -399,8 +403,10 @@ class _TenantMigrationThread(threading.Thread):
"""
try:
# Clean up any orphaned tenant databases on the recipient to allow the next migration to start.
- self._drop_tenant_databases(migration_opts.recipient_rs)
- res = self._start_and_wait_for_migration(migration_opts)
+ self._drop_tenant_databases_on_recipient(migration_opts)
+
+ donor_client = self._create_client(migration_opts.donor_rs)
+ res = self._start_and_wait_for_migration(donor_client, migration_opts)
is_committed = res["state"] == "committed"
# Garbage collect the migration prior to throwing error to avoid migration conflict
@@ -412,8 +418,8 @@ class _TenantMigrationThread(threading.Thread):
# If the migration committed, to avoid routing commands incorrectly, wait for the
# donor/proxy to reroute at least one command before doing garbage collection. Stop
# waiting when the test finishes.
- self._wait_for_reroute_or_test_completion(migration_opts)
- self._forget_migration(migration_opts)
+ self._wait_for_reroute_or_test_completion(donor_client, migration_opts)
+ self._forget_migration(donor_client, migration_opts)
self._wait_for_migration_garbage_collection(migration_opts)
if not res["ok"]:
@@ -435,177 +441,113 @@ class _TenantMigrationThread(threading.Thread):
"' with donor replica set '" +
migration_opts.get_donor_name() +
"' aborted due to an error: " + str(res))
- except pymongo.errors.PyMongoError:
+ except PyMongoError:
self.logger.exception(
"Error running tenant migration '%s' with donor primary on replica set '%s'.",
migration_opts.migration_id, migration_opts.get_donor_name())
raise
- def _start_and_wait_for_migration(self, migration_opts): # noqa: D205,D400
+ def _start_and_wait_for_migration(self, donor_client, migration_opts): # noqa: D205,D400
"""Run donorStartMigration to start a tenant migration based on 'migration_opts', wait for
the migration decision and return the last response for donorStartMigration.
"""
- cmd_obj = {
- "donorStartMigration":
- 1,
- "migrationId":
- bson.Binary(migration_opts.migration_id.bytes, 4),
- "recipientConnectionString":
- migration_opts.recipient_rs.get_driver_connection_url(),
- "tenantId":
- migration_opts.tenant_id,
- "readPreference":
- migration_opts.read_preference,
- "donorCertificateForRecipient":
- get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
- "recipientCertificateForDonor":
- get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
- }
- donor_primary = migration_opts.get_donor_primary()
self.logger.info(
- "Starting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
- migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+ f"Starting tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}'.")
while True:
- try:
- # Keep polling the migration state until the migration completes.
- donor_primary_client = self._create_client(donor_primary)
- res = donor_primary_client.admin.command(
- cmd_obj,
- bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
- donor_primary = migration_opts.get_donor_primary()
- self.logger.info(
- "Retrying tenant migration '%s' against donor primary on port %d of replica " +
- "set '%s'.", migration_opts.migration_id, donor_primary.port,
- migration_opts.get_donor_name())
- continue
+ # Keep polling the migration state until the migration completes.
+ res = with_naive_retry(lambda: donor_client.admin.command({
+ "donorStartMigration":
+ 1,
+ "migrationId":
+ Binary(migration_opts.migration_id.bytes, UUID_SUBTYPE),
+ "recipientConnectionString":
+ migration_opts.recipient_rs.get_driver_connection_url(),
+ "tenantId":
+ migration_opts.tenant_id,
+ "readPreference":
+ migration_opts.read_preference,
+ "donorCertificateForRecipient":
+ get_certificate_and_private_key("jstests/libs/tenant_migration_donor.pem"),
+ "recipientCertificateForDonor":
+ get_certificate_and_private_key("jstests/libs/tenant_migration_recipient.pem"),
+ }))
+
if res["state"] == "committed":
- self.logger.info(
- "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
- "committed.", migration_opts.migration_id, donor_primary.port,
- migration_opts.get_donor_name())
+ self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}' has committed.")
return res
if res["state"] == "aborted":
- self.logger.info(
- "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
- "aborted: %s.", migration_opts.migration_id, donor_primary.port,
- migration_opts.get_donor_name(), str(res))
+ self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}' has aborted: '{str(res)}'.")
return res
if not res["ok"]:
- self.logger.info(
- "Tenant migration '%s' with donor primary on port %d of replica set '%s' has " +
- "failed: %s.", migration_opts.migration_id, donor_primary.port,
- migration_opts.get_donor_name(), str(res))
+ self.logger.info(f"Tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}' has failed: '{str(res)}'.")
return res
+
time.sleep(self.POLL_INTERVAL_SECS)
- def _forget_migration(self, migration_opts):
+ def _forget_migration(self, donor_client, migration_opts):
"""Run donorForgetMigration to garbage collection the tenant migration denoted by migration_opts'."""
- self.logger.info("Forgetting tenant migration: %s.", str(migration_opts))
-
- cmd_obj = {
- "donorForgetMigration": 1, "migrationId": bson.Binary(migration_opts.migration_id.bytes,
- 4)
- }
- donor_primary = migration_opts.get_donor_primary()
-
self.logger.info(
- "Forgetting tenant migration '%s' on donor primary on port %d of replica set '%s'.",
- migration_opts.migration_id, donor_primary.port, migration_opts.get_donor_name())
+ f"Forgetting tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}'.")
- while True:
- try:
- donor_primary_client = self._create_client(donor_primary)
- donor_primary_client.admin.command(
- cmd_obj,
- bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
- return
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
- donor_primary = migration_opts.get_donor_primary()
- self.logger.info(
- "Retrying forgetting tenant migration '%s' against donor primary on port %d of "
- + "replica set '%s'.", migration_opts.migration_id, donor_primary.port,
- migration_opts.get_donor_name())
- continue
- except pymongo.errors.OperationFailure as err:
- if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
- raise
- # The fixture was restarted.
- self.logger.info(
- "Could not find tenant migration '%s' on donor primary on" +
- " port %d of replica set '%s': %s.", migration_opts.migration_id,
- donor_primary.port, migration_opts.get_donor_name(), str(err))
- return
- except pymongo.errors.PyMongoError:
- self.logger.exception(
- "Error forgetting tenant migration '%s' on donor primary on" +
- " port %d of replica set '%s'.", migration_opts.migration_id,
- donor_primary.port, migration_opts.get_donor_name())
+ try:
+ with_naive_retry(lambda: donor_client.admin.command({
+ "donorForgetMigration": 1, "migrationId": Binary(migration_opts.migration_id.bytes,
+ UUID_SUBTYPE)
+ }))
+ except OperationFailure as err:
+ if err.code != self.NO_SUCH_MIGRATION_ERR_CODE:
raise
+ self.logger.info(f"Could not find tenant migration '{migration_opts.migration_id}' on "
+ f"replica set '{migration_opts.get_donor_name()}': {str(err)}.")
+ except PyMongoError:
+ self.logger.exception(
+ f"Error forgetting tenant migration '{migration_opts.migration_id}' on "
+ f"replica set '{migration_opts.get_donor_name()}'.")
+ raise
+
def _wait_for_migration_garbage_collection(self, migration_opts): # noqa: D205,D400
"""Wait until the persisted state for tenant migration denoted by 'migration_opts' has been
garbage collected on both the donor and recipient.
"""
+ timeout_secs = self._tenant_migration_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
+
+ def wait_for_gc_on_node(node, rs_name, collection_name):
+ self.logger.info(
+ "Waiting for tenant migration '%s' to be garbage collected on donor node on " +
+ "port %d of replica set '%s'.", migration_opts.migration_id, node.port, rs_name)
+
+ node_client = self._create_client(node)
+
+ start = time.monotonic()
+ while time.monotonic() - start < timeout_secs:
+ res = with_naive_retry(lambda: node_client.config.command(
+ {"count": collection_name, "query": {"tenantId": migration_opts.tenant_id}}))
+ if res["n"] == 0:
+ return
+ time.sleep(self.POLL_INTERVAL_SECS)
+
+ raise errors.ServerFailure(
+ f"Timed out while waiting for garbage collection of node on port {node.port}.")
+
try:
donor_nodes = migration_opts.get_donor_nodes()
for donor_node in donor_nodes:
- self.logger.info(
- "Waiting for tenant migration '%s' to be garbage collected on donor node on " +
- "port %d of replica set '%s'.", migration_opts.migration_id, donor_node.port,
- migration_opts.get_donor_name())
-
- while True:
- try:
- donor_node_client = self._create_client(donor_node)
- res = donor_node_client.config.command({
- "count": "tenantMigrationDonors",
- "query": {"tenantId": migration_opts.tenant_id}
- })
- if res["n"] == 0:
- break
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
- # Ignore NotPrimaryErrors because it's possible to fail with
- # InterruptedDueToReplStateChange if the donor primary steps down or shuts
- # down during the garbage collection check.
- self.logger.info(
- "Retrying waiting for tenant migration '%s' to be garbage collected on"
- + " donor node on port %d of replica set '%s'.",
- migration_opts.migration_id, donor_node.port,
- migration_opts.get_donor_name())
- continue
- time.sleep(self.POLL_INTERVAL_SECS)
+ wait_for_gc_on_node(donor_node, migration_opts.get_donor_name(),
+ "tenantMigrationDonors")
recipient_nodes = migration_opts.get_recipient_nodes()
for recipient_node in recipient_nodes:
- self.logger.info(
- "Waiting for tenant migration '%s' to be garbage collected on recipient node on"
- + " port %d of replica set '%s'.", migration_opts.migration_id,
- recipient_node.port, migration_opts.get_recipient_name())
-
- while True:
- try:
- recipient_node_client = self._create_client(recipient_node)
- res = recipient_node_client.config.command({
- "count": "tenantMigrationRecipients",
- "query": {"tenantId": migration_opts.tenant_id}
- })
- if res["n"] == 0:
- break
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
- # Ignore NotPrimaryErrors because it's possible to fail with
- # InterruptedDueToReplStateChange if the recipient primary steps down or
- # shuts down during the garbage collection check.
- self.logger.info(
- "Retrying waiting for tenant migration '%s' to be garbage collected on"
- + " recipient node on port %d of replica set '%s'.",
- migration_opts.migration_id, recipient_node.port,
- migration_opts.get_recipient_name())
- continue
- time.sleep(self.POLL_INTERVAL_SECS)
- except pymongo.errors.PyMongoError:
+ wait_for_gc_on_node(recipient_node, migration_opts.get_recipient_name(),
+ "tenantMigrationRecipients")
+ except PyMongoError:
self.logger.exception(
"Error waiting for tenant migration '%s' from donor replica set '%s" +
" to recipient replica set '%s' to be garbage collected.",
@@ -613,67 +555,44 @@ class _TenantMigrationThread(threading.Thread):
migration_opts.get_recipient_name())
raise
- def _wait_for_reroute_or_test_completion(self, migration_opts):
- start_time = time.time()
- donor_primary = migration_opts.get_donor_primary()
-
+ def _wait_for_reroute_or_test_completion(self, donor_client, migration_opts):
self.logger.info(
- "Waiting for donor primary on port %d of replica set '%s' for tenant migration '%s' " +
- "to reroute at least one conflicting command. Stop waiting when the test finishes.",
- donor_primary.port, migration_opts.get_donor_name(), migration_opts.migration_id)
+ f"Waiting for tenant migration '{migration_opts.migration_id}' on replica set "
+ f"'{migration_opts.get_donor_name()}' to reroute at least one conflicting command. "
+ f"Stop waiting when the test finishes.")
+ start_time = time.time()
while not self.__lifecycle.is_test_finished():
try:
- donor_primary_client = self._create_client(donor_primary)
- doc = donor_primary_client["testTenantMigration"]["rerouted"].find_one(
- {"_id": bson.Binary(migration_opts.migration_id.bytes, 4)})
+ doc = donor_client["testTenantMigration"]["rerouted"].find_one(
+ {"_id": Binary(migration_opts.migration_id.bytes, UUID_SUBTYPE)})
if doc is not None:
return
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError):
- donor_primary = migration_opts.get_donor_primary()
- self.logger.info(
- "Retrying waiting for donor primary on port '%d' of replica set '%s' for " +
- "tenant migration '%s' to reroute at least one conflicting command.",
- donor_primary.port, migration_opts.get_donor_name(),
- migration_opts.migration_id)
- continue
- except pymongo.errors.PyMongoError:
+ except PyMongoError:
end_time = time.time()
self.logger.exception(
- "Error running find command on donor primary on port %d of replica set '%s' " +
- "after waiting for reroute for %0d ms", donor_primary.port,
- migration_opts.get_donor_name(), (end_time - start_time) * 1000)
+ f"Error running find command on replica set '{migration_opts.get_donor_name()}' "
+ f"after waiting for reroute for {(end_time - start_time) * 1000} ms")
raise
time.sleep(self.POLL_INTERVAL_SECS)
- def _drop_tenant_databases(self, rs):
- self.logger.info("Dropping tenant databases from replica set '%s'.", rs.replset_name)
-
- primary = get_primary(rs, self.logger)
- self.logger.info("Running dropDatabase commands against primary on port %d.", primary.port)
+ def _drop_tenant_databases_on_recipient(self, migration_opts):
+ self.logger.info(
+ f"Dropping tenant databases from replica set '{migration_opts.get_recipient_name()}'.")
+ recipient_client = self._create_client(migration_opts.recipient_rs)
- while True:
- try:
- primary_client = self._create_client(primary)
- res = primary_client.admin.command({"listDatabases": 1})
- for database in res["databases"]:
- db_name = database["name"]
- if db_name.startswith(self._tenant_id + "_"):
- primary_client.drop_database(db_name)
- return
- # We retry on all write concern errors because we assume the only reason waiting for
- # write concern should fail is because of a failover.
- except (pymongo.errors.AutoReconnect, pymongo.errors.NotPrimaryError,
- pymongo.errors.WriteConcernError) as err:
- primary = get_primary(rs, self.logger)
- self.logger.info(
- "Retrying dropDatabase commands against primary on port %d after error %s.",
- primary.port, str(err))
- continue
- except pymongo.errors.PyMongoError:
- self.logger.exception(
- "Error dropping databases for tenant id '%s' on primary on" +
- " port %d of replica set '%s' to be garbage collection.", self._tenant_id,
- primary.port, rs.replset_name)
- raise
+ try:
+ self.logger.info(
+ f"Running dropDatabase commands against replica set '{migration_opts.get_recipient_name()}'"
+ )
+ res = with_naive_retry(lambda: recipient_client.admin.command({"listDatabases": 1}))
+ for database in res["databases"]:
+ db_name = database["name"]
+ if db_name.startswith(self._tenant_id + "_"):
+ recipient_client.drop_database(db_name)
+ except PyMongoError as err:
+ self.logger.exception(
+ f"Error dropping databases for tenant '{self._tenant_id}' on replica set '{migration_opts.get_recipient_name()}': '{str(err)}'."
+ )
+ raise
diff --git a/buildscripts/resmokelib/testing/retry.py b/buildscripts/resmokelib/testing/retry.py
new file mode 100644
index 00000000000..3b5287c4b61
--- /dev/null
+++ b/buildscripts/resmokelib/testing/retry.py
@@ -0,0 +1,58 @@
+import time
+from pymongo.errors import PyMongoError, ConnectionFailure, OperationFailure, ExecutionTimeout
+
+# TODO(DRIVERS-1401): Use error labels instead of checking against an allow list of error codes.
+retryable_codes = [
+ # From the SDAM spec, the "node is shutting down" codes.
+ 11600, # InterruptedAtShutdown
+ 91, # ShutdownInProgress
+ # From the SDAM spec, the "not primary" and "node is recovering" error codes.
+ 10058, # LegacyNotPrimary <=3.2 "not primary" error code
+ 10107, # NotWritablePrimary
+ 13435, # NotPrimaryNoSecondaryOk
+ 11602, # InterruptedDueToReplStateChange
+ 13436, # NotPrimaryOrSecondary
+ 189, # PrimarySteppedDown
+ # From the retryable reads/writes spec.
+ 7, # HostNotFound
+ 6, # HostUnreachable
+ 89, # NetworkTimeout
+ 9001, # SocketException
+ 262, # ExceededTimeLimit
+]
+
+
+def is_retryable_error(exc):
+ if isinstance(exc, ConnectionFailure):
+ return True
+ if exc.has_error_label("RetryableWriteError"):
+ return True
+ if isinstance(exc, OperationFailure) and exc.code in retryable_codes:
+ return True
+ return False
+
+
+def with_naive_retry(func, timeout=100):
+ """
+ Retry execution of a provided function naively for up to `timeout` seconds.
+
+ This method is only suitable for reads or other idempotent operations. It is not suitable for
+ retrying non-idempotent operations (most writes).
+
+ :param func: The function to execute
+ :param timeout: The maximum amount of time to retry execution
+ """
+
+ last_exc = None
+ start = time.monotonic()
+ while time.monotonic() - start < timeout:
+ try:
+ return func()
+ except PyMongoError as exc:
+ last_exc = exc
+ if not is_retryable_error(exc):
+ raise
+ time.sleep(0.1)
+
+ raise ExecutionTimeout(
+ f"Operation exceeded time limit after {timeout} seconds, last error: {last_exc}")