summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Mickey. J Winters <mickey.winters@mongodb.com> 2023-04-19 20:29:59 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-04-19 22:52:06 +0000
commit: d674d78896b572797df816a093ab8682ab860eca (patch)
tree: e55e908b3c090706b085a93cc0e1fcd5a1b18aa7
parent: b8a84c69d428492c9c003dbdb9c035133c696685 (diff)
download: mongo-d674d78896b572797df816a093ab8682ab860eca.tar.gz
SERVER-75470 Reject change stream getMore commands after a split or merge has been committed
-rw-r--r-- jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js | 20
-rw-r--r-- jstests/serverless/shard_split_change_collections_test.js | 58
-rw-r--r-- src/mongo/base/error_codes.yml | 3
-rw-r--r-- src/mongo/db/commands/getmore_cmd.cpp | 7
-rw-r--r-- src/mongo/db/error_labels.cpp | 3
-rw-r--r-- src/mongo/db/repl/tenant_migration_access_blocker.h | 5
-rw-r--r-- src/mongo/db/repl/tenant_migration_access_blocker_util.cpp | 11
-rw-r--r-- src/mongo/db/repl/tenant_migration_access_blocker_util.h | 5
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp | 16
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_access_blocker.h | 10
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp | 4
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_access_blocker.h | 5
12 files changed, 136 insertions, 11 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js
index 2a816b1ee50..7a6b5b86deb 100644
--- a/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js
+++ b/jstests/replsets/tenant_migration_recipient_shard_merge_copies_change_collections.js
@@ -185,6 +185,26 @@ donorSession2.commitTransaction_forTesting();
fpBeforeMarkingCloneSuccess.off();
TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+
+// Test that running a getMore on a change stream cursor after the migration commits throws a
+// resumable change stream exception.
+const failedGetMore = donorTenantConn2.getDB("database").runCommand("getMore", {
+ getMore: donorCursor2._cursorid,
+ collection: "collection"
+});
+assert.commandFailedWithCode(
+ failedGetMore,
+ ErrorCodes.ResumeTenantChangeStream,
+ "Tailing a change stream on the donor after completion of a shard merge should fail.");
+assert(failedGetMore.hasOwnProperty("errorLabels"));
+assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels);
+
+// The cursor should have been deleted after the error so a getMore should fail.
+assert.commandFailedWithCode(
+ donorTenantConn2.getDB("database")
+ .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}),
+ ErrorCodes.CursorNotFound);
+
assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
tenantMigrationTest.waitForMigrationGarbageCollection(migrationUuid, tenantIds[0]);
diff --git a/jstests/serverless/shard_split_change_collections_test.js b/jstests/serverless/shard_split_change_collections_test.js
index 8279b71a68e..d7c647ec469 100644
--- a/jstests/serverless/shard_split_change_collections_test.js
+++ b/jstests/serverless/shard_split_change_collections_test.js
@@ -21,12 +21,21 @@ const donorTenantConn =
ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, tenantIds[0]);
test.donor.setChangeStreamState(donorTenantConn, true);
+const donorNonMovingTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(donorPrimary.host, ObjectId());
+test.donor.setChangeStreamState(donorNonMovingTenantConn, true);
+const donorNonMovingCursor = donorNonMovingTenantConn.getDB("database").collection.watch();
+
// Open a change stream and insert documents into database.collection before the split
// starts.
const donorCursor = donorTenantConn.getDB("database").collection.watch([]);
const insertedDocs = [{_id: "tenant1_1"}, {_id: "tenant1_2"}, {_id: "tenant1_3"}];
donorTenantConn.getDB("database").collection.insertMany(insertedDocs);
+// Start up a cursor to check if we can getMore after the tenant has been migrated and change
+// collection is dropped.
+const donorCursor2 = donorTenantConn.getDB("database").collection.watch([]);
+
const donorTenantSession = donorTenantConn.startSession({retryWrites: true});
const donorTenantSessionCollection = donorTenantSession.getDatabase("database").collection;
assert.commandWorked(donorTenantSessionCollection.insert({_id: "tenant1_4", w: "RETRYABLE"}));
@@ -54,18 +63,49 @@ const operation = test.createSplitOperation(tenantIds);
assert.commandWorked(operation.commit());
assertMigrationState(donorPrimary, operation.migrationId, "committed");
-let errCode;
-try {
- donorTenantConn.getDB("database").collection.watch([]);
-} catch (err) {
- errCode = err.code;
-}
-assert.eq(errCode,
- ErrorCodes.TenantMigrationCommitted,
- "Opening a change stream on the donor after completion of a shard split should fail.");
+// Test that we cannot open a new change stream after the tenant has been migrated.
+assert.commandFailedWithCode(
+ donorTenantConn.getDB("database")
+ .runCommand({aggregate: "collection", cursor: {}, pipeline: [{$changeStream: {}}]}),
+ ErrorCodes.TenantMigrationCommitted,
+ "Opening a change stream on the donor after completion of a shard split should fail.");
+
+// Test change stream cursor behavior on the donor for a tenant which was migrated, and for one
+// which remains on the donor.
+assert.commandWorked(
+ donorNonMovingTenantConn.getDB("database")
+ .runCommand("getMore", {getMore: donorNonMovingCursor._cursorid, collection: "collection"}),
+ "Tailing a change stream for a tenant that wasn't moved by a split" +
+ "should not be blocked after the split was committed");
+
+// Test that running a getMore on a change stream cursor after the migration commits throws a
+// resumable change stream exception.
+const failedGetMore = donorTenantConn.getDB("database").runCommand("getMore", {
+ getMore: donorCursor._cursorid,
+ collection: "collection"
+});
+assert.commandFailedWithCode(
+ failedGetMore,
+ ErrorCodes.ResumeTenantChangeStream,
+ "Tailing a change stream on the donor after completion of a shard split should fail.");
+assert(failedGetMore.hasOwnProperty("errorLabels"));
+assert.contains("ResumableChangeStreamError", failedGetMore.errorLabels);
+
+// The cursor should have been deleted after the error so a getMore should fail.
+assert.commandFailedWithCode(
+ donorTenantConn.getDB("database")
+ .runCommand("getMore", {getMore: donorCursor._cursorid, collection: "collection"}),
+ ErrorCodes.CursorNotFound);
operation.forget();
+// Drop the change collection, then check that a getMore on the cursor that is still open
+// (donorCursor2) fails with QueryPlanKilled.
+donorTenantSession.getDatabase("config")["system.change_collection"].drop();
+assert.commandFailedWithCode(
+ donorTenantConn.getDB("database")
+ .runCommand("getMore", {getMore: donorCursor2._cursorid, collection: "collection"}),
+ ErrorCodes.QueryPlanKilled);
+
const recipientRst = test.getRecipient();
const recipientPrimary = recipientRst.getPrimary();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index ff48a9c4fac..fddc067a20d 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -529,6 +529,8 @@ error_codes:
- {code: 399, name: MovePrimaryRecipientPastAbortableStage, categories: [InternalOnly]}
+ - {code: 400, name: ResumeTenantChangeStream}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
@@ -558,4 +560,3 @@ error_codes:
- {code: 50768,name: NotARetryableWriteCommand}
- {code: 50915,name: BackupCursorOpenConflictWithCheckpoint, categories: [RetriableError]}
- {code: 56846,name: ConfigServerUnreachable}
-
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 40e307e999c..796f4efd621 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/speculative_majority_read_info.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
@@ -588,6 +589,12 @@ public:
curOp->setGenericCursor_inlock(cursorPin->toGenericCursor());
}
+ // If this is a change stream cursor, check whether the tenant has migrated elsewhere.
+ if (cursorPin->getExecutor()->getPostBatchResumeToken()["_data"]) {
+ tenant_migration_access_blocker::assertCanGetMoreChangeStream(opCtx,
+ _cmd.getDbName());
+ }
+
// If the 'failGetMoreAfterCursorCheckout' failpoint is enabled, throw an exception with
// the given 'errorCode' value, or ErrorCodes::InternalError if 'errorCode' is omitted.
failGetMoreAfterCursorCheckout.executeIf(
diff --git a/src/mongo/db/error_labels.cpp b/src/mongo/db/error_labels.cpp
index 96e1b05ede6..fefac7ba52f 100644
--- a/src/mongo/db/error_labels.cpp
+++ b/src/mongo/db/error_labels.cpp
@@ -105,7 +105,8 @@ bool ErrorLabelBuilder::isResumableChangeStreamError() const {
(_commandName == "aggregate" || _commandName == "getMore") && _code && !_wcCode &&
(ErrorCodes::isRetriableError(*_code) || ErrorCodes::isNetworkError(*_code) ||
ErrorCodes::isNeedRetargettingError(*_code) || _code == ErrorCodes::RetryChangeStream ||
- _code == ErrorCodes::FailedToSatisfyReadPreference);
+ _code == ErrorCodes::FailedToSatisfyReadPreference ||
+ _code == ErrorCodes::ResumeTenantChangeStream);
// If the command or exception is not relevant, bail out early.
if (!mayNeedResumableChangeStreamErrorLabel) {
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index 0ff8928d5c7..896f92607a0 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -72,6 +72,11 @@ public:
//
virtual Status checkIfCanBuildIndex() = 0;
+ /**
+ * Returns a non-OK status if a getMore on a change stream cursor must be rejected.
+ */
+ virtual Status checkIfCanGetMoreChangeStream() = 0;
+
// We suspend TTL deletions at the recipient side to avoid the race when a document is updated
// at the donor side, which may prevent it from being garbage collected by TTL, while the
// recipient side document is deleted by the TTL. The donor side update will fail to propagate
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
index 1f16deea8f6..5372a209a7c 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
@@ -529,6 +529,17 @@ Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName)
return Status::OK();
}
+void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName) {
+    // Change stream getMores are only ever blocked on the donor, so only the donor-type access
+    // blocker registered for 'dbName' is consulted.
+    auto donorBlocker = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+                            .getTenantMigrationAccessBlockerForDbName(dbName, MtabType::kDonor);
+    if (!donorBlocker) {
+        // No donor blocker for this database: nothing to check.
+        return;
+    }
+
+    // Record the outcome on the blocker before surfacing a non-OK status to the caller.
+    auto getMoreStatus = donorBlocker->checkIfCanGetMoreChangeStream();
+    donorBlocker->recordTenantMigrationError(getMoreStatus);
+    uassertStatusOK(getMoreStatus);
+}
+
bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName) {
if (dbName.db().empty()) {
return false;
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h
index fc3a9035ca5..a86aa35ce4a 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h
@@ -109,6 +109,11 @@ void checkIfCanWriteOrThrow(OperationContext* opCtx, const DatabaseName& dbName,
Status checkIfCanBuildIndex(OperationContext* opCtx, const DatabaseName& dbName);
/**
+ * Throws if a getMore on a change stream cursor for 'dbName' must be rejected.
+ */
+void assertCanGetMoreChangeStream(OperationContext* opCtx, const DatabaseName& dbName);
+
+/**
* Returns true if there is either a donor or recipient access blocker for the given dbName.
*/
bool hasActiveTenantMigration(OperationContext* opCtx, const DatabaseName& dbName);
diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
index a4e45fdbc91..01d73378e94 100644
--- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
@@ -224,6 +224,22 @@ Status TenantMigrationDonorAccessBlocker::checkIfCanBuildIndex() {
MONGO_UNREACHABLE;
}
+Status TenantMigrationDonorAccessBlocker::checkIfCanGetMoreChangeStream() {
+    // Read the blocker state under the mutex. kReject is the only state in which a change stream
+    // getMore is refused; every other state lets it proceed.
+    stdx::lock_guard<Latch> lock(_mutex);
+    switch (_state.getState()) {
+        case BlockerState::State::kReject:
+            return {ErrorCodes::ResumeTenantChangeStream,
+                    "Change stream must be resumed on the new owner of this tenant"};
+        case BlockerState::State::kAllow:
+        case BlockerState::State::kBlockWrites:
+        case BlockerState::State::kBlockWritesAndReads:
+        case BlockerState::State::kAborted:
+            return Status::OK();
+    }
+    MONGO_UNREACHABLE;
+}
+
void TenantMigrationDonorAccessBlocker::startBlockingWrites() {
stdx::lock_guard<Latch> lg(_mutex);
diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
index 488359f09f2..ff7e7b1ab83 100644
--- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
@@ -127,6 +127,11 @@ inline RepeatableSharedPromise<void>::~RepeatableSharedPromise() {
* to be rejected if it is possible that some writes have been accepted by the recipient (i.e. the
* migration has committed).
*
+ * Change stream getMore commands call assertCanGetMoreChangeStream. After a commit normal getMores
+ * are allowed to proceed and drain the cursors, but change stream cursors are infinite and can't be
+ * fully drained. We added this special check for change streams specifically to signal that they
+ * must be resumed on the recipient.
+ *
* Index build user threads call checkIfCanBuildIndex. Index builds are blocked and rejected
* similarly to regular writes except that they are blocked from the start of the migration (i.e.
* before "blockTimestamp" is chosen).
@@ -198,6 +203,11 @@ public:
//
Status checkIfCanBuildIndex() final;
+ /**
+ * Returns error status if "getMore" command of a change stream should fail.
+ */
+ Status checkIfCanGetMoreChangeStream() final;
+
bool checkIfShouldBlockTTL() const final {
// There is no TTL race at the donor side. See parent class for details.
return false;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
index 02fec7a5d92..e53a5326950 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
@@ -158,6 +158,10 @@ Status TenantMigrationRecipientAccessBlocker::checkIfCanBuildIndex() {
return Status::OK();
}
+Status TenantMigrationRecipientAccessBlocker::checkIfCanGetMoreChangeStream() {
+ return Status::OK();  // Unconditionally OK: this blocker never rejects change stream getMores.
+}
+
bool TenantMigrationRecipientAccessBlocker::checkIfShouldBlockTTL() const {
stdx::lock_guard<Latch> lg(_mutex);
return _ttlIsBlocked;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
index 39caef917aa..23ef8356377 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
@@ -91,6 +91,11 @@ public:
//
Status checkIfCanBuildIndex() final;
+ /**
+ * Returns error status if "getMore" command of a change stream should fail.
+ */
+ Status checkIfCanGetMoreChangeStream() final;
+
// @return true if TTL is blocked
bool checkIfShouldBlockTTL() const final;