diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2023-04-19 20:29:59 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-19 22:52:06 +0000 |
commit | d674d78896b572797df816a093ab8682ab860eca (patch) | |
tree | e55e908b3c090706b085a93cc0e1fcd5a1b18aa7 | |
parent | b8a84c69d428492c9c003dbdb9c035133c696685 (diff) | |
download | mongo-d674d78896b572797df816a093ab8682ab860eca.tar.gz |
SERVER-75470 Reject change stream getMore commands after a split or merge has been committed
12 files changed, 136 insertions, 11 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js index 2a816b1ee50..7a6b5b86deb 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js @@ -185,6 +185,26 @@ donorSession2.commitTransaction_forTesting(); fpBeforeMarkingCloneSuccess.off(); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + +// Test that running a getMore on a change stream cursor after the migration commits throws a +// resumable change stream exception. +const failedGetMore = donorTenantConn2.getDB("database").runCommand("getMore", { + getMore: donorCursor2._cursorid, + collection: "collection" +}); +assert.commandFailedWithCode( + failedGetMore, + ErrorCodes.ResumeTenantChangeStream, + "Tailing a change stream on the donor after completion of a shard merge should fail."); +assert(failedGetMore.hasOwnProperty("errorLabels")); +assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels); + +// The cursor should have been deleted after the error so a getMore should fail. +assert.commandFailedWithCode( + donorTenantConn2.getDB("database") + .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}), + ErrorCodes.CursorNotFound); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); tenantMigrationTest.waitForMigrationGarbageCollection(migrationUuid, tenantIds[0]); diff --git a/jstests/serverless/shard_split_change_collections_test.js b/jstests/serverless/shard_split_change_collections_test.js index 8279b71a68e..d7c647ec469 100644 --- a/jstests/serverless/shard_split_change_collections_test.js +++ b/jstests/serverless/shard_split_change_collections_test.js @@ -21,12 +21,21 @@ const donorTenantConn = ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, tenantIds[0]); test.donor.setChangeStreamState(donorTenantConn, true); +const donorNonMovingTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, ObjectId()); +test.donor.setChangeStreamState(donorNonMovingTenantConn, true); +const donorNonMovingCursor = donorNonMovingTenantConn.getDB("database").collection.watch(); + // Open a change stream and insert documents into database.collection before the split // starts. const donorCursor = donorTenantConn.getDB("database").collection.watch([]); const insertedDocs = [{_id: "tenant1_1"}, {_id: "tenant1_2"}, {_id: "tenant1_3"}]; donorTenantConn.getDB("database").collection.insertMany(insertedDocs); +// Start up a cursor to check if we can getMore after the tenant has been migrated and change +// collection is dropped. +const donorCursor2 = donorTenantConn.getDB("database").collection.watch([]); + const donorTenantSession = donorTenantConn.startSession({retryWrites: true}); const donorTenantSessionCollection = donorTenantSession.getDatabase("database").collection; assert.commandWorked(donorTenantSessionCollection.insert({_id: "tenant1_4", w: "RETRYABLE"})); @@ -54,18 +63,49 @@ const operation = test.createSplitOperation(tenantIds); assert.commandWorked(operation.commit()); assertMigrationState(donorPrimary, operation.migrationId, "committed"); -let errCode; -try { - donorTenantConn.getDB("database").collection.watch([]); -} catch (err) { - errCode = err.code; -} -assert.eq(errCode, - ErrorCodes.TenantMigrationCommitted, - "Opening a change stream on the donor after completion of a shard split should fail."); +// Test that we cannot open a new change stream after the tenant has been migrated. +assert.commandFailedWithCode( + donorTenantConn.getDB("database") + .runCommand({aggregate: "collection", cursor: {}, pipeline: [{$changeStream: {}}]}), + ErrorCodes.TenantMigrationCommitted, + "Opening a change stream on the donor after completion of a shard split should fail."); + +// Test change stream cursor behavior on the donor for a tenant which was migrated, and for one +// which remains on the donor. +assert.commandWorked( + donorNonMovingTenantConn.getDB("database") + .runCommand("getMore", {getMore: donorNonMovingCursor._cursorid, collection: "collection"}), + "Tailing a change stream for a tenant that wasn't moved by a split" + + "should not be blocked after the split was committed"); + +// Test that running a getMore on a change stream cursor after the migration commits throws a +// resumable change stream exception. +const failedGetMore = donorTenantConn.getDB("database").runCommand("getMore", { + getMore: donorCursor._cursorid, + collection: "collection" +}); +assert.commandFailedWithCode( + failedGetMore, + ErrorCodes.ResumeTenantChangeStream, + "Tailing a change stream on the donor after completion of a shard split should fail."); +assert(failedGetMore.hasOwnProperty("errorLabels")); +assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels); + +// The cursor should have been deleted after the error so a getMore should fail. +assert.commandFailedWithCode( + donorTenantConn.getDB("database") + .runCommand("getMore", {getMore: donorCursor._cursorid, collection: "collection"}), + ErrorCodes.CursorNotFound); operation.forget(); +// getMore cursor to check if we can getMore after the database is dropped. +donorTenantSession.getDatabase("config")["system.change_collection"].drop(); +assert.commandFailedWithCode( + donorTenantConn.getDB("database") + .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}), + ErrorCodes.QueryPlanKilled); + const recipientRst = test.getRecipient(); const recipientPrimary = recipientRst.getPrimary(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index ff48a9c4fac..fddc067a20d 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -529,6 +529,8 @@ error_codes: - {code: 399, name: MovePrimaryRecipientPastAbortableStage, categories: [InternalOnly]} + - {code: 400, name: ResumeTenantChangeStream} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) @@ -558,4 +560,3 @@ error_codes: - {code: 50768,name: NotARetryableWriteCommand} - {code: 50915,name: BackupCursorOpenConflictWithCheckpoint, categories: [RetriableError]} - {code: 56846,name: ConfigServerUnreachable} - diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 40e307e999c..796f4efd621 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -56,6 +56,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/speculative_majority_read_info.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/resource_consumption_metrics.h" @@ -588,6 +589,12 @@ public: curOp->setGenericCursor_inlock(cursorPin->toGenericCursor()); } + // If this is a change stream cursor, check whether the tenant has migrated elsewhere. + if (cursorPin->getExecutor()->getPostBatchResumeToken()["_data"]) { + tenant_migration_access_blocker::assertCanGetMoreChangeStream(opCtx, + _cmd.getDbName()); + } + // If the 'failGetMoreAfterCursorCheckout' failpoint is enabled, throw an exception with // the given 'errorCode' value, or ErrorCodes::InternalError if 'errorCode' is omitted. failGetMoreAfterCursorCheckout.executeIf( diff --git a/src/mongo/db/error_labels.cpp b/src/mongo/db/error_labels.cpp index 96e1b05ede6..fefac7ba52f 100644 --- a/src/mongo/db/error_labels.cpp +++ b/src/mongo/db/error_labels.cpp @@ -105,7 +105,8 @@ bool ErrorLabelBuilder::isResumableChangeStreamError() const { (_commandName == "aggregate" || _commandName == "getMore") && _code && !_wcCode && (ErrorCodes::isRetriableError(*_code) || ErrorCodes::isNetworkError(*_code) || ErrorCodes::isNeedRetargettingError(*_code) || _code == ErrorCodes::RetryChangeStream || - _code == ErrorCodes::FailedToSatisfyReadPreference); + _code == ErrorCodes::FailedToSatisfyReadPreference || + _code == ErrorCodes::ResumeTenantChangeStream); // If the command or exception is not relevant, bail out early. if (!mayNeedResumableChangeStreamErrorLabel) { diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h index 0ff8928d5c7..896f92607a0 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker.h @@ -72,6 +72,11 @@ public: // virtual Status checkIfCanBuildIndex() = 0; + /** + * Checks if getMores for change streams should fail. + */ + virtual Status checkIfCanGetMoreChangeStream() = 0; + // We suspend TTL deletions at the recipient side to avoid the race when a document is updated // at the donor side, which may prevent it from being garbage collected by TTL, while the // recipient side document is deleted by the TTL. The donor side update will fail to propagate diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 1f16deea8f6..5372a209a7c 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -529,6 +529,17 @@ Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName) return Status::OK(); } +void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName) { + // We only block change stream getMores on the donor. + auto mtab = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor); + if (mtab) { + auto status = mtab->checkIfCanGetMoreChangeStream(); + mtab->recordTenantMigrationError(status); + uassertStatusOK(status); + } +} + bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName) { if (dbName.db().empty()) { return false; diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h index fc3a9035ca5..a86aa35ce4a 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h @@ -109,6 +109,11 @@ void checkIfCanWriteOrThrow(OperationContext* opCtx, const DatabaseName& dbName, Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName); /** + * Asserts if getMores for change streams should fail. + */ +void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName); + +/** * Returns true if there is either a donor or recipient access blocker for the given dbName. */ bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName); diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp index a4e45fdbc91..01d73378e94 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp @@ -224,6 +224,22 @@ Status TenantMigrationDonorAccessBlocker::checkIfCanBuildIndex() { MONGO_UNREACHABLE; } +Status TenantMigrationDonorAccessBlocker::checkIfCanGetMoreChangeStream() { + stdx::lock_guard<Latch> lg(_mutex); + switch (_state.getState()) { + case BlockerState::State::kAllow: + case BlockerState::State::kBlockWrites: + case BlockerState::State::kBlockWritesAndReads: + return Status::OK(); + case BlockerState::State::kReject: + return {ErrorCodes::ResumeTenantChangeStream, + "Change stream must be resumed on the new owner of this tenant"}; + case BlockerState::State::kAborted: + return Status::OK(); + } + MONGO_UNREACHABLE; +} + void TenantMigrationDonorAccessBlocker::startBlockingWrites() { stdx::lock_guard<Latch> lg(_mutex); diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h index 488359f09f2..ff7e7b1ab83 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h @@ -127,6 +127,11 @@ inline RepeatableSharedPromise<void>::~RepeatableSharedPromise() { * to be rejected if it is possible that some writes have been accepted by the recipient (i.e. the * migration has committed). * + * Change stream getMore commands call assertCanGetMoreChangeStream. After a commit normal getMores + * are allowed to proceed and drain the cursors, but change stream cursors are infinite and can't be + * fully drained. We added this special check for change streams specifically to signal that they + * must be resumed on the recipient. + * * Index build user threads call checkIfCanBuildIndex. Index builds are blocked and rejected * similarly to regular writes except that they are blocked from the start of the migration (i.e. * before "blockTimestamp" is chosen). @@ -198,6 +203,11 @@ public: // Status checkIfCanBuildIndex() final; + /** + * Returns error status if "getMore" command of a change stream should fail. + */ + Status checkIfCanGetMoreChangeStream() final; + bool checkIfShouldBlockTTL() const final { // There is no TTL race at the donor side. See parent class for details. return false; diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp index 02fec7a5d92..e53a5326950 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp @@ -158,6 +158,10 @@ Status TenantMigrationRecipientAccessBlocker::checkIfCanBuildIndex() { return Status::OK(); } +Status TenantMigrationRecipientAccessBlocker::checkIfCanGetMoreChangeStream() { + return Status::OK(); +} + bool TenantMigrationRecipientAccessBlocker::checkIfShouldBlockTTL() const { stdx::lock_guard<Latch> lg(_mutex); return _ttlIsBlocked; diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h index 39caef917aa..23ef8356377 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h @@ -91,6 +91,11 @@ public: // Status checkIfCanBuildIndex() final; + /** + * Returns error status if "getMore" command of a change stream should fail. + */ + Status checkIfCanGetMoreChangeStream() final; + // @return true if TTL is blocked bool checkIfShouldBlockTTL() const final; |