diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2020-11-05 15:53:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-20 06:21:33 +0000 |
commit | 34a81def84f07db8bfff2a9a378c46328c2f8aca (patch) | |
tree | 40ce97d02becb46c023f6fd203ca291e3db94f5c | |
parent | 91a31f597533ef25964e8f831ecec51219ca6084 (diff) | |
download | mongo-34a81def84f07db8bfff2a9a378c46328c2f8aca.tar.gz |
SERVER-50494 Implement proxy’s retry logic for batch write commands
Co-authored-by: Cheahuychou Mao <mao.cheahuychou@gmail.com>
4 files changed, 583 insertions, 108 deletions
diff --git a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml index 6df777169de..f61b8e88834 100644 --- a/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/tenant_migration_jscore_passthrough.yml @@ -17,12 +17,6 @@ selector: - jstests/core/top.js # The override cannot deep copy very large or small dates. - jstests/core/index_large_and_small_dates.js - # These tests expect the profiler to observe batched write operations but batched writes are - # disabled in this suite. - - jstests/core/profile_insert.js - - jstests/core/profile_delete.js - - jstests/core/profile_findandmodify.js - - jstests/core/profile_update.js # These tests are not expected to pass with replica-sets. - jstests/core/opcounters_write_cmd.js - jstests/core/read_after_optime.js @@ -54,7 +48,38 @@ selector: - jstests/core/comment_field.js - jstests/core/invalidated_legacy_cursors.js # TODO (SERVER-51753): Handle applyOps running concurrently with a tenant migration. + - jstests/core/apply_ops1.js + - jstests/core/apply_ops1.js + - jstests/core/apply_ops2.js + - jstests/core/apply_ops_dups.js + - jstests/core/apply_ops_index_collation.js + - jstests/core/apply_ops_invalid_index_spec.js + - jstests/core/apply_ops_missing_field.js + - jstests/core/apply_ops_system_dot_views.js + - jstests/core/apply_ops_without_ns.js + - jstests/core/bypass_doc_validation.js + - jstests/core/collation.js + - jstests/core/collmod_without_uuid.js + - jstests/core/txns/commands_banning_txnnumber_outside_transactions.js + - jstests/core/txns/commands_not_allowed_in_txn.js + - jstests/core/txns/prepare_transaction_fails_on_temp_collections.js + - jstests/core/txns/statement_ids_accepted.js + - jstests/core/list_collections1.js + - jstests/core/list_collections_filter.js + - jstests/core/list_collections_no_views.js + - jstests/core/views/duplicate_ns.js + - jstests/core/views/view_with_invalid_dbname.js + - jstests/core/views/views_creation.js + - jstests/core/views/invalid_system_views.js + - jstests/core/views/views_all_commands.js - jstests/core/rename_stayTemp.js + # TODO (SERVER-52727): Synchronize cloneCollectionAsCapped with tenant migrations. + - jstests/core/capped_convertToCapped1.js + # TODO (SERVER-52866): Synchronize getLastError with tenant migrations. + - jstests/core/bulk_legacy_enforce_gle.js + # These tests run getMore commands which are not supported in the tenant migration *passthrough*. + exclude_with_any_tags: + - requires_getmore executor: archive: @@ -70,8 +95,6 @@ executor: global_vars: TestData: &TestData tenantId: "tenantMigrationTenantId" - # TODO (SERVER-50494): Implement proxy's retry logic for batch write commands. - disableBatchWrites: true readMode: commands hooks: - class: ContinuousTenantMigration @@ -85,20 +108,24 @@ executor: - class: CheckReplDBHash - class: ValidateCollections - class: CleanEveryN - # TODO (SERVER-49204): The ContinuousTenantMigration hook reuses TestData.tenantId hook for all - # migrations. Therefore, it needs to run the donorForgetMigration command after each migration - # (i.e. test) in order to make the subsequent migration not conflict with the one run by the - # previous test. Until we have the donorForgetMigration command, we need to specify n: 1 to - # clear the in-memory and persisted state for each migration. - n: 1 + n: 20 fixture: class: TenantMigrationFixture mongod_options: set_parameters: enableTestCommands: 1 + failpoint.abortTenantMigrationAfterBlockingStarts: + mode: alwaysOn + data: + blockTimeMS: 250 # TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. failpoint.returnResponseOkForRecipientSyncDataCmd: mode: alwaysOn + # Set the delay before migration state machine is garbage collected to be short to avoid + # migration conflicts since the ContinuousTenantMigration hook migrates a single tenant + # between the replica sets in the fixture. + tenantMigrationGarbageCollectionDelayMS: 100 + ttlMonitorSleepSecs: 1 num_replica_sets: 2 num_nodes_per_replica_set: 3 use_replica_set_connection_string: true diff --git a/buildscripts/resmokelib/testing/hooks/tenant_migration.py b/buildscripts/resmokelib/testing/hooks/tenant_migration.py index 5b6db427bb0..f0804758e3c 100644 --- a/buildscripts/resmokelib/testing/hooks/tenant_migration.py +++ b/buildscripts/resmokelib/testing/hooks/tenant_migration.py @@ -32,48 +32,123 @@ class ContinuousTenantMigration(interface.Hook): # pylint: disable=too-many-ins """ interface.Hook.__init__(self, hook_logger, fixture, ContinuousTenantMigration.DESCRIPTION) - self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"] - if not isinstance(fixture, tenant_migration.TenantMigrationFixture) or \ - fixture.get_num_replica_sets() != 2: - raise ValueError( - "The ContinuousTenantMigration hook requires a TenantMigrationFixture with two replica sets" - ) + if not isinstance(fixture, tenant_migration.TenantMigrationFixture): + raise ValueError("The ContinuousTenantMigration hook requires a TenantMigrationFixture") self._tenant_migration_fixture = fixture + self._tenant_id = shell_options["global_vars"]["TestData"]["tenantId"] self._tenant_migration_thread = None def before_suite(self, test_report): """Before suite.""" - # TODO (SERVER-50496): Make the hook start the migration thread once here instead of inside - # before_test and make it run migrations continuously back and forth between the two replica - # sets. if not self._tenant_migration_fixture: - raise ValueError("No replica set pair to run migrations on") + raise ValueError("No TenantMigrationFixture to run migrations on") + self.logger.info("Starting the tenant migration thread.") + self._tenant_migration_thread = _TenantMigrationThread( + self.logger, self._tenant_migration_fixture, self._tenant_id) + self._tenant_migration_thread.start() def after_suite(self, test_report): """After suite.""" - return + self.logger.info("Stopping the tenant migration thread.") + self._tenant_migration_thread.stop() + self.logger.info("Stopped the tenant migration thread.") def before_test(self, test, test_report): """Before test.""" - self.logger.info("Starting the migration thread.") - self._tenant_migration_thread = _TenantMigrationThread( - self.logger, self._tenant_migration_fixture, self._tenant_id) - self._tenant_migration_thread.start() + self.logger.info("Resuming the tenant migration thread.") + self._tenant_migration_thread.resume() def after_test(self, test, test_report): """After test.""" - self.logger.info("Stopping the migration thread.") - self._tenant_migration_thread.stop() - self.logger.info("migration thread stopped.") + self.logger.info("Pausing the tenant migration thread.") + self._tenant_migration_thread.pause() + self.logger.info("Paused the tenant migration thread.") + + +class TenantMigrationLifeCycle(object): + """Class for managing the various states of the tenant migration thread. + + The job thread alternates between calling mark_test_started() and mark_test_finished(). The + tenant migration thread is allowed to perform migrations at any point between these two calls. + Note that the job thread synchronizes with the tenant migration thread outside the context of + this object to know it isn't in the process of running a migration. + """ + + _TEST_STARTED_STATE = "start" + _TEST_FINISHED_STATE = "finished" + + def __init__(self): + """Initialize the MigrationLifecycle instance.""" + self.__lock = threading.Lock() + self.__cond = threading.Condition(self.__lock) + + self.__test_state = self._TEST_FINISHED_STATE + self.__should_stop = False + + def mark_test_started(self): + """Signal to the tenant migration thread that a new test has started. + + This function should be called during before_test(). Calling it causes the + wait_for_tenant_migration_permitted() function to no longer block and to instead return + true. + """ + with self.__lock: + self.__test_state = self._TEST_STARTED_STATE + self.__cond.notify_all() + + def mark_test_finished(self): + """Signal to the tenant migration thread that the current test has finished. + + This function should be called during after_test(). Calling it causes the + wait_for_tenant_migration_permitted() function to block until mark_test_started() is called + again. + """ + with self.__lock: + self.__test_state = self._TEST_FINISHED_STATE + self.__cond.notify_all() + + def stop(self): + """Signal to the tenant migration thread that it should exit. + + This function should be called during after_suite(). Calling it causes the + wait_for_tenant_migration_permitted() function to no longer block and to instead return + false. + """ + with self.__lock: + self.__should_stop = True + self.__cond.notify_all() + + def wait_for_tenant_migration_permitted(self): + """Block until migrations are permitted, or until stop() is called. + + Return true if migrations are permitted, and false if migrations are not permitted. + """ + with self.__lock: + while not self.__should_stop: + if self.__test_state == self._TEST_STARTED_STATE: + return True + + self.__cond.wait() + + return False + + def wait_for_tenant_migration_interval(self, timeout): + """Block for 'timeout' seconds, or until stop() is called.""" + with self.__lock: + self.__cond.wait(timeout) + + def poll_for_idle_request(self): # noqa: D205,D400 + """Return true if the tenant migration thread should continue running migrations, or false + if it should temporarily stop running migrations. + """ + with self.__lock: + return self.__test_state == self._TEST_FINISHED_STATE class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-instance-attributes - MIN_START_MIGRATION_DELAY_SECS = 0.1 - MAX_START_MIGRATION_DELAY_SECS = 0.25 - MIN_BLOCK_TIME_SECS = 1 - MAX_BLOCK_TIME_SECS = 2.5 - DONOR_START_MIGRATION_POLL_INTERVAL_SECS = 0.1 + MAX_MIGRATION_INTERVAL_SECS = 5 + MIGRATION_STATE_POLL_INTERVAL_SECS = 0.1 def __init__(self, logger, tenant_migration_fixture, tenant_id): """Initialize _TenantMigrationThread.""" @@ -83,106 +158,203 @@ class _TenantMigrationThread(threading.Thread): # pylint: disable=too-many-inst self._tenant_migration_fixture = tenant_migration_fixture self._tenant_id = tenant_id + self.__lifecycle = TenantMigrationLifeCycle() self._last_exec = time.time() + # Event set when the thread has been stopped using the 'stop()' method. + self._is_stopped_evt = threading.Event() + # Event set when the thread is not performing migrations. + self._is_idle_evt = threading.Event() + self._is_idle_evt.set() def run(self): """Execute the thread.""" if not self._tenant_migration_fixture: - self.logger.warning("No replica set pair to run migrations on.") + self.logger.warning("No TenantMigrationFixture to run migrations on.") return + donor_rs = self._tenant_migration_fixture.get_replset(0) + recipient_rs = self._tenant_migration_fixture.get_replset(1) + try: - now = time.time() - self.logger.info("Starting a tenant migration for tenantId '%s'", self._tenant_id) - self._run_migration(self._tenant_migration_fixture) - self._last_exec = time.time() - self.logger.info("Completed a tenant migration in %0d ms", - (self._last_exec - now) * 1000) + while True: + self._is_idle_evt.set() + + permitted = self.__lifecycle.wait_for_tenant_migration_permitted() + if not permitted: + break + + self._is_idle_evt.clear() + + self.logger.info("Starting tenant migration for tenant id '%s' from %s to %s.", + self._tenant_id, donor_rs.replset_name, recipient_rs.replset_name) + now = time.time() + self._run_migration(donor_rs, recipient_rs) + self._last_exec = time.time() + self.logger.info( + "Completed tenant migration in %0d ms for tenant id '%s' from %s to %s.", + (self._last_exec - now) * 1000, self._tenant_id, donor_rs.replset_name, + recipient_rs.replset_name) + + found_idle_request = self.__lifecycle.poll_for_idle_request() + if found_idle_request: + continue + + # The 'wait_secs' is used to wait [0.1, 1.0] * MAX_MIGRATION_INTERVAL_SECS from the moment the last + # tenant migration completed. We want to sometimes wait for a larger interval to allow long-running + # commands, like large batched writes, to complete without continuously conflicting with a + # migration. + now = time.time() + wait_secs = max( + 0, + random.uniform(0.1, 1.0) * _TenantMigrationThread.MAX_MIGRATION_INTERVAL_SECS - + (now - self._last_exec)) + self.__lifecycle.wait_for_tenant_migration_interval(wait_secs) except Exception: # pylint: disable=W0703 - # Proactively log the exception when it happens so it will be - # flushed immediately. - self.logger.exception("Migration Thread threw exception") + # Proactively log the exception when it happens so it will be flushed immediately. + self.logger.exception("Tenant migration thread threw exception") + # The event should be signaled whenever the thread is not performing migrations. + self._is_idle_evt.set() def stop(self): """Stop the thread.""" + self.__lifecycle.stop() + self._is_stopped_evt.set() + # Unpause to allow the thread to finish. + self.resume() self.join() - def _enable_abort(self, donor_primary_client, donor_primary_port, donor_primary_rs_name): - # Configure the failpoint to make the migration abort after the migration has been - # blocking reads and writes for a randomly generated number of milliseconds - # (< MAX_BLOCK_TIME_MILLISECS). Must be called with _disable_abort at the start and - # end of each test so that each test uses its own randomly generated block time. - try: - donor_primary_client.admin.command( - bson.SON( - [("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"), - ("mode", "alwaysOn"), - ("data", - bson.SON( - [("blockTimeMS", - 1000 * random.uniform(_TenantMigrationThread.MIN_BLOCK_TIME_SECS, - _TenantMigrationThread.MAX_BLOCK_TIME_SECS))]))])) - except pymongo.errors.OperationFailure as err: - self.logger.exception( - "Unable to enable the failpoint to make migrations abort on donor primary on port " - + "%d of replica set '%s'.", donor_primary_port, donor_primary_rs_name) + def pause(self): + """Pause the thread.""" + self.__lifecycle.mark_test_finished() + + # Wait until we are no longer executing migrations. + self._is_idle_evt.wait() + # Check if the thread is alive in case it has thrown an exception while running. + self._check_thread() + + # Check that the fixture is still running. + if not self._tenant_migration_fixture.is_running(): raise errors.ServerFailure( - "Unable to enable the failpoint to make migrations abort on donor primary on port " - + "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name, - err.args[0])) + "TenantMigrationFixture with pids {} expected to be running in" + " ContinuousTenantMigration, but wasn't".format( + self._tenant_migration_fixture.pids())) + + def resume(self): + """Resume the thread.""" + self.__lifecycle.mark_test_started() + + def _wait(self, timeout): + """Wait until stop or timeout.""" + self._is_stopped_evt.wait(timeout) + + def _check_thread(self): + """Throw an error if the thread is not running.""" + if not self.is_alive(): + msg = "The tenant migration thread is not running." + self.logger.error(msg) + raise errors.ServerFailure(msg) + + def _wait_for_migration_garbage_collection(self, rs): + primary = rs.get_primary() + primary_client = primary.mongo_client() - def _disable_abort(self, donor_primary_client, donor_primary_port, donor_primary_rs_name): try: - donor_primary_client.admin.command( - bson.SON([("configureFailPoint", "abortTenantMigrationAfterBlockingStarts"), - ("mode", "off")])) - except pymongo.errors.OperationFailure as err: + while True: + res = primary_client.config.command( + {"count": "tenantMigrationDonors", "query": {"tenantId": self._tenant_id}}) + if res["n"] == 0: + break + except pymongo.errors.PyMongoError: self.logger.exception( - "Unable to disable the failpoint to make migrations abort on donor primary on port " - + "%d of replica set '%s'.", donor_primary_port, donor_primary_rs_name) - raise errors.ServerFailure( - "Unable to disable the failpoint to make migrations abort on donor primary on port " - + "{} of replica set '{}': {}".format(donor_primary_port, donor_primary_rs_name, - err.args[0])) + "Error waiting for tenant migration for tenant id '%s' on primary on" + + " port %d of replica set '%s' to be garbage collection.", self._tenant_id, + primary.port, rs.replset_name) + raise - def _run_migration(self, tenant_migration_fixture): - donor_rs = tenant_migration_fixture.get_replset(0) - recipient_rs = tenant_migration_fixture.get_replset(1) + def _drop_tenant_databases(self, rs): + primary = rs.get_primary() + primary_client = primary.mongo_client() + try: + res = primary_client.admin.command({"listDatabases": 1}) + for database in res["databases"]: + db_name = database["name"] + if db_name.startswith(self._tenant_id + "_"): + primary_client.drop_database(db_name) + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error dropping databases for tenant id '%s' on primary on" + + " port %d of replica set '%s' to be garbage collection.", self._tenant_id, + primary.port, rs.replset_name) + raise + + def _forget_migration(self, donor_rs, migration_id): donor_primary = donor_rs.get_primary() donor_primary_client = donor_primary.mongo_client() - time.sleep( - random.uniform(_TenantMigrationThread.MIN_START_MIGRATION_DELAY_SECS, - _TenantMigrationThread.MAX_START_MIGRATION_DELAY_SECS)) + self.logger.info( + "Forgeting tenant migration with donor primary on port %d of replica set '%s'.", + donor_primary.port, donor_rs.replset_name) + + try: + donor_primary_client.admin.command( + {"donorForgetMigration": 1, "migrationId": migration_id}, + bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) + except pymongo.errors.PyMongoError: + self.logger.exception( + "Error forgetting tenant migration with donor primary on port %d of replica set '%s'.", + donor_primary.port, donor_rs.replset_name) + raise + + def _run_migration(self, donor_rs, recipient_rs): + donor_primary = donor_rs.get_primary() + donor_primary_client = donor_primary.mongo_client() self.logger.info( - "Starting a tenant migration with donor primary on port %d of replica set '%s'.", + "Starting tenant migration with donor primary on port %d of replica set '%s'.", donor_primary.port, donor_rs.replset_name) + migration_id = bson.Binary(uuid.uuid4().bytes, 4) cmd_obj = { - "donorStartMigration": 1, "migrationId": bson.Binary(uuid.uuid4().bytes, 4), + "donorStartMigration": 1, "migrationId": migration_id, "recipientConnectionString": recipient_rs.get_driver_connection_url(), "tenantId": self._tenant_id, "readPreference": {"mode": "primary"} } try: - self._enable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name) - while True: - # Keep polling the migration state until the migration completes, otherwise we might - # end up disabling 'abortTenantMigrationAfterBlockingStarts' before the migration - # enters the blocking state and aborts. + # Keep polling the migration state until the migration completes, otherwise we + # might end up disabling 'pauseTenantMigrationAfterBlockingStartsWithTimeout' + # before the tenant migration enters the blocking state and aborts. res = donor_primary_client.admin.command( cmd_obj, bson.codec_options.CodecOptions(uuid_representation=bson.binary.UUID_SUBTYPE)) - if (not res["ok"] or res["state"] == "committed" or res["state"] == "aborted"): + + if res["state"] == "committed": + # TODO (SERVER-50495): Make tenant_migration_jscore_passthrough simulate a + # migration that commits. + errors.ServerFailure("Tenant migration with donor primary on port " + + str(donor_primary.port) + " of replica set '" + + donor_rs.replset_name + "' has committed.") + elif res["state"] == "aborted": + self.logger.info("Tenant migration with donor primary on port " + + str(donor_primary.port) + " of replica set '" + + donor_rs.replset_name + "' has aborted: " + str(res)) break - time.sleep(_TenantMigrationThread.DONOR_START_MIGRATION_POLL_INTERVAL_SECS) + elif not res["ok"]: + errors.ServerFailure("Tenant migration with donor primary on port " + + str(donor_primary.port) + " of replica set '" + + donor_rs.replset_name + "' has failed: " + str(res)) + + time.sleep(_TenantMigrationThread.MIGRATION_STATE_POLL_INTERVAL_SECS) + + self._forget_migration(donor_rs, migration_id) + # Wait for the donor to garbage the migration state. + self._wait_for_migration_garbage_collection(donor_rs) + # Drop any tenant databases that the recipient cloned during the migration. + self._drop_tenant_databases(recipient_rs) except pymongo.errors.PyMongoError: self.logger.exception( "Error running tenant migration with donor primary on port %d of replica set '%s'.", donor_primary.port, donor_rs.replset_name) raise - finally: - self._disable_abort(donor_primary_client, donor_primary.port, donor_rs.replset_name) diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 1b4840d8d0e..83752915a27 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -197,6 +197,92 @@ function extractTenantMigrationAbortedError(resObj) { } return null; } +/** + * If the command was a batch command where some of the operations failed, modifies the command + * object so that only failed operations are retried. + */ +function modifyCmdObjForRetry(cmdObj, resObj) { + if (cmdObj.insert) { + let retryOps = []; + if (cmdObj.ordered) { + retryOps = cmdObj.documents.slice(resObj.writeErrors[0].index); + } else { + for (let writeError of resObj.writeErrors) { + if (writeError.code == ErrorCodes.TenantMigrationAborted) { + retryOps.push(cmdObj.documents[writeError.index]); + } + } + } + cmdObj.documents = retryOps; + } + + // findAndModify may also have an update field, but is not a batched command. + if (cmdObj.update && !cmdObj.findAndModify && !cmdObj.findandmodify) { + let retryOps = []; + if (cmdObj.ordered) { + retryOps = cmdObj.updates.slice(resObj.writeErrors[0].index); + } else { + for (let writeError of resObj.writeErrors) { + if (writeError.code == ErrorCodes.TenantMigrationAborted) { + retryOps.push(cmdObj.updates[writeError.index]); + } + } + } + cmdObj.updates = retryOps; + } + + if (cmdObj.delete) { + let retryOps = []; + if (cmdObj.ordered) { + retryOps = cmdObj.deletes.slice(resObj.writeErrors[0].index); + } else { + for (let writeError of resObj.writeErrors) { + if (writeError.code == ErrorCodes.TenantMigrationAborted) { + retryOps.push(cmdObj.deletes[writeError.index]); + } + } + } + cmdObj.deletes = retryOps; + } +} + +/** + * Sets the keys of the given index map to consecutive non-negative integers starting from 0. + */ +function resetIndices(indexMap) { + let newIndexMap = {}; + Object.keys(indexMap).map((key, index) => { + newIndexMap[index] = indexMap[key]; + }); + return newIndexMap; +} + +function toIndexSet(indexedDocs) { + let set = new Set(); + if (indexedDocs) { + for (let doc of indexedDocs) { + set.add(doc.index); + } + } + return set; +} + +/** + * Remove the indices for non-upsert writes that succeeded. + */ +function removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap) { + // Optimization to only look through the indices in a set rather than in an array. + let indexSetForUpserted = toIndexSet(resObj.upserted); + let indexSetForWriteErrors = toIndexSet(resObj.writeErrors); + + for (let index in Object.keys(indexMap)) { + if ((!indexSetForUpserted.has(parseInt(index)) && + !indexSetForWriteErrors.has(parseInt(index)))) { + delete indexMap[index]; + } + } + return indexMap; +} Mongo.prototype.runCommand = function(dbName, cmdObj, options) { // Create another cmdObj from this command with TestData.tenantId prepended to all the @@ -205,6 +291,31 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) { let numAttempts = 0; + // Keep track of the write operations that were applied. + let n = 0; + let nModified = 0; + let upserted = []; + let nonRetryableWriteErrors = []; + + // 'indexMap' is a mapping from a write's index in the current cmdObj to its index in the + // original cmdObj. + let indexMap = {}; + if (cmdObjWithTenantId.documents) { + for (let i = 0; i < cmdObjWithTenantId.documents.length; i++) { + indexMap[i] = i; + } + } + if (cmdObjWithTenantId.updates) { + for (let i = 0; i < cmdObjWithTenantId.updates.length; i++) { + indexMap[i] = i; + } + } + if (cmdObjWithTenantId.deletes) { + for (let i = 0; i < cmdObjWithTenantId.deletes.length; i++) { + indexMap[i] = i; + } + } + while (true) { numAttempts++; let resObj = originalRunCommand.apply( @@ -214,22 +325,119 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) { // assume the command was run against the original database. removeTenantId(resObj); + // If the write didn't encounter a TenantMigrationAborted error at all, return the result + // directly. let tenantMigrationAbortedErr = extractTenantMigrationAbortedError(resObj); - if (!tenantMigrationAbortedErr) { + if (numAttempts == 1 && !tenantMigrationAbortedErr) { + return resObj; + } + + // Add/modify the shells's n, nModified, upserted, and writeErrors. + if (resObj.n) { + n += resObj.n; + } + if (resObj.nModified) { + nModified += resObj.nModified; + } + if (resObj.upserted || resObj.writeErrors) { + // This is an optimization to make later lookups into 'indexMap' faster, since it + // removes any key that is not pertinent in the current cmdObj execution. + indexMap = removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap); + + if (resObj.upserted) { + for (let upsert of resObj.upserted) { + // Set the entry's index to the write's index in the original cmdObj. + upsert.index = indexMap[upsert.index]; + + // Track that this write resulted in an upsert. + upserted.push(upsert); + + // This write will not need to be retried, so remove it from 'indexMap'. + delete indexMap[upsert.index]; + } + } + if (resObj.writeErrors) { + for (let writeError of resObj.writeErrors) { + // If we encounter a TenantMigrationAborted error, the rest of the batch must + // have failed with the same code. + if (writeError.code === ErrorCodes.TenantMigrationAborted) { + break; + } + + // Set the entry's index to the write's index in the original cmdObj. + writeError.index = indexMap[writeError.index]; + + // Track that this write resulted in a non-retryable error. + nonRetryableWriteErrors.push(writeError); + + // This write will not need to be retried, so remove it from 'indexMap'. + delete indexMap[writeError.index]; + } + } + } + + if (tenantMigrationAbortedErr) { + modifyCmdObjForRetry(cmdObjWithTenantId, resObj); + + // Build a new indexMap where the keys are the index that each write that needs to be + // retried will have in the next attempt's cmdObj. + indexMap = resetIndices(indexMap); + + jsTest.log( + `Got TenantMigrationAborted for command against database ` + + `"${dbName}" with response ${tojson(resObj)} after trying ${numAttempts} times, ` + + `retrying the command`); + } else { + // Modify the resObj before returning the result. + if (resObj.n) { + resObj.n = n; + } + if (resObj.nModified) { + resObj.nModified = nModified; + } + if (upserted.length > 0) { + resObj.upserted = upserted; + } + if (nonRetryableWriteErrors.length > 0) { + resObj.writeErrors = nonRetryableWriteErrors; + } return resObj; } - jsTest.log("Got TenantMigrationAborted after trying " + numAttempts + - " times, retrying command " + tojson(cmdObj)); } }; Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) { // Create another cmdObj from this command with TestData.tenantId prepended to all the // applicable database names and namespaces. - const commandArgsWithTenantId = createCmdObjWithTenantId(commandArgs); + const cmdObjWithTenantId = createCmdObjWithTenantId(cmdObj); let numAttempts = 0; + // Keep track of the write operations that were applied. + let n = 0; + let nModified = 0; + let upserted = []; + let nonRetryableWriteErrors = []; + + // 'indexMap' is a mapping from a write's index in the current cmdObj to its index in the + // original cmdObj. + let indexMap = {}; + if (cmdObjWithTenantId.documents) { + for (let i = 0; i < cmdObjWithTenantId.documents.length; i++) { + indexMap[i] = i; + } + } + if (cmdObjWithTenantId.updates) { + for (let i = 0; i < cmdObjWithTenantId.updates.length; i++) { + indexMap[i] = i; + } + } + if (cmdObjWithTenantId.deletes) { + for (let i = 0; i < cmdObjWithTenantId.deletes.length; i++) { + indexMap[i] = i; + } + } + while (true) { numAttempts++; let resObj = originalRunCommand.apply( @@ -239,12 +447,84 @@ Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) // assume the command was run against the original database. removeTenantId(resObj); + // If the write didn't encounter a TenantMigrationAborted error at all, return the result + // directly. let tenantMigrationAbortedErr = extractTenantMigrationAbortedError(resObj); - if (!tenantMigrationAbortedErr) { + if (numAttempts == 1 && !tenantMigrationAbortedErr) { + return resObj; + } + + // Add/modify the shells's n, nModified, upserted, and writeErrors. + if (resObj.n) { + n += resObj.n; + } + if (resObj.nModified) { + nModified += resObj.nModified; + } + if (resObj.upserted || resObj.writeErrors) { + // This is an optimization to make later lookups into 'indexMap' faster, since it + // removes any key that is not pertinent in the current cmdObj execution. + indexMap = removeSuccessfulOpIndexesExceptForUpserted(resObj, indexMap); + + if (resObj.upserted) { + for (let upsert of resObj.upserted) { + // Set the entry's index to the write's index in the original cmdObj. + upsert.index = indexMap[upsert.index]; + + // Track that this write resulted in an upsert. + upserted.push(upsert); + + // This write will not need to be retried, so remove it from 'indexMap'. + delete indexMap[upsert.index]; + } + } + if (resObj.writeErrors) { + for (let writeError of resObj.writeErrors) { + // If we encounter a TenantMigrationAborted error, the rest of the batch must + // have failed with the same code. + if (writeError.code === ErrorCodes.TenantMigrationAborted) { + break; + } + + // Set the entry's index to the write's index in the original cmdObj. + writeError.index = indexMap[writeError.index]; + + // Track that this write resulted in a non-retryable error. + nonRetryableWriteErrors.push(writeError); + + // This write will not need to be retried, so remove it from 'indexMap'. + delete indexMap[writeError.index]; + } + } + } + + if (tenantMigrationAbortedErr) { + modifyCmdObjForRetry(cmdObjWithTenantId, resObj); + + // Build a new indexMap where the keys are the index that each write that needs to be + // retried will have in the next attempt's cmdObj. + indexMap = resetIndices(indexMap); + + jsTest.log( + `Got TenantMigrationAborted for command against database ` + + `"${dbName}" with response ${tojson(resObj)} after trying ${numAttempts} times, ` + + `retrying the command`); + } else { + // Modify the resObj before returning the result. + if (resObj.n) { + resObj.n = n; + } + if (resObj.nModified) { + resObj.nModified = nModified; + } + if (upserted.length > 0) { + resObj.upserted = upserted; + } + if (nonRetryableWriteErrors.length > 0) { + resObj.writeErrors = nonRetryableWriteErrors; + } return resObj; } - jsTest.log("Got TenantMigrationAborted after trying " + numAttempts + - " times, retrying command " + tojson(commandArgs)); } }; diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 8637c6c7fa0..bf918b79f80 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -55,7 +55,6 @@ namespace { MONGO_FAIL_POINT_DEFINE(abortTenantMigrationAfterBlockingStarts); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterBlockingStarts); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationAfterDataSync); -MONGO_FAIL_POINT_DEFINE(skipSendingRecipientSyncDataCommand); const std::string kTTLIndexName = "TenantMigrationDonorTTLIndex"; const Seconds kRecipientSyncDataTimeout(30); @@ -448,9 +447,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { - if (skipSendingRecipientSyncDataCommand.shouldFail()) { - return ExecutorFuture<void>(**executor, Status::OK()); - } auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); |