summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2020-11-05 15:53:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-20 06:21:33 +0000
commit34a81def84f07db8bfff2a9a378c46328c2f8aca (patch)
tree40ce97d02becb46c023f6fd203ca291e3db94f5c
parent91a31f597533ef25964e8f831ecec51219ca6084 (diff)
downloadmongo-34a81def84f07db8bfff2a9a378c46328c2f8aca.tar.gz
SERVER-50494 Implement proxy’s retry logic for batch write commands
Co-authored-by: Cheahuychou Mao <mao.cheahuychou@gmail.com>
-rw-r--r--buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml55
-rw-r--r--buildscripts/resmokelib/testing/hooks/tenant_migration.py338
-rw-r--r--jstests/libs/override_methods/inject_tenant_prefix.js294
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp4
4 files changed, 583 insertions, 108 deletions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
index 6df777169de..f61b8e88834 100644
--- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml
@@ -17,12 +17,6 @@ selector:
- jstests/core/top.js
# The override cannot deep copy very large or small dates.
- jstests/core/index_large_and_small_dates.js
- # These tests expect the profiler to observe batched write operations but batched writes are
- # disabled in this suite.
- - jstests/core/profile_insert.js
- - jstests/core/profile_delete.js
- - jstests/core/profile_findandmodify.js
- - jstests/core/profile_update.js
# These tests are not expected to pass with replica-sets.
- jstests/core/opcounters_write_cmd.js
- jstests/core/read_after_optime.js
@@ -54,7 +48,38 @@ selector:
- jstests/core/comment_field.js
- jstests/core/invalidated_legacy_cursors.js
# TODO (SERVER-51753): Handle applyOps running concurrently with a tenant migration.
+ - jstests/core/apply_ops1.js
+ - jstests/core/apply_ops1.js
+ - jstests/core/apply_ops2.js
+ - jstests/core/apply_ops_dups.js
+ - jstests/core/apply_ops_index_collation.js
+ - jstests/core/apply_ops_invalid_index_spec.js
+ - jstests/core/apply_ops_missing_field.js
+ - jstests/core/apply_ops_system_dot_views.js
+ - jstests/core/apply_ops_without_ns.js
+ - jstests/core/bypass_doc_validation.js
+ - jstests/core/collation.js
+ - jstests/core/collmod_without_uuid.js
+ - jstests/core/txns/commands_banning_txnnumber_outside_transactions.js
+ - jstests/core/txns/commands_not_allowed_in_txn.js
+ - jstests/core/txns/prepare_transaction_fails_on_temp_collections.js
+ - jstests/core/txns/statement_ids_accepted.js
+ - jstests/core/list_collections1.js
+ - jstests/core/list_collections_filter.js
+ - jstests/core/list_collections_no_views.js
+ - jstests/core/views/duplicate_ns.js
+ - jstests/core/views/view_with_invalid_dbname.js
+ - jstests/core/views/views_creation.js
+ - jstests/core/views/invalid_system_views.js
+ - jstests/core/views/views_all_commands.js
- jstests/core/rename_stayTemp.js
+ # TODO (SERVER-52727): Synchronize cloneCollectionAsCapped with tenant migrations.
+ - jstests/core/capped_convertToCapped1.js
+ # TODO (SERVER-52866): Synchronize getLastError with tenant migrations.
+ - jstests/core/bulk_legacy_enforce_gle.js
+ # These tests run getMore commands which are not supported in the tenant migration *passthrough*.
+ exclude_with_any_tags:
+ - requires_getmore
executor:
archive:
@@ -70,8 +95,6 @@ executor:
global_vars:
TestData: &TestData
tenantId: "tenantMigrationTenantId"
- # TODO (SERVER-50494): Implement proxy's retry logic for batch write commands.
- disableBatchWrites: true
readMode: commands
hooks:
- class: ContinuousTenantMigration
@@ -85,20 +108,24 @@ executor:
- class: CheckReplDBHash
- class: ValidateCollections
- class: CleanEveryN
- # TODO (SERVER-49204): The ContinuousTenantMigration hook reuses TestData.tenantId hook for all
- # migrations. Therefore, it needs to run the donorForgetMigration command after each migration
- # (i.e. test) in order to make the subsequent migration not conflict with the one run by the
- # previous test. Until we have the donorForgetMigration command, we need to specify n: 1 to
- # clear the in-memory and persisted state for each migration.
- n: 1
+ n: 20
fixture:
class: TenantMigrationFixture
mongod_options:
set_parameters:
enableTestCommands: 1
+ failpoint.abortTenantMigrationAfterBlockingStarts:
+ mode: alwaysOn
+ data:
+ blockTimeMS: 250
# TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
failpoint.returnResponseOkForRecipientSyncDataCmd:
mode: alwaysOn
+ # Set the delay before migration state machine is garbage collected to be short to avoid
+ # migration conflicts since the ContinuousTenantMigration hook migrates a single tenant
+ # between the replica sets in the fixture.
+ tenantMigrationGarbageCollectionDelayMS: 100
+ ttlMonitorSleepSecs: 1
num_replica_sets: 2
num_nodes_per_replica_set: 3
use_replica_set_connection_string: true
diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
index 5b6db427bb0..f0804758e3c 100644
--- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py
+++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py
@@ -32,48 +32,123 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins
"""
interface.Hook.__init__(self, hook_logger, fixture, ContinuousTenantMigration.DESCRIPTION)
- self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"]
- if not isinstance(fixture, tenant_migration.TenantMigrationFixture) or \
- fixture.get_num_replica_sets() != 2:
- raise ValueError(
- "The ContinuousTenantMigration hook requires a TenantMigrationFixture with two replica sets"
- )
+ if not isinstance(fixture, tenant_migration.TenantMigrationFixture):
+ raise ValueError("The ContinuousTenantMigration hook requires a TenantMigrationFixture")
self._tenant_migration_fixture = fixture
+ self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"]
self._tenant_migration_thread = None
def before_suite(self, test_report):
"""Before suite."""
- # TODO (SERVER-50496): Make the hook start the migration thread once here instead of inside
- # before_test and make it run migrations continuously back and forth between the two replica
- # sets.
if not self._tenant_migration_fixture:
- raise ValueError("No replica set pair to run migrations on")
+ raise ValueError("No TenantMigrationFixture to run migrations on")
+ self.logger.info("Starting the tenant migration thread.")
+ self._tenant_migration_thread = _TenantMigrationThread(
+ self.logger, self._tenant_migration_fixture, self._tenant_id)
+ self._tenant_migration_thread.start()
def after_suite(self, test_report):
"""After suite."""
- return
+ self.logger.info("Stopping the tenant migration thread.")
+ self._tenant_migration_thread.stop()
+ self.logger.info("Stopped the tenant migration thread.")
def before_test(self, test, test_report):
"""Before test."""
- self.logger.info("Starting the migration thread.")
- self._tenant_migration_thread = _TenantMigrationThread(
- self.logger, self._tenant_migration_fixture, self._tenant_id)
- self._tenant_migration_thread.start()
+ self.logger.info("Resuming the tenant migration thread.")
+ self._tenant_migration_thread.resume()
def after_test(self, test, test_report):
"""After test."""
- self.logger.info("Stopping the migration thread.")
- self._tenant_migration_thread.stop()
- self.logger.info("migration thread stopped.")
+ self.logger.info("Pausing the tenant migration thread.")
+ self._tenant_migration_thread.pause()
+ self.logger.info("Paused the tenant migration thread.")
+
+
+class TenantMigrationLifeCycle(object):
+ """Class for managing the various states of the tenant migration thread.
+
+ The job thread alternates between calling mark_test_started() and mark_test_finished(). The
+ tenant migration thread is allowed to perform migrations at any point between these two calls.
+ Note that the job thread synchronizes with the tenant migration thread outside the context of
+ this object to know it isn't in the process of running a migration.
+ """
+
+ _TEST_STARTED_STATE = "start"
+ _TEST_FINISHED_STATE = "finished"
+
+ def __init__(self):
+ """Initialize the TenantMigrationLifeCycle instance."""
+ self.__lock = threading.Lock()
+ self.__cond = threading.Condition(self.__lock)
+
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__should_stop = False
+
+ def mark_test_started(self):
+ """Signal to the tenant migration thread that a new test has started.
+
+ This function should be called during before_test(). Calling it causes the
+ wait_for_tenant_migration_permitted() function to no longer block and to instead return
+ true.
+ """
+ with self.__lock:
+ self.__test_state = self._TEST_STARTED_STATE
+ self.__cond.notify_all()
+
+ def mark_test_finished(self):
+ """Signal to the tenant migration thread that the current test has finished.
+
+ This function should be called during after_test(). Calling it causes the
+ wait_for_tenant_migration_permitted() function to block until mark_test_started() is called
+ again.
+ """
+ with self.__lock:
+ self.__test_state = self._TEST_FINISHED_STATE
+ self.__cond.notify_all()
+
+ def stop(self):
+ """Signal to the tenant migration thread that it should exit.
+
+ This function should be called during after_suite(). Calling it causes the
+ wait_for_tenant_migration_permitted() function to no longer block and to instead return
+ false.
+ """
+ with self.__lock:
+ self.__should_stop = True
+ self.__cond.notify_all()
+
+ def wait_for_tenant_migration_permitted(self):
+ """Block until migrations are permitted, or until stop() is called.
+
+ Return true if migrations are permitted, and false if migrations are not permitted.
+ """
+ with self.__lock:
+ while not self.__should_stop:
+ if self.__test_state == self._TEST_STARTED_STATE:
+ return True
+
+ self.__cond.wait()
+
+ return False
+
+ def wait_for_tenant_migration_interval(self, timeout):
+ """Block for 'timeout' seconds, or until a state change (including stop()) is signaled."""
+ with self.__lock:
+ self.__cond.wait(timeout)
+
+ def poll_for_idle_request(self): # noqa: D205,D400
+ """Return true if the tenant migration thread should temporarily stop running migrations,
+ or false if it should continue running migrations.
+ """
+ with self.__lock:
+ return self.__test_state == self._TEST_FINISHED_STATE
class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-instance-attributes
- MIN_START_MIGRATION_DELAY_SECS = 0.1
- MAX_START_MIGRATION_DELAY_SECS = 0.25
- MIN_BLOCK_TIME_SECS = 1
- MAX_BLOCK_TIME_SECS = 2.5
- DONOR_START_MIGRATION_POLL_INTERVAL_SECS = 0.1
+ MAX_MIGRATION_INTERVAL_SECS = 5
+ MIGRATION_STATE_POLL_INTERVAL_SECS = 0.1
def __init__(self, logger, tenant_migration_fixture, tenant_id):
"""Initialize _TenantMigrationThread."""
@@ -83,106 +158,203 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst
self._tenant_migration_fixture = tenant_migration_fixture
self._tenant_id = tenant_id
+ self.__lifecycle = TenantMigrationLifeCycle()
self._last_exec = time.time()
+ # Event set when the thread has been stopped using the 'stop()' method.
+ self._is_stopped_evt = threading.Event()
+ # Event set when the thread is not performing migrations.
+ self._is_idle_evt = threading.Event()
+ self._is_idle_evt.set()
def run(self):
"""Execute the thread."""
if not self._tenant_migration_fixture:
- self.logger.warning("No replica set pair to run migrations on.")
+ self.logger.warning("No TenantMigrationFixture to run migrations on.")
return
+ donor_rs = self._tenant_migration_fixture.get_replset(0)
+ recipient_rs = self._tenant_migration_fixture.get_replset(1)
+
try:
- now = time.time()
- self.logger.info("Starting a tenant migration for tenantId '%s'", self._tenant_id)
- self._run_migration(self._tenant_migration_fixture)
- self._last_exec = time.time()
- self.logger.info("Completed a tenant migration in %0d ms",
- (self._last_exec - now) * 1000)
+ while True:
+ self._is_idle_evt.set()
+
+ permitted = self.__lifecycle.wait_for_tenant_migration_permitted()
+ if not permitted:
+ break
+
+ self._is_idle_evt.clear()
+
+ self.logger.info("Starting tenant migration for tenant id '%s' from %s to %s.",
+ self._tenant_id, donor_rs.replset_name, recipient_rs.replset_name)
+ now = time.time()
+ self._run_migration(donor_rs, recipient_rs)
+ self._last_exec = time.time()
+ self.logger.info(
+ "Completed tenant migration in %0d ms for tenant id '%s' from %s to %s.",
+ (self._last_exec - now) * 1000, self._tenant_id, donor_rs.replset_name,
+ recipient_rs.replset_name)
+
+ found_idle_request = self.__lifecycle.poll_for_idle_request()
+ if found_idle_request:
+ continue
+
+ # The 'wait_secs' is used to wait [0.1, 1.0] * MAX_MIGRATION_INTERVAL_SECS from the moment the last
+ # tenant migration completed. We want to sometimes wait for a larger interval to allow long-running
+ # commands, like large batched writes, to complete without continuously conflicting with a
+ # migration.
+ now = time.time()
+ wait_secs = max(
+ 0,
+ random.uniform(0.1, 1.0) * _TenantMigrationThread.MAX_MIGRATION_INTERVAL_SECS -
+ (now - self._last_exec))
+ self.__lifecycle.wait_for_tenant_migration_interval(wait_secs)
except Exception: # pylint: disable=W0703
- # Proactively log the exception when it happens so it will be
- # flushed immediately.
- self.logger.exception("Migration Thread threw exception")
+ # Proactively log the exception when it happens so it will be flushed immediately.
+ self.logger.exception("Tenant migration thread threw exception")
+ # The event should be signaled whenever the thread is not performing migrations.
+ self._is_idle_evt.set()
def stop(self):
"""Stop the thread."""
+ self.__lifecycle.stop()
+ self._is_stopped_evt.set()
+ # Unpause to allow the thread to finish.
+ self.resume()
self.join()
- def _enable_abort(self, donor_primary_client, donor_primary_port, donor_primary_rs_name):
- # Configure the failpoint to make the migration abort after the migration has been
- # blocking reads and writes for a randomly generated number of milliseconds
- # (< MAX_BLOCK_TIME_MILLISECS). Must be called with _disable_abort at the start and
- # end of each test so that each test uses its own randomly generated block time.
- try:
- donor_primary_client.admin.command(
- bson.SON(
- [("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"),
- ("mode", "alwaysOn"),
- ("data",
- bson.SON(
- [("blockTimeMS",
- 1000 * random.uniform(_TenantMigrationThread.MIN_BLOCK_TIME_SECS,
- _TenantMigrationThread.MAX_BLOCK_TIME_SECS))]))]))
- except pymongo.errors.OperationFailure as err:
- self.logger.exception(
- "Unable to enable the failpoint to make migrations abort on donor primary on port "
- + "%d of replica set '%s'.", donor_primary_port, donor_primary_rs_name)
+ def pause(self):
+ """Pause the thread."""
+ self.__lifecycle.mark_test_finished()
+
+ # Wait until we are no longer executing migrations.
+ self._is_idle_evt.wait()
+ # Check if the thread is alive in case it has thrown an exception while running.
+ self._check_thread()
+
+ # Check that the fixture is still running.
+ if not self._tenant_migration_fixture.is_running():
raise errors.ServerFailure(
- "Unable to enable the failpoint to make migrations abort on donor primary on port "
- + "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name,
- err.args[0]))
+ "TenantMigrationFixture with pids {} expected to be running in"
+ " ContinuousTenantMigration, but wasn't".format(
+ self._tenant_migration_fixture.pids()))
+
+ def resume(self):
+ """Resume the thread."""
+ self.__lifecycle.mark_test_started()
+
+ def _wait(self, timeout):
+ """Wait until stop or timeout."""
+ self._is_stopped_evt.wait(timeout)
+
+ def _check_thread(self):
+ """Throw an error if the thread is not running."""
+ if not self.is_alive():
+ msg = "The tenant migration thread is not running."
+ self.logger.error(msg)
+ raise errors.ServerFailure(msg)
+
+ def _wait_for_migration_garbage_collection(self, rs):
+ primary = rs.get_primary()
+ primary_client = primary.mongo_client()
- def _disable_abort(self, donor_primary_client, donor_primary_port, donor_primary_rs_name):
try:
- donor_primary_client.admin.command(
- bson.SON([("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"),
- ("mode", "off")]))
- except pymongo.errors.OperationFailure as err:
+ while True:
+ res = primary_client.config.command(
+ {"count": "tenantMigrationDonors", "query": {"tenantId": self._tenant_id}})
+ if res["n"] == 0:
+ break
+ except pymongo.errors.PyMongoError:
self.logger.exception(
- "Unable to disable the failpoint to make migrations abort on donor primary on port "
- + "%d of replica set '%s'.", donor_primary_port, donor_primary_rs_name)
- raise errors.ServerFailure(
- "Unable to disable the failpoint to make migrations abort on donor primary on port "
- + "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name,
- err.args[0]))
+ "Error waiting for tenant migration for tenant id '%s' on primary on" +
+ " port %d of replica set '%s' to be garbage collection.", self._tenant_id,
+ primary.port, rs.replset_name)
+ raise
- def _run_migration(self, tenant_migration_fixture):
- donor_rs = tenant_migration_fixture.get_replset(0)
- recipient_rs = tenant_migration_fixture.get_replset(1)
+ def _drop_tenant_databases(self, rs):
+ primary = rs.get_primary()
+ primary_client = primary.mongo_client()
+ try:
+ res = primary_client.admin.command({"listDatabases": 1})
+ for database in res["databases"]:
+ db_name = database["name"]
+ if db_name.startswith(self._tenant_id + "_"):
+ primary_client.drop_database(db_name)
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ "Error dropping databases for tenant id '%s' on primary on" +
+ " port %d of replica set '%s' to be garbage collection.", self._tenant_id,
+ primary.port, rs.replset_name)
+ raise
+
+ def _forget_migration(self, donor_rs, migration_id):
donor_primary = donor_rs.get_primary()
donor_primary_client = donor_primary.mongo_client()
- time.sleep(
- random.uniform(_TenantMigrationThread.MIN_START_MIGRATION_DELAY_SECS,
- _TenantMigrationThread.MAX_START_MIGRATION_DELAY_SECS))
+ self.logger.info(
+ "Forgeting tenant migration with donor primary on port %d of replica set '%s'.",
+ donor_primary.port, donor_rs.replset_name)
+
+ try:
+ donor_primary_client.admin.command(
+ {"donorForgetMigration": 1, "migrationId": migration_id},
+ bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
+ except pymongo.errors.PyMongoError:
+ self.logger.exception(
+ "Error forgetting tenant migration with donor primary on port %d of replica set '%s'.",
+ donor_primary.port, donor_rs.replset_name)
+ raise
+
+ def _run_migration(self, donor_rs, recipient_rs):
+ donor_primary = donor_rs.get_primary()
+ donor_primary_client = donor_primary.mongo_client()
self.logger.info(
- "Starting a tenant migration with donor primary on port %d of replica set '%s'.",
+ "Starting tenant migration with donor primary on port %d of replica set '%s'.",
donor_primary.port, donor_rs.replset_name)
+ migration_id = bson.Binary(uuid.uuid4().bytes, 4)
cmd_obj = {
- "donorStartMigration": 1, "migrationId": bson.Binary(uuid.uuid4().bytes, 4),
+ "donorStartMigration": 1, "migrationId": migration_id,
"recipientConnectionString": recipient_rs.get_driver_connection_url(),
"tenantId": self._tenant_id, "readPreference": {"mode": "primary"}
}
try:
- self._enable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name)
-
while True:
- # Keep polling the migration state until the migration completes, otherwise we might
- # end up disabling 'abortTenantMigrationAfterBlockingStarts' before the migration
- # enters the blocking state and aborts.
+ # Keep polling the migration state until the donor reports the 'aborted' state.
+ # The migration is expected to abort because the suite enables the
+ # 'abortTenantMigrationAfterBlockingStarts' failpoint on the mongods.
res = donor_primary_client.admin.command(
cmd_obj,
bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE))
- if (not res["ok"] or res["state"] == "committed" or res["state"] == "aborted"):
+
+ if res["state"] == "committed":
+ # TODO (SERVER-50495): Make tenant_migration_jscore_passthrough simulate a
+ # migration that commits.
+ errors.ServerFailure("Tenant migration with donor primary on port " +
+ str(donor_primary.port) + " of replica set '" +
+ donor_rs.replset_name + "' has committed.")
+ elif res["state"] == "aborted":
+ self.logger.info("Tenant migration with donor primary on port " +
+ str(donor_primary.port) + " of replica set '" +
+ donor_rs.replset_name + "' has aborted: " + str(res))
break
- time.sleep(_TenantMigrationThread.DONOR_START_MIGRATION_POLL_INTERVAL_SECS)
+ elif not res["ok"]:
+ errors.ServerFailure("Tenant migration with donor primary on port " +
+ str(donor_primary.port) + " of replica set '" +
+ donor_rs.replset_name + "' has failed: " + str(res))
+
+ time.sleep(_TenantMigrationThread.MIGRATION_STATE_POLL_INTERVAL_SECS)
+
+ self._forget_migration(donor_rs, migration_id)
+ # Wait for the donor to garbage collect the migration state.
+ self._wait_for_migration_garbage_collection(donor_rs)
+ # Drop any tenant databases that the recipient cloned during the migration.
+ self._drop_tenant_databases(recipient_rs)
except pymongo.errors.PyMongoError:
self.logger.exception(
"Error running tenant migration with donor primary on port %d of replica set '%s'.",
donor_primary.port, donor_rs.replset_name)
raise
- finally:
- self._disable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name)
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 1b4840d8d0e..83752915a27 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -197,6 +197,92 @@ function extractTenantMigrationAbortedError(resObj) {
}
return null;
}
+/**
+ * If the command was a batch command where some of the operations failed, modifies the command
+ * object so that only failed operations are retried.
+ */
+function modifyCmdObjForRetry(cmdObj, resObj) {
+ if (cmdObj.insert) {
+ let retryOps = [];
+ if (cmdObj.ordered) {
+ retryOps = cmdObj.documents.slice(resObj.writeErrors[0].index);
+ } else {
+ for (let writeError of resObj.writeErrors) {
+ if (writeError.code == ErrorCodes.TenantMigrationAborted) {
+ retryOps.push(cmdObj.documents[writeError.index]);
+ }
+ }
+ }
+ cmdObj.documents = retryOps;
+ }
+
+ // findAndModify may also have an update field, but is not a batched command.
+ if (cmdObj.update && !cmdObj.findAndModify && !cmdObj.findandmodify) {
+ let retryOps = [];
+ if (cmdObj.ordered) {
+ retryOps = cmdObj.updates.slice(resObj.writeErrors[0].index);
+ } else {
+ for (let writeError of resObj.writeErrors) {
+ if (writeError.code == ErrorCodes.TenantMigrationAborted) {
+ retryOps.push(cmdObj.updates[writeError.index]);
+ }
+ }
+ }
+ cmdObj.updates = retryOps;
+ }
+
+ if (cmdObj.delete) {
+ let retryOps = [];
+ if (cmdObj.ordered) {
+ retryOps = cmdObj.deletes.slice(resObj.writeErrors[0].index);
+ } else {
+ for (let writeError of resObj.writeErrors) {
+ if (writeError.code == ErrorCodes.TenantMigrationAborted) {
+ retryOps.push(cmdObj.deletes[writeError.index]);
+ }
+ }
+ }
+ cmdObj.deletes = retryOps;
+ }
+}
+
+/**
+ * Returns a new map with the values of 'indexMap' re-keyed to consecutive integers from 0.
+ */
+function resetIndices(indexMap) {
+ let newIndexMap = {};
+ Object.keys(indexMap).map((key, index) => {
+ newIndexMap[index] = indexMap[key];
+ });
+ return newIndexMap;
+}
+
+function toIndexSet(indexedDocs) {
+ let set = new Set();
+ if (indexedDocs) {
+ for (let doc of indexedDocs) {
+ set.add(doc.index);
+ }
+ }
+ return set;
+}
+
+/**
+ * Remove the indices for non-upsert writes that succeeded.
+ */
+function removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap) {
+ // Optimization to only look through the indices in a set rather than in an array.
+ let indexSetForUpserted = toIndexSet(resObj.upserted);
+ let indexSetForWriteErrors = toIndexSet(resObj.writeErrors);
+
+ for (let index in Object.keys(indexMap)) {
+ if ((!indexSetForUpserted.has(parseInt(index)) &&
+ !indexSetForWriteErrors.has(parseInt(index)))) {
+ delete indexMap[index];
+ }
+ }
+ return indexMap;
+}
Mongo.prototype.runCommand = function(dbName, cmdObj, options) {
// Create another cmdObj from this command with TestData.tenantId prepended to all the
@@ -205,6 +291,31 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) {
let numAttempts = 0;
+ // Keep track of the write operations that were applied.
+ let n = 0;
+ let nModified = 0;
+ let upserted = [];
+ let nonRetryableWriteErrors = [];
+
+ // 'indexMap' is a mapping from a write's index in the current cmdObj to its index in the
+ // original cmdObj.
+ let indexMap = {};
+ if (cmdObjWithTenantId.documents) {
+ for (let i = 0; i < cmdObjWithTenantId.documents.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+ if (cmdObjWithTenantId.updates) {
+ for (let i = 0; i < cmdObjWithTenantId.updates.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+ if (cmdObjWithTenantId.deletes) {
+ for (let i = 0; i < cmdObjWithTenantId.deletes.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+
while (true) {
numAttempts++;
let resObj = originalRunCommand.apply(
@@ -214,22 +325,119 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) {
// assume the command was run against the original database.
removeTenantId(resObj);
+ // If the write didn't encounter a TenantMigrationAborted error at all, return the result
+ // directly.
let tenantMigrationAbortedErr = extractTenantMigrationAbortedError(resObj);
- if (!tenantMigrationAbortedErr) {
+ if (numAttempts == 1 && !tenantMigrationAbortedErr) {
+ return resObj;
+ }
+
+ // Add/modify the shell's n, nModified, upserted, and writeErrors.
+ if (resObj.n) {
+ n += resObj.n;
+ }
+ if (resObj.nModified) {
+ nModified += resObj.nModified;
+ }
+ if (resObj.upserted || resObj.writeErrors) {
+ // This is an optimization to make later lookups into 'indexMap' faster, since it
+ // removes any key that is not pertinent in the current cmdObj execution.
+ indexMap = removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap);
+
+ if (resObj.upserted) {
+ for (let upsert of resObj.upserted) {
+ // Set the entry's index to the write's index in the original cmdObj.
+ upsert.index = indexMap[upsert.index];
+
+ // Track that this write resulted in an upsert.
+ upserted.push(upsert);
+
+ // This write will not need to be retried, so remove it from 'indexMap'.
+ delete indexMap[upsert.index];
+ }
+ }
+ if (resObj.writeErrors) {
+ for (let writeError of resObj.writeErrors) {
+ // If we encounter a TenantMigrationAborted error, the rest of the batch must
+ // have failed with the same code.
+ if (writeError.code === ErrorCodes.TenantMigrationAborted) {
+ break;
+ }
+
+ // Set the entry's index to the write's index in the original cmdObj.
+ writeError.index = indexMap[writeError.index];
+
+ // Track that this write resulted in a non-retryable error.
+ nonRetryableWriteErrors.push(writeError);
+
+ // This write will not need to be retried, so remove it from 'indexMap'.
+ delete indexMap[writeError.index];
+ }
+ }
+ }
+
+ if (tenantMigrationAbortedErr) {
+ modifyCmdObjForRetry(cmdObjWithTenantId, resObj);
+
+ // Build a new indexMap where the keys are the index that each write that needs to be
+ // retried will have in the next attempt's cmdObj.
+ indexMap = resetIndices(indexMap);
+
+ jsTest.log(
+ `Got TenantMigrationAborted for command against database ` +
+ `"${dbName}" with response ${tojson(resObj)} after trying ${numAttempts} times, ` +
+ `retrying the command`);
+ } else {
+ // Modify the resObj before returning the result.
+ if (resObj.n) {
+ resObj.n = n;
+ }
+ if (resObj.nModified) {
+ resObj.nModified = nModified;
+ }
+ if (upserted.length > 0) {
+ resObj.upserted = upserted;
+ }
+ if (nonRetryableWriteErrors.length > 0) {
+ resObj.writeErrors = nonRetryableWriteErrors;
+ }
return resObj;
}
- jsTest.log("Got TenantMigrationAborted after trying " + numAttempts +
- " times, retrying command " + tojson(cmdObj));
}
};
Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) {
// Create another cmdObj from this command with TestData.tenantId prepended to all the
// applicable database names and namespaces.
- const commandArgsWithTenantId = createCmdObjWithTenantId(commandArgs);
+ const cmdObjWithTenantId = createCmdObjWithTenantId(cmdObj);
let numAttempts = 0;
+ // Keep track of the write operations that were applied.
+ let n = 0;
+ let nModified = 0;
+ let upserted = [];
+ let nonRetryableWriteErrors = [];
+
+ // 'indexMap' is a mapping from a write's index in the current cmdObj to its index in the
+ // original cmdObj.
+ let indexMap = {};
+ if (cmdObjWithTenantId.documents) {
+ for (let i = 0; i < cmdObjWithTenantId.documents.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+ if (cmdObjWithTenantId.updates) {
+ for (let i = 0; i < cmdObjWithTenantId.updates.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+ if (cmdObjWithTenantId.deletes) {
+ for (let i = 0; i < cmdObjWithTenantId.deletes.length; i++) {
+ indexMap[i] = i;
+ }
+ }
+
while (true) {
numAttempts++;
let resObj = originalRunCommand.apply(
@@ -239,12 +447,84 @@ Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs)
// assume the command was run against the original database.
removeTenantId(resObj);
+ // If the write didn't encounter a TenantMigrationAborted error at all, return the result
+ // directly.
let tenantMigrationAbortedErr = extractTenantMigrationAbortedError(resObj);
- if (!tenantMigrationAbortedErr) {
+ if (numAttempts == 1 && !tenantMigrationAbortedErr) {
+ return resObj;
+ }
+
+ // Add/modify the shell's n, nModified, upserted, and writeErrors.
+ if (resObj.n) {
+ n += resObj.n;
+ }
+ if (resObj.nModified) {
+ nModified += resObj.nModified;
+ }
+ if (resObj.upserted || resObj.writeErrors) {
+ // This is an optimization to make later lookups into 'indexMap' faster, since it
+ // removes any key that is not pertinent in the current cmdObj execution.
+ indexMap = removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap);
+
+ if (resObj.upserted) {
+ for (let upsert of resObj.upserted) {
+ // Set the entry's index to the write's index in the original cmdObj.
+ upsert.index = indexMap[upsert.index];
+
+ // Track that this write resulted in an upsert.
+ upserted.push(upsert);
+
+ // This write will not need to be retried, so remove it from 'indexMap'.
+ delete indexMap[upsert.index];
+ }
+ }
+ if (resObj.writeErrors) {
+ for (let writeError of resObj.writeErrors) {
+ // If we encounter a TenantMigrationAborted error, the rest of the batch must
+ // have failed with the same code.
+ if (writeError.code === ErrorCodes.TenantMigrationAborted) {
+ break;
+ }
+
+ // Set the entry's index to the write's index in the original cmdObj.
+ writeError.index = indexMap[writeError.index];
+
+ // Track that this write resulted in a non-retryable error.
+ nonRetryableWriteErrors.push(writeError);
+
+ // This write will not need to be retried, so remove it from 'indexMap'.
+ delete indexMap[writeError.index];
+ }
+ }
+ }
+
+ if (tenantMigrationAbortedErr) {
+ modifyCmdObjForRetry(cmdObjWithTenantId, resObj);
+
+ // Build a new indexMap where the keys are the index that each write that needs to be
+ // retried will have in the next attempt's cmdObj.
+ indexMap = resetIndices(indexMap);
+
+ jsTest.log(
+ `Got TenantMigrationAborted for command against database ` +
+ `"${dbName}" with response ${tojson(resObj)} after trying ${numAttempts} times, ` +
+ `retrying the command`);
+ } else {
+ // Modify the resObj before returning the result.
+ if (resObj.n) {
+ resObj.n = n;
+ }
+ if (resObj.nModified) {
+ resObj.nModified = nModified;
+ }
+ if (upserted.length > 0) {
+ resObj.upserted = upserted;
+ }
+ if (nonRetryableWriteErrors.length > 0) {
+ resObj.writeErrors = nonRetryableWriteErrors;
+ }
return resObj;
}
- jsTest.log("Got TenantMigrationAborted after trying " + numAttempts +
- " times, retrying command " + tojson(commandArgs));
}
};
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 8637c6c7fa0..bf918b79f80 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -55,7 +55,6 @@ namespace {
MONGO_FAIL_POINT_DEFINE(abortTenantMigrationAfterBlockingStarts);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterBlockingStarts);
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterDataSync);
-MONGO_FAIL_POINT_DEFINE(skipSendingRecipientSyncDataCommand);
const std::string kTTLIndexName = "TenantMigrationDonorTTLIndex";
const Seconds kRecipientSyncDataTimeout(30);
@@ -448,9 +447,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) {
- if (skipSendingRecipientSyncDataCommand.shouldFail()) {
- return ExecutorFuture<void>(**executor, Status::OK());
- }
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();