summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMisha Tyulenev <misha.tyulenev@mongodb.com>2022-03-16 21:52:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-16 22:55:45 +0000
commitcccb587590252e1b2bf2e199e783e52c65141825 (patch)
tree89a819d7e9f541b4ce45ac9cc36f0b6f53e68821
parent75c7f1ed4f99d3fa3bfbf942cf086b68c9a47286 (diff)
downloadmongo-cccb587590252e1b2bf2e199e783e52c65141825.tar.gz
SERVER-62591 Add change streams event for migrate last chunk off shard
-rw-r--r--jstests/change_streams/migrate_last_chunk_from_shard_event.js156
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp9
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp56
-rw-r--r--src/mongo/db/s/migration_source_manager.h7
-rw-r--r--src/mongo/db/s/migration_util.cpp73
-rw-r--r--src/mongo/db/s/migration_util.h19
8 files changed, 280 insertions, 46 deletions
diff --git a/jstests/change_streams/migrate_last_chunk_from_shard_event.js b/jstests/change_streams/migrate_last_chunk_from_shard_event.js
new file mode 100644
index 00000000000..8ae2c423eaa
--- /dev/null
+++ b/jstests/change_streams/migrate_last_chunk_from_shard_event.js
@@ -0,0 +1,156 @@
+/**
+ * Test that change streams return migrateLastChunkFromShard events.
+ *
+ * @tags: [
+ * requires_fcv_60,
+ * requires_sharding,
+ * uses_change_streams,
+ * change_stream_does_not_expect_txns,
+ * assumes_unsharded_collection,
+ * assumes_read_preference_unchanged,
+ * featureFlagChangeStreamsVisibility
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection.
+load('jstests/libs/change_stream_util.js'); // For 'ChangeStreamTest' and
+ // 'assertChangeStreamEventEq'.
+// The database is named after the test itself; all namespaces below derive from it.
+const dbName = jsTestName();
+const collName = "test";
+const collNS = dbName + "." + collName;
+// Namespace in {db, coll} form, used as the 'ns' field of the expected change stream events.
+const ns = {
+ db: dbName,
+ coll: collName
+};
+// Base for the _id of the document inserted after each migration (numDocs + 1).
+const numDocs = 1;
+
+// Two shards, each a single-node replica set with periodic no-op writes enabled at a 1 second
+// interval.
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+// All change streams in this test are opened through mongos.
+const mongosConn = st.s;
+const db = mongosConn.getDB(dbName);
+const test = new ChangeStreamTest(db);
+
+function getCollectionUuid(coll) {
+ const collInfo = db.getCollectionInfos({name: coll})[0];
+ return collInfo.info.uuid;
+}
+
+function assertMigrateEventObserved(cursor, expectedEvent) {
+ let events = test.getNextChanges(cursor, 1);
+ let event = events[0];
+ // Check the presence and the type of 'wallTime' field. We have no way to check the correctness
+ // of 'wallTime' value, so we delete it afterwards.
+ assert(event.wallTime instanceof Date);
+ delete event.wallTime;
+ expectedEvent.collectionUUID = getCollectionUuid(collName);
+ assertChangeStreamEventEq(event, expectedEvent);
+ return event._id;
+}
+
+function prepareCollection() {
+ assertDropCollection(db, collName);
+ assert.commandWorked(db[collName].insert({_id: 1}));
+ assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}}));
+}
+
+// Test that if showSystemEvents is false, we do not see the migrateLastChunkFromShard event.
+function validateShowSystemEventsFalse() {
+ prepareCollection();
+ let pipeline = [
+ {$changeStream: {showExpandedEvents: true, showSystemEvents: false}},
+ {$match: {operationType: {$nin: ["create", "createIndexes"]}}}
+ ];
+ let cursor = test.startWatchingChanges(
+ {pipeline, collection: collName, aggregateOptions: {cursor: {batchSize: 0}}});
+
+ // Migrate a chunk, then insert a new document.
+ assert.commandWorked(
+ db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));
+ assert.commandWorked(db[collName].insert({_id: numDocs + 1}));
+
+ // Confirm that we don't observe the migrateLastChunkFromShard event in the stream, but only see
+ // the subsequent insert.
+ test.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: {
+ operationType: "insert",
+ ns: ns,
+ fullDocument: {_id: numDocs + 1},
+ documentKey: {_id: numDocs + 1},
+ }
+ });
+}
+
+// Test that if showSystemEvents is true, we see the migrateLastChunkFromShard event and can resume
+// after it.
+function validateExpectedEventAndConfirmResumability(collParam, expectedOutput) {
+ prepareCollection();
+
+ let pipeline = [
+ {$changeStream: {showExpandedEvents: true, showSystemEvents: true}},
+ {$match: {operationType: {$nin: ["create", "createIndexes"]}}}
+ ];
+ let cursor = test.startWatchingChanges(
+ {pipeline: pipeline, collection: collParam, aggregateOptions: {cursor: {batchSize: 0}}});
+
+ // Migrate a chunk from one shard to another.
+ assert.commandWorked(
+ db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));
+
+ // Confirm that we observe the migrateLastChunkFromShard event, and obtain its resume token.
+ const migrateResumeToken = assertMigrateEventObserved(cursor, expectedOutput);
+
+ // Insert a document before starting the next change stream so that we can validate the
+ // resuming behavior.
+ assert.commandWorked(db[collName].insert({_id: numDocs + 1}));
+
+ // Resume after the migrate event and confirm we see the subsequent insert.
+ pipeline = [{
+ $changeStream:
+ {showExpandedEvents: true, showSystemEvents: true, resumeAfter: migrateResumeToken}
+ }];
+ cursor = test.startWatchingChanges({pipeline: pipeline, collection: collParam});
+
+ test.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: {
+ operationType: "insert",
+ ns: ns,
+ fullDocument: {_id: numDocs + 1},
+ documentKey: {_id: numDocs + 1},
+ }
+ });
+}
+
+assert.commandWorked(db.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+
+// Test the behaviour of migrateLastChunkFromShard for a single-collection stream
+validateExpectedEventAndConfirmResumability(collName, {
+ operationType: "migrateLastChunkFromShard",
+ ns: ns,
+ operationDescription: {
+ "shardId": st.shard0.shardName,
+ }
+});
+
+// Test the behaviour of migrateLastChunkFromShard for a whole-DB stream.
+validateExpectedEventAndConfirmResumability(1, {
+ operationType: "migrateLastChunkFromShard",
+ ns: ns,
+ operationDescription: {
+ "shardId": st.shard0.shardName,
+ }
+});
+
+// Test the behaviour of migrateLastChunkFromShard when showSystemEvents is false.
+validateShowSystemEventsFalse();
+
+st.stop();
+}());
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index e0016fa00ad..377c2bf2eb2 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -256,6 +256,11 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx);
internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSONRegEx(nsRegex)));
+ // Only return the 'migrateLastChunkFromShard' event if 'showSystemEvents' is set.
+ if (expCtx->changeStreamSpec->getShowSystemEvents()) {
+ internalOpTypeOrBuilder.append(BSON("o2.migrateLastChunkFromShard" << BSONRegEx(nsRegex)));
+ }
+
// Finalize the array of $or filter predicates.
internalOpTypeOrBuilder.done();
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index ae281596679..05e6d623dcf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -234,6 +234,7 @@ public:
static constexpr StringData kCreateIndexesOpType = "createIndexes"_sd;
static constexpr StringData kDropIndexesOpType = "dropIndexes"_sd;
static constexpr StringData kShardCollectionOpType = "shardCollection"_sd;
+ static constexpr StringData kMigrateLastChunkFromShardOpType = "migrateLastChunkFromShard"_sd;
// Default regex for collections match which prohibits system collections.
static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 17f35589a54..68893c56738 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -377,6 +377,15 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
break;
}
+ // Check if this is a migration of the last chunk off a shard.
+ if (!input.getNestedField("o2.migrateLastChunkFromShard").missing()) {
+ const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
+ operationType = DocumentSourceChangeStream::kMigrateLastChunkFromShardOpType;
+ operationDescription =
+ Value(copyDocExceptFields(o2Field, {"migrateLastChunkFromShard"_sd}));
+ break;
+ }
+
// Otherwise, o2.type determines the message type.
auto o2Type = input.getNestedField("o2.type");
tassert(5052200, "o2.type is missing from noop oplog event", !o2Type.missing());
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 84598e4e1a2..180f75b78ea 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -390,13 +390,20 @@ void MigrationSourceManager::enterCriticalSection() {
_stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
_cloneAndCommitTimer.reset();
- _notifyChangeStreamsOnRecipientFirstChunk(_getCurrentMetadataAndCheckEpoch());
+ const auto& metadata = _getCurrentMetadataAndCheckEpoch();
+
+ // Check that there are no chunks on the recipient shard. Write an oplog event for change
+ // streams if this is the first migration to the recipient.
+ if (!metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet()) {
+ migrationutil::notifyChangeStreamsOnRecipientFirstChunk(
+ _opCtx, _args.getNss(), _args.getFromShardId(), _args.getToShardId(), _collectionUUID);
+ }
// Mark the shard as running critical operation, which requires recovery on crash.
//
// NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to
- // '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its local
- // write to majority committed.
+ // 'notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its
+ // local write to majority committed.
uassertStatusOK(ShardingStateRecovery::startMetadataOp(_opCtx));
LOGV2_DEBUG_OPTIONS(4817402,
@@ -571,6 +578,13 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
// Migration succeeded
const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
+ // Check if there are no chunks left on the donor shard. Write an oplog event for change streams
+ // if the last chunk was migrated off the donor.
+ if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShardId()).isSet()) {
+ migrationutil::notifyChangeStreamsOnDonorLastChunk(
+ _opCtx, _args.getNss(), _args.getFromShardId(), _collectionUUID);
+ }
+
LOGV2(22018,
"Migration succeeded and updated collection version to {updatedCollectionVersion}",
@@ -691,42 +705,6 @@ CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() {
return metadata;
}
-void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
- const CollectionMetadata& metadata) {
- // If this is not the first donation, there is nothing to be done
- if (metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet())
- return;
-
- const std::string dbgMessage = str::stream()
- << "Migrating chunk from shard " << _args.getFromShardId() << " to shard "
- << _args.getToShardId() << " with no chunks for this collection";
-
- // The message expected by change streams
- const auto o2Message = BSON("type"
- << "migrateChunkToNewShard"
- << "from" << _args.getFromShardId() << "to"
- << _args.getToShardId());
-
- auto const serviceContext = _opCtx->getClient()->getServiceContext();
-
- UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetOplog oplogWrite(_opCtx, OplogAccessMode::kWrite);
- writeConflictRetry(
- _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
- WriteUnitOfWork uow(_opCtx);
- serviceContext->getOpObserver()->onInternalOpMessage(_opCtx,
- _args.getNss(),
- *_collectionUUID,
- BSON("msg" << dbgMessage),
- o2Message,
- boost::none,
- boost::none,
- boost::none,
- boost::none);
- uow.commit();
- });
-}
-
void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
invariant(_state != kDone);
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index d51d6265867..9b1b64dd87f 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -187,13 +187,6 @@ private:
CollectionMetadata _getCurrentMetadataAndCheckEpoch();
/**
- * If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any
- * chunks), this function writes a no-op message to the oplog, so that change stream will notice
- * that and close the cursor in order to notify mongos to target the new shard as well.
- */
- void _notifyChangeStreamsOnRecipientFirstChunk(const CollectionMetadata& metadata);
-
- /**
* Called when any of the states fails. May only be called once and will put the migration
* manager into the kDone state.
*/
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 364f5813f0a..5bde34b3591 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -42,9 +42,11 @@
#include "mongo/db/catalog/collection_catalog_helper.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -706,6 +708,73 @@ int retrieveNumOrphansFromRecipient(OperationContext* opCtx,
return rangeDeletionResponse.docs[0].getIntField("numOrphanDocs");
}
+/**
+ * Writes a 'migrateChunkToNewShard' internal no-op message to the oplog for the collection
+ * 'collNss', recording the donor ('fromShardId') and the recipient ('toShardId') of the migration.
+ *
+ * 'collUUID' must contain a value: it is the UUID attached to the oplog entry.
+ */
+void notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx,
+                                              const NamespaceString& collNss,
+                                              const ShardId& fromShardId,
+                                              const ShardId& toShardId,
+                                              boost::optional<UUID> collUUID) {
+    // The UUID is dereferenced below. Dereferencing an empty boost::optional is undefined
+    // behavior, so fail deterministically here instead if the caller did not supply one.
+    invariant(collUUID);
+
+    // Human-readable text stored in the 'msg' field of the entry's 'o' object.
+    const std::string dbgMessage = str::stream()
+        << "Migrating chunk from shard " << fromShardId << " to shard " << toShardId
+        << " with no chunks for this collection";
+
+    // The message expected by change streams
+    const auto o2Message = BSON("type"
+                                << "migrateChunkToNewShard"
+                                << "from" << fromShardId << "to" << toShardId);
+
+    auto const serviceContext = opCtx->getClient()->getServiceContext();
+
+    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+    AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+    // The write is performed in its own WriteUnitOfWork, wrapped in writeConflictRetry.
+    writeConflictRetry(
+        opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
+            WriteUnitOfWork uow(opCtx);
+            serviceContext->getOpObserver()->onInternalOpMessage(opCtx,
+                                                                 collNss,
+                                                                 *collUUID,
+                                                                 BSON("msg" << dbgMessage),
+                                                                 o2Message,
+                                                                 boost::none,
+                                                                 boost::none,
+                                                                 boost::none,
+                                                                 boost::none);
+            uow.commit();
+        });
+}
+
+/**
+ * Writes a 'migrateLastChunkFromShard' internal no-op message to the oplog for the collection
+ * 'collNss', recording 'donorShardId' as the shard the chunk was migrated off.
+ *
+ * 'collUUID' must contain a value: it is the UUID attached to the oplog entry.
+ */
+void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx,
+                                         const NamespaceString& collNss,
+                                         const ShardId& donorShardId,
+                                         boost::optional<UUID> collUUID) {
+    // The UUID is dereferenced below. Dereferencing an empty boost::optional is undefined
+    // behavior, so fail deterministically here instead if the caller did not supply one.
+    invariant(collUUID);
+
+    // Human-readable text stored in the 'msg' field of the entry's 'o' object.
+    const std::string oMessage = str::stream()
+        << "Migrate the last chunk for " << collNss << " off shard " << donorShardId;
+
+    // The message expected by change streams. The 'migrateLastChunkFromShard' field carries the
+    // full namespace string and 'shardId' carries the donor shard.
+    const auto o2Message =
+        BSON("migrateLastChunkFromShard" << collNss.toString() << "shardId" << donorShardId);
+
+    auto const serviceContext = opCtx->getClient()->getServiceContext();
+
+    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+    AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+    // The write is performed in its own WriteUnitOfWork, wrapped in writeConflictRetry.
+    writeConflictRetry(
+        opCtx, "migrateLastChunkFromShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
+            WriteUnitOfWork uow(opCtx);
+            serviceContext->getOpObserver()->onInternalOpMessage(opCtx,
+                                                                 collNss,
+                                                                 *collUUID,
+                                                                 BSON("msg" << oMessage),
+                                                                 o2Message,
+                                                                 boost::none,
+                                                                 boost::none,
+                                                                 boost::none,
+                                                                 boost::none);
+            uow.commit();
+        });
+}
+
void persistCommitDecision(OperationContext* opCtx,
const MigrationCoordinatorDocument& migrationDoc) {
invariant(migrationDoc.getDecision() &&
@@ -1068,6 +1137,10 @@ void recoverMigrationCoordinations(OperationContext* opCtx,
coordinator.setMigrationDecision(DecisionEnum::kAborted);
} else {
coordinator.setMigrationDecision(DecisionEnum::kCommitted);
+ if (!currentMetadata.getChunkManager()->getVersion(doc.getDonorShardId()).isSet()) {
+ notifyChangeStreamsOnDonorLastChunk(
+ opCtx, doc.getNss(), doc.getDonorShardId(), doc.getCollectionUuid());
+ }
}
coordinator.completeMigration(opCtx, acquireCSOnRecipient);
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 73012403600..b5af6673729 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -282,5 +282,24 @@ void drainMigrationsPendingRecovery(OperationContext* opCtx);
void asyncRecoverMigrationUntilSuccessOrStepDown(OperationContext* opCtx,
const NamespaceString& nss) noexcept;
+/**
+ * This function writes a no-op message to the oplog when migrating the first chunk to the recipient
+ * (i.e., the recipient didn't have any chunks), so that change streams will notice that and close
+ * the cursor in order to notify mongos to target the new shard as well.
+ */
+void notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx,
+ const NamespaceString& collNss,
+ const ShardId& fromShardId,
+ const ShardId& toShardId,
+ boost::optional<UUID> collUUID);
+
+/**
+ * This function writes a no-op message to the oplog when, during a migration, the last chunk of
+ * the collection 'collNss' is migrated off the donor and hence the donor has no more chunks.
+ */
+void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx,
+ const NamespaceString& collNss,
+ const ShardId& donorShardId,
+ boost::optional<UUID> collUUID);
} // namespace migrationutil
} // namespace mongo