diff options
author | auto-revert-processor <dev-prod-dag@mongodb.com> | 2023-04-20 02:55:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-20 03:42:38 +0000 |
commit | 77c517c92f0a24ae67e1a5949882570da9a98934 (patch) | |
tree | ab209a582085ca3be546bb5004e22fdd1fac687a | |
parent | 46fbe5b380dfaa32ee22b137162d90db9debd23e (diff) | |
download | mongo-77c517c92f0a24ae67e1a5949882570da9a98934.tar.gz |
Revert "SERVER-75470 Reject change stream getMore commands after a split or merge has been committed"
This reverts commit d674d78896b572797df816a093ab8682ab860eca.
12 files changed, 11 insertions, 136 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js index 7a6b5b86deb..2a816b1ee50 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js @@ -185,26 +185,6 @@ donorSession2.commitTransaction_forTesting(); fpBeforeMarkingCloneSuccess.off(); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); - -// Test that running a getMore on a change stream cursor after the migration commits throws a -// resumable change stream exception. -const failedGetMore = donorTenantConn2.getDB("database").runCommand("getMore", { - getMore: donorCursor2._cursorid, - collection: "collection" -}); -assert.commandFailedWithCode( - failedGetMore, - ErrorCodes.ResumeTenantChangeStream, - "Tailing a change stream on the donor after completion of a shard merge should fail."); -assert(failedGetMore.hasOwnProperty("errorLabels")); -assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels); - -// The cursor should have been deleted after the error so a getMore should fail. -assert.commandFailedWithCode( - donorTenantConn2.getDB("database") - .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}), - ErrorCodes.CursorNotFound); - assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); tenantMigrationTest.waitForMigrationGarbageCollection(migrationUuid, tenantIds[0]); diff --git a/jstests/serverless/shard_split_change_collections_test.js b/jstests/serverless/shard_split_change_collections_test.js index d7c647ec469..8279b71a68e 100644 --- a/jstests/serverless/shard_split_change_collections_test.js +++ b/jstests/serverless/shard_split_change_collections_test.js @@ -21,21 +21,12 @@ const donorTenantConn = ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, tenantIds[0]); test.donor.setChangeStreamState(donorTenantConn, true); -const donorNonMovingTenantConn = - ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, ObjectId()); -test.donor.setChangeStreamState(donorNonMovingTenantConn, true); -const donorNonMovingCursor = donorNonMovingTenantConn.getDB("database").collection.watch(); - // Open a change stream and insert documents into database.collection before the split // starts. const donorCursor = donorTenantConn.getDB("database").collection.watch([]); const insertedDocs = [{_id: "tenant1_1"}, {_id: "tenant1_2"}, {_id: "tenant1_3"}]; donorTenantConn.getDB("database").collection.insertMany(insertedDocs); -// Start up a cursor to check if we can getMore after the tenant has been migrated and change -// collection is dropped. -const donorCursor2 = donorTenantConn.getDB("database").collection.watch([]); - const donorTenantSession = donorTenantConn.startSession({retryWrites: true}); const donorTenantSessionCollection = donorTenantSession.getDatabase("database").collection; assert.commandWorked(donorTenantSessionCollection.insert({_id: "tenant1_4", w: "RETRYABLE"})); @@ -63,49 +54,18 @@ const operation = test.createSplitOperation(tenantIds); assert.commandWorked(operation.commit()); assertMigrationState(donorPrimary, operation.migrationId, "committed"); -// Test that we cannot open a new change stream after the tenant has been migrated. -assert.commandFailedWithCode( - donorTenantConn.getDB("database") - .runCommand({aggregate: "collection", cursor: {}, pipeline: [{$changeStream: {}}]}), - ErrorCodes.TenantMigrationCommitted, - "Opening a change stream on the donor after completion of a shard split should fail."); - -// Test change stream cursor behavior on the donor for a tenant which was migrated, and for one -// which remains on the donor. -assert.commandWorked( - donorNonMovingTenantConn.getDB("database") - .runCommand("getMore", {getMore: donorNonMovingCursor._cursorid, collection: "collection"}), - "Tailing a change stream for a tenant that wasn't moved by a split" + - "should not be blocked after the split was committed"); - -// Test that running a getMore on a change stream cursor after the migration commits throws a -// resumable change stream exception. -const failedGetMore = donorTenantConn.getDB("database").runCommand("getMore", { - getMore: donorCursor._cursorid, - collection: "collection" -}); -assert.commandFailedWithCode( - failedGetMore, - ErrorCodes.ResumeTenantChangeStream, - "Tailing a change stream on the donor after completion of a shard split should fail."); -assert(failedGetMore.hasOwnProperty("errorLabels")); -assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels); - -// The cursor should have been deleted after the error so a getMore should fail. -assert.commandFailedWithCode( - donorTenantConn.getDB("database") - .runCommand("getMore", {getMore: donorCursor._cursorid, collection: "collection"}), - ErrorCodes.CursorNotFound); +let errCode; +try { + donorTenantConn.getDB("database").collection.watch([]); +} catch (err) { + errCode = err.code; +} +assert.eq(errCode, + ErrorCodes.TenantMigrationCommitted, + "Opening a change stream on the donor after completion of a shard split should fail."); operation.forget(); -// getMore cursor to check if we can getMore after the database is dropped. -donorTenantSession.getDatabase("config")["system.change_collection"].drop(); -assert.commandFailedWithCode( - donorTenantConn.getDB("database") - .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}), - ErrorCodes.QueryPlanKilled); - const recipientRst = test.getRecipient(); const recipientPrimary = recipientRst.getPrimary(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index fddc067a20d..ff48a9c4fac 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -529,8 +529,6 @@ error_codes: - {code: 399, name: MovePrimaryRecipientPastAbortableStage, categories: [InternalOnly]} - - {code: 400, name: ResumeTenantChangeStream} - # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) @@ -560,3 +558,4 @@ error_codes: - {code: 50768,name: NotARetryableWriteCommand} - {code: 50915,name: BackupCursorOpenConflictWithCheckpoint, categories: [RetriableError]} - {code: 56846,name: ConfigServerUnreachable} + diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 796f4efd621..40e307e999c 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -56,7 +56,6 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/speculative_majority_read_info.h" -#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/resource_consumption_metrics.h" @@ -589,12 +588,6 @@ public: curOp->setGenericCursor_inlock(cursorPin->toGenericCursor()); } - // If this is a change stream cursor, check whether the tenant has migrated elsewhere. - if (cursorPin->getExecutor()->getPostBatchResumeToken()["_data"]) { - tenant_migration_access_blocker::assertCanGetMoreChangeStream(opCtx, - _cmd.getDbName()); - } - // If the 'failGetMoreAfterCursorCheckout' failpoint is enabled, throw an exception with // the given 'errorCode' value, or ErrorCodes::InternalError if 'errorCode' is omitted. failGetMoreAfterCursorCheckout.executeIf( diff --git a/src/mongo/db/error_labels.cpp b/src/mongo/db/error_labels.cpp index fefac7ba52f..96e1b05ede6 100644 --- a/src/mongo/db/error_labels.cpp +++ b/src/mongo/db/error_labels.cpp @@ -105,8 +105,7 @@ bool ErrorLabelBuilder::isResumableChangeStreamError() const { (_commandName == "aggregate" || _commandName == "getMore") && _code && !_wcCode && (ErrorCodes::isRetriableError(*_code) || ErrorCodes::isNetworkError(*_code) || ErrorCodes::isNeedRetargettingError(*_code) || _code == ErrorCodes::RetryChangeStream || - _code == ErrorCodes::FailedToSatisfyReadPreference || - _code == ErrorCodes::ResumeTenantChangeStream); + _code == ErrorCodes::FailedToSatisfyReadPreference); // If the command or exception is not relevant, bail out early. if (!mayNeedResumableChangeStreamErrorLabel) { diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h index 896f92607a0..0ff8928d5c7 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker.h @@ -72,11 +72,6 @@ public: // virtual Status checkIfCanBuildIndex() = 0; - /** - * Checks if getMores for change streams should fail. - */ - virtual Status checkIfCanGetMoreChangeStream() = 0; - // We suspend TTL deletions at the recipient side to avoid the race when a document is updated // at the donor side, which may prevent it from being garbage collected by TTL, while the // recipient side document is deleted by the TTL. The donor side update will fail to propagate diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 5372a209a7c..1f16deea8f6 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -529,17 +529,6 @@ Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName) return Status::OK(); } -void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName) { - // We only block change stream getMores on the donor. - auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor); - if (mtab) { - auto status = mtab->checkIfCanGetMoreChangeStream(); - mtab->recordTenantMigrationError(status); - uassertStatusOK(status); - } -} - bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName) { if (dbName.db().empty()) { return false; diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h index a86aa35ce4a..fc3a9035ca5 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h @@ -109,11 +109,6 @@ void checkIfCanWriteOrThrow(OperationContext* opCtx, const DatabaseName& dbName, Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName); /** - * Asserts if getMores for change streams should fail. - */ -void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName); - -/** * Returns true if there is either a donor or recipient access blocker for the given dbName. */ bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName); diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp index 01d73378e94..a4e45fdbc91 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp @@ -224,22 +224,6 @@ Status TenantMigrationDonorAccessBlocker::checkIfCanBuildIndex() { MONGO_UNREACHABLE; } -Status TenantMigrationDonorAccessBlocker::checkIfCanGetMoreChangeStream() { - stdx::lock_guard<Latch> lg(_mutex); - switch (_state.getState()) { - case BlockerState::State::kAllow: - case BlockerState::State::kBlockWrites: - case BlockerState::State::kBlockWritesAndReads: - return Status::OK(); - case BlockerState::State::kReject: - return {ErrorCodes::ResumeTenantChangeStream, - "Change stream must be resumed on the new owner of this tenant"}; - case BlockerState::State::kAborted: - return Status::OK(); - } - MONGO_UNREACHABLE; -} - void TenantMigrationDonorAccessBlocker::startBlockingWrites() { stdx::lock_guard<Latch> lg(_mutex); diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h index ff7e7b1ab83..488359f09f2 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h @@ -127,11 +127,6 @@ inline RepeatableSharedPromise<void>::~RepeatableSharedPromise() { * to be rejected if it is possible that some writes have been accepted by the recipient (i.e. the * migration has committed). * - * Change stream getMore commands call assertCanGetMoreChangeStream. After a commit normal getMores - * are allowed to proceed and drain the cursors, but change stream cursors are infinite and can't be - * fully drained. We added this special check for change streams specifically to signal that they - * must be resumed on the recipient. - * * Index build user threads call checkIfCanBuildIndex. Index builds are blocked and rejected * similarly to regular writes except that they are blocked from the start of the migration (i.e. * before "blockTimestamp" is chosen). @@ -203,11 +198,6 @@ public: // Status checkIfCanBuildIndex() final; - /** - * Returns error status if "getMore" command of a change stream should fail. - */ - Status checkIfCanGetMoreChangeStream() final; - bool checkIfShouldBlockTTL() const final { // There is no TTL race at the donor side. See parent class for details. return false; diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp index e53a5326950..02fec7a5d92 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp @@ -158,10 +158,6 @@ Status TenantMigrationRecipientAccessBlocker::checkIfCanBuildIndex() { return Status::OK(); } -Status TenantMigrationRecipientAccessBlocker::checkIfCanGetMoreChangeStream() { - return Status::OK(); -} - bool TenantMigrationRecipientAccessBlocker::checkIfShouldBlockTTL() const { stdx::lock_guard<Latch> lg(_mutex); return _ttlIsBlocked; diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h index 23ef8356377..39caef917aa 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h @@ -91,11 +91,6 @@ public: // Status checkIfCanBuildIndex() final; - /** - * Returns error status if "getMore" command of a change stream should fail. - */ - Status checkIfCanGetMoreChangeStream() final; - // @return true if TTL is blocked bool checkIfShouldBlockTTL() const final; |