commit 04b55aa4c74a1ebf5e1ef50294f06e0b91acc45a (patch)
tree 4d83a91a5c56ca1fc82239f261a731f77f57c523
parent 92a93d167d0a8ab1c584bfc496cda3371419aa3b (diff)
Author:    Rishab Joshi <rishab.joshi@mongodb.com>          2022-07-08 11:10:33 +0000
Committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-07-08 11:48:27 +0000

SERVER-66634 Make changes to the startup recovery and initial-sync for the change collections.
 jstests/serverless/basic_write_to_change_collection.js                  |  48
 jstests/serverless/change_streams/basic_read_from_change_collection.js  |   1
 jstests/serverless/initial_sync_change_collection.js                    |  96
 jstests/serverless/libs/change_collection_util.js                       |  47
 jstests/serverless/write_to_change_collection_in_startup_recovery.js    | 108
 src/mongo/db/catalog_raii.cpp                                           |  18
 src/mongo/db/catalog_raii.h                                             |   9
 src/mongo/db/change_stream_change_collection_manager.cpp                |  22
 src/mongo/db/change_stream_change_collection_manager.h                  |   1
 src/mongo/db/repl/collection_cloner.cpp                                 |   4
 src/mongo/db/repl/oplog_applier_impl.cpp                                | 103
 src/mongo/db/repl/storage_interface_impl.cpp                            |  50
 src/mongo/db/storage/storage_util.h                                     |  63
 13 files changed, 438 insertions(+), 132 deletions(-)
diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js
index 0105f33d344..401c8916880 100644
--- a/jstests/serverless/basic_write_to_change_collection.js
+++ b/jstests/serverless/basic_write_to_change_collection.js
@@ -1,13 +1,14 @@
// Tests that entries are written to the change collection for collection create, drop and document
// modification operations.
// @tags: [
-// multiversion_incompatible,
// featureFlagMongoStore,
// requires_fcv_61,
// ]
(function() {
"use strict";
+load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+
const replSetTest = new ReplSetTest({nodes: 2});
// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
@@ -21,51 +22,6 @@ const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();
const testDb = primary.getDB("test");
-// Verifies that the oplog and change collection entries are the same for the specified start and
-// end duration of the oplog timestamp.
-function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) {
- const oplogColl = connection.getDB("local").oplog.rs;
- const changeColl = connection.getDB("config").system.change_collection;
-
- // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp,
- // endOplogTimestamp].
- const oplogEntries =
- oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]})
- .toArray();
- const changeCollectionEntries =
- changeColl
- .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
- .toArray();
-
- assert.eq(
- oplogEntries.length,
- changeCollectionEntries.length,
- "Number of entries in the oplog and the change collection is not the same. Oplog has total " +
- oplogEntries.length + " entries , change collection has total " +
- changeCollectionEntries.length + " entries");
-
- for (let idx = 0; idx < oplogEntries.length; idx++) {
- const oplogEntry = oplogEntries[idx];
- const changeCollectionEntry = changeCollectionEntries[idx];
-
- // Remove the '_id' field from the change collection as oplog does not have it.
- assert(changeCollectionEntry.hasOwnProperty("_id"));
- assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts),
- 0,
- "Change collection '_id' field: " + tojson(changeCollectionEntry._id) +
- " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts));
- delete changeCollectionEntry["_id"];
-
- // Verify that the oplog and change collecton entry (after removing the '_id') field are
- // the same.
- assert.eq(
- oplogEntry,
- changeCollectionEntry,
- "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) +
- ", change collection entry: " + tojson(changeCollectionEntry));
- }
-}
-
// Performs writes on the specified collection.
function performWrites(coll) {
const docIds = [1, 2, 3, 4, 5];
diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js
index fa457e96905..6c2edc1da4d 100644
--- a/jstests/serverless/change_streams/basic_read_from_change_collection.js
+++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js
@@ -1,7 +1,6 @@
// Tests that a change stream can be opened on a change collection when one exists, and that an
// exception is thrown if we attempt to open a stream while change streams are disabled.
// @tags: [
-// multiversion_incompatible,
// featureFlagMongoStore,
// requires_fcv_61,
// assumes_against_mongod_not_mongos,
diff --git a/jstests/serverless/initial_sync_change_collection.js b/jstests/serverless/initial_sync_change_collection.js
new file mode 100644
index 00000000000..81dd18b3a93
--- /dev/null
+++ b/jstests/serverless/initial_sync_change_collection.js
@@ -0,0 +1,96 @@
+// Tests that the data cloning phase of initial sync does not clone the change collection documents,
+// and that once initial sync has completed, the change collection and oplog entries on the new
+// secondary are exactly the same.
+// @tags: [
+// featureFlagServerlessChangeStreams,
+// featureFlagMongoStore,
+// requires_fcv_61,
+// ]
+//
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // For waitForFailPoint.
+load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+
+const replSetTest = new ReplSetTest({nodes: 1});
+
+// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
+// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
+replSetTest.startSet(
+ {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+
+replSetTest.initiate();
+
+const primary = replSetTest.getPrimary();
+const primaryChangeColl = primary.getDB("config").system.change_collection;
+
+const mdbStockPriceDoc = {
+ _id: "mdb",
+ price: 250
+};
+
+// The document 'mdbStockPriceDoc' is inserted before initial sync starts, so it should not be
+// cloned into the secondary once initial sync is complete.
+assert.commandWorked(primary.getDB("test").stockPrice.insert(mdbStockPriceDoc));
+assert.eq(primaryChangeColl.find({o: mdbStockPriceDoc}).toArray().length, 1);
+
+// Add a new secondary to the replica set and block the initial sync after the data cloning is done.
+const secondary = replSetTest.add({
+ setParameter: {
+ // Hang after the data cloning phase is completed.
+ "failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}),
+ "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})
+ }
+});
+
+replSetTest.reInitiate();
+
+// Wait for the cloning phase to complete. The cloning phase should not clone documents of the
+// change collection from the primary.
+assert.commandWorked(secondary.adminCommand({
+ waitForFailPoint: "initialSyncHangAfterDataCloning",
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+}));
+
+const tslaStockPriceDoc = {
+ _id: "tsla",
+ price: 650
+};
+
+// The document 'tslaStockPriceDoc' is inserted on the primary after the data cloning phase has
+// completed, so it should be written to the secondary's change collection.
+assert.commandWorked(primary.getDB("test").stockPrice.insert(tslaStockPriceDoc));
+assert.eq(primaryChangeColl.find({o: tslaStockPriceDoc}).toArray().length, 1);
+
+// Unblock the initial sync process.
+assert.commandWorked(secondary.getDB("test").adminCommand(
+ {configureFailPoint: "initialSyncHangAfterDataCloning", mode: "off"}));
+
+// Wait for the initial sync to complete.
+replSetTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+// Verify that the document 'mdbStockPriceDoc' does not exist and the document 'tslaStockPriceDoc'
+// exists in the secondary's change collection.
+const changeCollDocs =
+ secondary.getDB("config")
+ .system.change_collection.find({$or: [{o: mdbStockPriceDoc}, {o: tslaStockPriceDoc}]})
+ .toArray();
+assert.eq(changeCollDocs.length, 1);
+assert.eq(changeCollDocs[0].o, tslaStockPriceDoc);
+
+// Get the timestamp of the first and the last entry from the secondary's oplog.
+const oplogDocs = secondary.getDB("local").oplog.rs.find().toArray();
+assert.gt(oplogDocs.length, 0);
+const startOplogTimestamp = oplogDocs[0].ts;
+const endOplogTimestamp = oplogDocs.at(-1).ts;
+
+// The change collection is created during the data cloning phase, and documents are written to the
+// oplog only after the data cloning is done, so the change collection is already in place to
+// capture every oplog entry. As such, the change collection entries and the oplog entries from
+// 'startOplogTimestamp' to 'endOplogTimestamp' must be exactly the same.
+verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
+
+replSetTest.stopSet();
+})();
diff --git a/jstests/serverless/libs/change_collection_util.js b/jstests/serverless/libs/change_collection_util.js
new file mode 100644
index 00000000000..4026ea84f81
--- /dev/null
+++ b/jstests/serverless/libs/change_collection_util.js
@@ -0,0 +1,47 @@
+// Contains functions for testing the change collections.
+
+// Verifies that the oplog and change collection entries are the same within the specified range
+// [startOplogTimestamp, endOplogTimestamp] of oplog timestamps.
+function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) {
+ const oplogColl = connection.getDB("local").oplog.rs;
+ const changeColl = connection.getDB("config").system.change_collection;
+
+ // Fetch all oplog and change collection entries for the range: [startOplogTimestamp,
+ // endOplogTimestamp].
+ const oplogEntries =
+ oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]})
+ .toArray();
+ const changeCollectionEntries =
+ changeColl
+ .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
+ .toArray();
+
+ assert.eq(
+ oplogEntries.length,
+ changeCollectionEntries.length,
+ "Number of entries in the oplog and the change collection is not the same. Oplog has total " +
+ oplogEntries.length + " entries , change collection has total " +
+ changeCollectionEntries.length + " entries" +
+ "change collection entries " + tojson(changeCollectionEntries));
+
+ for (let idx = 0; idx < oplogEntries.length; idx++) {
+ const oplogEntry = oplogEntries[idx];
+ const changeCollectionEntry = changeCollectionEntries[idx];
+
+ // Remove the '_id' field from the change collection entry, as the oplog entry does not have it.
+ assert(changeCollectionEntry.hasOwnProperty("_id"));
+ assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts),
+ 0,
+ "Change collection '_id' field: " + tojson(changeCollectionEntry._id) +
+ " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts));
+ delete changeCollectionEntry["_id"];
+
+ // Verify that the oplog and change collection entry (after removing the '_id' field) are
+ // the same.
+ assert.eq(
+ oplogEntry,
+ changeCollectionEntry,
+ "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) +
+ ", change collection entry: " + tojson(changeCollectionEntry));
+ }
+}
diff --git a/jstests/serverless/write_to_change_collection_in_startup_recovery.js b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
new file mode 100644
index 00000000000..a14b5e28600
--- /dev/null
+++ b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
@@ -0,0 +1,108 @@
+// Tests that replaying the oplog entries during startup recovery also writes to the change
+// collection.
+// @tags: [
+// featureFlagServerlessChangeStreams,
+// multiversion_incompatible,
+// featureFlagMongoStore,
+// ]
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
+load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+
+const replSetTest = new ReplSetTest({nodes: 1});
+
+// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
+// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
+replSetTest.startSet(
+ {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+
+replSetTest.initiate();
+
+let primary = replSetTest.getPrimary();
+
+// Insert a document into the collection and capture the corresponding oplog timestamp. This
+// timestamp is the start timestamp from which (inclusive) we will validate the oplog and the
+// change collection entries.
+const startTimestamp = assert
+ .commandWorked(primary.getDB("test").runCommand(
+ {insert: "seedCollection", documents: [{_id: "beginTs"}]}))
+ .operationTime;
+
+// Pause checkpointing, so that non-journaled collections, including the change collection, will
+// not be persisted.
+const pauseCheckpointThreadFailPoint = configureFailPoint(primary, "pauseCheckpointThread");
+pauseCheckpointThreadFailPoint.wait();
+
+// Insert a document to the collection.
+assert.commandWorked(primary.getDB("test").stockPrice.insert({_id: "mdb", price: 250}));
+
+// Verify that the inserted document can be queried from the 'stockPrice', the 'oplog.rs', and
+// the 'system.change_collection'.
+assert.eq(primary.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 1);
+assert.eq(primary.getDB("local")
+ .oplog.rs.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}})
+ .toArray()
+ .length,
+ 1);
+assert.eq(primary.getDB("config")
+ .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}})
+ .toArray()
+ .length,
+ 1);
+
+// Perform an ungraceful shutdown of the primary node and do not clean the db path directory.
+replSetTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});
+
+// Start a new mongod instance with its db path pointing to the replica set primary's db directory.
+const standalone =
+ MongoRunner.runMongod({dbpath: primary.dbpath, noReplSet: true, noCleanData: true});
+assert.neq(null, standalone, "Failed to restart the node as a standalone");
+
+// Verify that the inserted document exists in neither 'stockPrice' nor
+// 'system.change_collection', but does exist in 'oplog.rs'.
+assert.eq(standalone.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 0);
+assert.eq(standalone.getDB("local")
+ .oplog.rs.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}})
+ .toArray()
+ .length,
+ 1);
+assert.eq(standalone.getDB("config")
+ .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}})
+ .toArray()
+ .length,
+ 0);
+
+// Stop the mongod instance and do not clean the db directory.
+MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: true, wait: true});
+
+// Start the replica set primary with the same db path.
+replSetTest.start(primary, {
+ noCleanData: true,
+ setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})
+});
+
+primary = replSetTest.getPrimary();
+
+// Verify that the 'stockPrice' and the 'system.change_collection' now have the inserted document.
+// This document was inserted by applying oplog entries during startup recovery.
+assert.eq(primary.getDB("test").stockPrice.find({_id: "mdb", price: 250}).toArray().length, 1);
+assert.eq(primary.getDB("config")
+ .system.change_collection.find({ns: "test.stockPrice", o: {_id: "mdb", price: 250}})
+ .toArray()
+ .length,
+ 1);
+
+// Get the latest oplog timestamp up to this point. All oplog entries up to this timestamp must
+// exist in the change collection.
+const endTimestamp = primary.getDB("local").oplog.rs.find().toArray().at(-1).ts;
+assert(endTimestamp !== undefined);
+
+// Verify that the oplog and the change collection entries within the ['startTimestamp',
+// 'endTimestamp'] window are exactly the same and in the same order.
+verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp);
+
+replSetTest.stopSet();
+})();
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index c3c87a95844..e3ae1132263 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -572,17 +572,19 @@ AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx,
AutoGetChangeCollection::AccessMode mode,
boost::optional<TenantId> tenantId,
Date_t deadline) {
- auto nss = NamespaceString::makeChangeCollectionNSS(tenantId);
- if (mode == AccessMode::kWrite) {
+ if (mode == AccessMode::kWriteInOplogContext) {
// The global lock must already be held.
invariant(opCtx->lockState()->isWriteLocked());
-
- // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
- // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- _coll.emplace(
- opCtx, nss, LockMode::MODE_IX, AutoGetCollectionViewMode::kViewsForbidden, deadline);
}
+
+ // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
+ // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ _coll.emplace(opCtx,
+ NamespaceString::makeChangeCollectionNSS(tenantId),
+ LockMode::MODE_IX,
+ AutoGetCollectionViewMode::kViewsForbidden,
+ deadline);
}
const Collection* AutoGetChangeCollection::operator->() const {
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index 2c48422f8fb..62d63404e9f 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -474,12 +474,15 @@ private:
* A RAII-style class to acquire lock to a particular tenant's change collection.
*
* A change collection can be accessed in the following modes:
- * kWrite - This mode assumes that the global IX lock is already held before writing to the change
- * collection.
+ * kWriteInOplogContext - performs writes to the change collection by taking the IX lock on a
+ *                        tenant's change collection. The change collection is written along with
+ *                        the oplog in the same 'WriteUnitOfWork'; this mode assumes that the
+ *                        global IX lock is already held.
+ * kWrite - takes the IX lock on a tenant's change collection to perform any writes.
*/
class AutoGetChangeCollection {
public:
- enum class AccessMode { kWrite };
+ enum class AccessMode { kWriteInOplogContext, kWrite };
AutoGetChangeCollection(OperationContext* opCtx,
AccessMode mode,
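The practical difference between the two modes is easiest to see at a call site. The sketch below is illustrative only: the helper 'writeToChangeCollection' is hypothetical and not part of this patch, and it assumes the oplog-context caller already holds the global lock in write mode, which the constructor invariants.

    // Hypothetical caller: selecting the AutoGetChangeCollection access mode.
    void writeToChangeCollection(OperationContext* opCtx,
                                 boost::optional<TenantId> tenantId,
                                 bool inOplogContext) {
        // 'kWriteInOplogContext' is for writes done in the same WriteUnitOfWork as the
        // oplog, where the global IX lock is already held; 'kWrite' performs a plain
        // IX acquisition of the tenant's change collection.
        const auto mode = inOplogContext
            ? AutoGetChangeCollection::AccessMode::kWriteInOplogContext
            : AutoGetChangeCollection::AccessMode::kWrite;
        AutoGetChangeCollection changeCollection(opCtx, mode, tenantId);
        // ... insert documents through 'changeCollection' ...
    }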
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 9a1efe8b8f7..5a79d14953f 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -71,6 +71,8 @@ BSONObj createChangeCollectionEntryFromOplog(const BSONObj& oplogEntry) {
*/
class ChangeCollectionsWriter {
public:
+ explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode)
+ : _accessMode{accessMode} {}
/**
* Adds the insert statement for the provided tenant that will be written to the change
* collection when the 'write()' method is called.
@@ -88,7 +90,7 @@ public:
Status write(OperationContext* opCtx, OpDebug* opDebug) {
for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) {
AutoGetChangeCollection tenantChangeCollection(
- opCtx, AutoGetChangeCollection::AccessMode::kWrite, boost::none /* tenantId */);
+ opCtx, _accessMode, boost::none /* tenantId */);
// The change collection does not exist for a particular tenant because either the
// change collection is not enabled or is in the process of enablement. Ignore this
@@ -143,6 +145,9 @@ private:
return true;
}
+ // Mode required to access change collections.
+ const AutoGetChangeCollection::AccessMode _accessMode;
+
// Maps inserts statements for each tenant.
stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher>
_tenantStatementsMap;
@@ -220,7 +225,8 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// commiting the unit of work.
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- ChangeCollectionsWriter changeCollectionsWriter;
+ ChangeCollectionsWriter changeCollectionsWriter{
+ AutoGetChangeCollection::AccessMode::kWriteInOplogContext};
for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
auto& record = oplogRecords[idx];
@@ -248,8 +254,18 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator beginOplogEntries,
std::vector<InsertStatement>::const_iterator endOplogEntries,
+ bool isGlobalIXLockAcquired,
OpDebug* opDebug) {
- ChangeCollectionsWriter changeCollectionsWriter;
+ // This method must be called within a 'WriteUnitOfWork'. The caller is responsible for
+ // committing the unit of work.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ // If the global IX lock is already acquired, the change collection entries will be written
+ // within the oplog context; acquire the corresponding access mode for the change collections.
+ const auto changeCollAccessMode = isGlobalIXLockAcquired
+ ? AutoGetChangeCollection::AccessMode::kWriteInOplogContext
+ : AutoGetChangeCollection::AccessMode::kWrite;
+ ChangeCollectionsWriter changeCollectionsWriter{changeCollAccessMode};
// Transform oplog entries to change collections entries and group them by tenant id.
for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries;
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index f9fe6d6f414..37a9dbaef27 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -115,6 +115,7 @@ public:
OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator beginOplogEntries,
std::vector<InsertStatement>::const_iterator endOplogEntries,
+ bool isGlobalIXLockAcquired,
OpDebug* opDebug);
};
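A call-site sketch for the new parameter (hypothetical code, not from this patch; 'stmts' stands for a std::vector<InsertStatement> and 'holdsGlobalIX' for whether the caller already holds the global IX lock, as the oplog-application path does):

    // The method invariants that it runs inside an active WriteUnitOfWork.
    WriteUnitOfWork wuow(opCtx);
    Status status =
        ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
            opCtx,
            stmts.cbegin(),
            stmts.cend(),
            holdsGlobalIX /* isGlobalIXLockAcquired */,
            nullptr /* opDebug */);
    if (status.isOK()) {
        wuow.commit();
    }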
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index e380fbe6238..8a141ac0214 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -109,8 +109,8 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss,
BaseCloner::ClonerStages CollectionCloner::getStages() {
if (_sourceNss.isChangeStreamPreImagesCollection() || _sourceNss.isChangeCollection()) {
- // Only the change stream pre-images collection and change collection needs to be created -
- // its documents should not be copied.
+ // The change stream pre-images collection and the change collection only need to be created
+ // - their documents should not be copied.
return {&_listIndexesStage,
&_createCollectionStage,
&_setupIndexBuildersForUnfinishedIndexesStage};
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 5eed8504f83..f7403a15209 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/logical_session_id.h"
@@ -46,6 +47,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/storage/control/journal_flusher.h"
+#include "mongo/db/storage/storage_util.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/basic.h"
#include "mongo/util/fail_point.h"
@@ -126,6 +128,47 @@ void _addOplogChainOpsToWriterVectors(OperationContext* opCtx,
opCtx, &derivedOps->back(), writerVectors, collPropertiesCache, shouldSerialize);
}
+Status _insertDocumentsToOplogAndChangeCollections(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool skipWritesToOplog) {
+ WriteUnitOfWork wunit(opCtx);
+
+ if (!skipWritesToOplog) {
+ AutoGetOplog autoOplog(opCtx, OplogAccessMode::kWrite);
+ auto& oplogColl = autoOplog.getCollection();
+ if (!oplogColl) {
+ return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
+ }
+
+ auto status = oplogColl->insertDocuments(
+ opCtx, begin, end, nullptr /* OpDebug */, false /* fromMigrate */);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ // Write the corresponding oplog entries to each tenant's respective change
+ // collection when running in serverless.
+ if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ auto status =
+ ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
+ opCtx,
+ begin,
+ end,
+ !skipWritesToOplog /* isGlobalIXLockAcquired */,
+ nullptr /* OpDebug */);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ wunit.commit();
+
+ return Status::OK();
+}
+
} // namespace
@@ -365,19 +408,27 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
}
-// Schedules the writes to the oplog for 'ops' into threadPool. The caller must guarantee that
-// 'ops' stays valid until all scheduled work in the thread pool completes.
-void scheduleWritesToOplog(OperationContext* opCtx,
- StorageInterface* storageInterface,
- ThreadPool* writerPool,
- const std::vector<OplogEntry>& ops) {
- auto makeOplogWriterForRange = [storageInterface, &ops](size_t begin, size_t end) {
- // The returned function will be run in a separate thread after this returns. Therefore all
- // captures other than 'ops' must be by value since they will not be available. The caller
- // guarantees that 'ops' will stay in scope until the spawned threads complete.
- return [storageInterface, &ops, begin, end](auto status) {
- invariant(status);
+// Schedules the writes to the oplog and the change collection for 'ops' into threadPool. The caller
+// must guarantee that 'ops' stays valid until all scheduled work in the thread pool completes.
+void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx,
+ StorageInterface* storageInterface,
+ ThreadPool* writerPool,
+ const std::vector<OplogEntry>& ops,
+ bool skipWritesToOplog) {
+ // Skip performing any writes during startup recovery when running in a non-serverless
+ // environment.
+ if (skipWritesToOplog &&
+ !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ return;
+ }
+ auto makeOplogWriterForRange = [storageInterface, &ops, skipWritesToOplog](size_t begin,
+ size_t end) {
+ // The returned function will be run in a separate thread after this returns. Therefore
+ // all captures other than 'ops' must be by value since they will not be available. The
+ // caller guarantees that 'ops' will stay in scope until the spawned threads complete.
+ return [storageInterface, &ops, begin, end, skipWritesToOplog](auto status) {
+ invariant(status);
auto opCtx = cc().makeOperationContext();
// This code path is only executed on secondaries and initial syncing nodes, so it is
@@ -396,9 +447,23 @@ void scheduleWritesToOplog(OperationContext* opCtx,
ops[i].getOpTime().getTerm()});
}
- fassert(40141,
- storageInterface->insertDocuments(
- opCtx.get(), NamespaceString::kRsOplogNamespace, docs));
+ // TODO SERVER-67168 the 'nsOrUUID' is used only to log the debug message when retrying
+ // inserts on the oplog and change collections. 'writeConflictRetry' assumes that
+ // operations are done on a single namespace, but the method
+ // '_insertDocumentsToOplogAndChangeCollections' can perform inserts on the oplog and
+ // multiple change collections, i.e. several namespaces, so 'writeConflictRetry'
+ // will not log the correct namespace when retrying. Refactor this code to log the
+ // correct namespace in the log message.
+ NamespaceStringOrUUID nsOrUUID = !skipWritesToOplog
+ ? NamespaceString::kRsOplogNamespace
+ : NamespaceString::makeChangeCollectionNSS(boost::none /* tenantId */);
+
+ fassert(6663400,
+ storage_helpers::insertBatchAndHandleRetry(
+ opCtx.get(), nsOrUUID, docs, [&](auto* opCtx, auto begin, auto end) {
+ return _insertDocumentsToOplogAndChangeCollections(
+ opCtx, begin, end, skipWritesToOplog);
+ }));
};
};
@@ -463,9 +528,11 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
if (!getOptions().skipWritesToOplog) {
_consistencyMarkers->setOplogTruncateAfterPoint(
opCtx, _replCoord->getMyLastAppliedOpTime().getTimestamp());
- scheduleWritesToOplog(opCtx, _storageInterface, _writerPool, ops);
}
+ scheduleWritesToOplogAndChangeCollection(
+ opCtx, _storageInterface, _writerPool, ops, getOptions().skipWritesToOplog);
+
// Holds 'pseudo operations' generated by secondaries to aid in replication.
// Keep in scope until all operations in 'ops' and 'derivedOps' have been applied.
// Pseudo operations include:
@@ -520,8 +587,8 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
auto opCtx = cc().makeOperationContext();
- // This code path is only executed on secondaries and initial syncing nodes,
- // so it is safe to exclude any writes from Flow Control.
+ // This code path is only executed on secondaries and initial syncing nodes, so
+ // it is safe to exclude any writes from Flow Control.
opCtx->setShouldParticipateInFlowControl(false);
opCtx->setEnforceConstraints(false);
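Taken together, 'skipWritesToOplog' and the change-collections mode give the scheduled writer four cases; as implemented above, the effective behavior is:

    skipWritesToOplog | change-collections mode | effect
    ------------------+-------------------------+------------------------------------------
    false             | inactive                | write the oplog only
    false             | active                  | write the oplog and change collections
    true              | inactive                | return early; nothing to write
    true              | active                  | write the change collections only, i.e.
                      |                         | startup recovery replays into the change
                      |                         | collections without re-writing the oplog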
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index b2a0560d145..9cfc1b0f4e7 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -82,6 +82,7 @@
#include "mongo/db/storage/control/journal_flusher.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
+#include "mongo/db/storage/storage_util.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/background.h"
@@ -342,8 +343,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
boost::optional<AutoGetOplog> autoOplog;
const CollectionPtr* collection;
- bool shouldWriteToChangeCollections = false;
-
auto nss = nsOrUUID.nss();
if (nss && nss->isOplog()) {
// Simplify locking rules for oplog collection.
@@ -352,9 +351,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!*collection) {
return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
}
-
- shouldWriteToChangeCollections =
- ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive();
} else {
autoColl.emplace(opCtx, nsOrUUID, MODE_IX);
auto collectionResult = getCollection(
@@ -372,17 +368,6 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
return status;
}
- // Insert oplog entries to change collections if we are running in the serverless and the 'nss'
- // is 'local.oplog.rs'.
- if (shouldWriteToChangeCollections) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
- status = changeCollectionManager.insertDocumentsToChangeCollection(
- opCtx, begin, end, nullOpDebug);
- if (!status.isOK()) {
- return status;
- }
- }
-
wunit.commit();
return Status::OK();
@@ -393,35 +378,10 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
Status StorageInterfaceImpl::insertDocuments(OperationContext* opCtx,
const NamespaceStringOrUUID& nsOrUUID,
const std::vector<InsertStatement>& docs) {
- if (docs.size() > 1U) {
- try {
- if (insertDocumentsSingleBatch(opCtx, nsOrUUID, docs.cbegin(), docs.cend()).isOK()) {
- return Status::OK();
- }
- } catch (...) {
- // Ignore this failure and behave as-if we never tried to do the combined batch insert.
- // The loop below will handle reporting any non-transient errors.
- }
- }
-
- // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting.
- for (auto it = docs.cbegin(); it != docs.cend(); ++it) {
- auto status = writeConflictRetry(
- opCtx, "StorageInterfaceImpl::insertDocuments", nsOrUUID.toString(), [&] {
- auto status = insertDocumentsSingleBatch(opCtx, nsOrUUID, it, it + 1);
- if (!status.isOK()) {
- return status;
- }
-
- return Status::OK();
- });
-
- if (!status.isOK()) {
- return status;
- }
- }
-
- return Status::OK();
+ return storage_helpers::insertBatchAndHandleRetry(
+ opCtx, nsOrUUID, docs, [&](auto* opCtx, auto begin, auto end) {
+ return insertDocumentsSingleBatch(opCtx, nsOrUUID, begin, end);
+ });
}
Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) {
diff --git a/src/mongo/db/storage/storage_util.h b/src/mongo/db/storage/storage_util.h
index 3d8b59bb79a..d6eaeea2f6f 100644
--- a/src/mongo/db/storage/storage_util.h
+++ b/src/mongo/db/storage/storage_util.h
@@ -29,16 +29,13 @@
#pragma once
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/util/uuid.h"
namespace mongo {
-
-class Collection;
-class Ident;
-class OperationContext;
-class NamespaceString;
-
namespace catalog {
/**
@@ -89,4 +86,58 @@ Status dropCollection(OperationContext* opCtx,
} // namespace catalog
+
+namespace storage_helpers {
+
+/**
+ * Inserts the batch of documents 'docs' using the provided callable object 'insertFn'.
+ *
+ * 'insertFn' must be a callable with the following signature:
+ * Status insertFn(OperationContext* opCtx,
+ * std::vector<InsertStatement>::const_iterator begin,
+ * std::vector<InsertStatement>::const_iterator end);
+ *
+ * where 'begin' (inclusive) and 'end' (exclusive) are iterators over the range of documents in
+ * 'docs'.
+ *
+ * The function first attempts to insert the documents as a single batch. If that fails, it
+ * falls back to inserting the documents one at a time, retrying each single insert on write
+ * conflict.
+ */
+template <typename insertFnType>
+Status insertBatchAndHandleRetry(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ const std::vector<InsertStatement>& docs,
+ insertFnType&& insertFn) {
+ if (docs.size() > 1U) {
+ try {
+ if (insertFn(opCtx, docs.cbegin(), docs.cend()).isOK()) {
+ return Status::OK();
+ }
+ } catch (...) {
+ // Ignore this failure and behave as if we never tried to do the combined batch insert.
+ // The loop below will handle reporting any non-transient errors.
+ }
+ }
+
+ // Fall back to inserting the documents one at a time because the all-at-once batch insert failed.
+ for (auto it = docs.cbegin(); it != docs.cend(); ++it) {
+ auto status = writeConflictRetry(opCtx, "batchInsertDocuments", nsOrUUID.toString(), [&] {
+ auto status = insertFn(opCtx, it, it + 1);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+ });
+
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return Status::OK();
+}
+} // namespace storage_helpers
+
} // namespace mongo
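For reference, a minimal caller sketch of the extracted helper (hypothetical; 'insertSingleBatch' stands in for any function matching the documented 'insertFn' signature, analogous to 'insertDocumentsSingleBatch' in storage_interface_impl.cpp):

    Status insertWithRetry(OperationContext* opCtx,
                           const NamespaceStringOrUUID& nsOrUUID,
                           const std::vector<InsertStatement>& docs) {
        // The helper first attempts [docs.cbegin(), docs.cend()) as one batch; on
        // failure it retries one document at a time under 'writeConflictRetry'.
        return storage_helpers::insertBatchAndHandleRetry(
            opCtx, nsOrUUID, docs, [&](auto* opCtx, auto begin, auto end) {
                return insertSingleBatch(opCtx, nsOrUUID, begin, end);
            });
    }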