summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-06-10 19:03:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-10 19:48:46 +0000
commit09bb1491952e6e756746640fbfdfaddead59c61b (patch)
treecec59ed6242e97b56e5938cb9492262cec3e2d4d
parentea3e7127282a3d2974c39d7f0045ffa29d1dc426 (diff)
downloadmongo-09bb1491952e6e756746640fbfdfaddead59c61b.tar.gz
SERVER-66633 Record change collection entries in the secondaries.
-rw-r--r--jstests/replsets/write_change_stream_change_collection.js28
-rw-r--r--src/mongo/db/catalog_raii.cpp31
-rw-r--r--src/mongo/db/catalog_raii.h27
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp150
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h21
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp18
7 files changed, 227 insertions, 50 deletions
diff --git a/jstests/replsets/write_change_stream_change_collection.js b/jstests/replsets/write_change_stream_change_collection.js
index 32fce968ecc..d4e1a75d04b 100644
--- a/jstests/replsets/write_change_stream_change_collection.js
+++ b/jstests/replsets/write_change_stream_change_collection.js
@@ -8,18 +8,20 @@
(function() {
"use strict";
-const replSetTest = new ReplSetTest({nodes: 1});
+const replSetTest = new ReplSetTest({nodes: 2});
replSetTest.startSet({setParameter: "multitenancySupport=true"});
replSetTest.initiate();
const primary = replSetTest.getPrimary();
-const oplogColl = primary.getDB("local").oplog.rs;
-const changeColl = primary.getDB("config").system.change_collection;
+const secondary = replSetTest.getSecondary();
const testDb = primary.getDB("test");
// Verifies that the oplog and change collection entries are the same for the specified start and
// end duration of the oplog timestamp.
-function verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp) {
+function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) {
+ const oplogColl = connection.getDB("local").oplog.rs;
+ const changeColl = connection.getDB("config").system.change_collection;
+
// Fetch all oplog and change collection entries for the duration: [startOplogTimestamp,
// endOplogTimestamp].
const oplogEntries =
@@ -69,6 +71,7 @@ function performWrites(coll) {
// Test the change collection entries with the oplog by performing some basic writes.
(function testBasicWritesInChangeCollection() {
+ const oplogColl = primary.getDB("local").oplog.rs;
const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
assert(startOplogTimestamp != undefined);
@@ -79,11 +82,18 @@ function performWrites(coll) {
assert(endOplogTimestamp !== undefined);
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
- verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp);
+ // Wait for the replication to finish.
+ replSetTest.awaitReplication();
+
+ // Verify that the change collection entries are the same as the oplog in the primary and the
+ // secondary node.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
})();
// Test the change collection entries with the oplog by performing writes in a transaction.
(function testWritesinChangeCollectionWithTrasactions() {
+ const oplogColl = primary.getDB("local").oplog.rs;
const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
assert(startOplogTimestamp != undefined);
@@ -97,7 +107,13 @@ function performWrites(coll) {
assert(endOplogTimestamp != undefined);
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
- verifyChangeCollectionEntries(startOplogTimestamp, endOplogTimestamp);
+ // Wait for the replication to finish.
+ replSetTest.awaitReplication();
+
+ // Verify that the change collection entries are the same as the oplog in the primary and the
+ // secondary node for the applyOps.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
})();
replSetTest.stopSet();
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index 00106047492..2063cf2590b 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -596,4 +596,35 @@ AutoGetOplog::AutoGetOplog(OperationContext* opCtx, OplogAccessMode mode, Date_t
_oplog = &_oplogInfo->getCollection();
}
+
+AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx,
+ AutoGetChangeCollection::AccessMode mode,
+ boost::optional<TenantId> tenantId,
+ Date_t deadline) {
+ auto nss = NamespaceString::makeChangeCollectionNSS(tenantId);
+ if (mode == AccessMode::kWrite) {
+ // The global lock must already be held.
+ invariant(opCtx->lockState()->isWriteLocked());
+
+ // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
+ // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ _coll.emplace(
+ opCtx, nss, LockMode::MODE_IX, AutoGetCollectionViewMode::kViewsForbidden, deadline);
+ }
+}
+
+const Collection* AutoGetChangeCollection::operator->() const {
+ return _coll ? _coll->getCollection().get() : nullptr;
+}
+
+const CollectionPtr& AutoGetChangeCollection::operator*() const {
+ return _coll->getCollection();
+}
+
+AutoGetChangeCollection::operator bool() const {
+ return _coll && _coll->getCollection().get();
+}
+
+
} // namespace mongo
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index ae1111041db..84121f53965 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -476,4 +476,31 @@ private:
const CollectionPtr* _oplog;
};
+/**
+ * A RAII-style class to acquire lock to a particular tenant's change collection.
+ *
+ * A change collection can be accessed in the following modes:
+ * kWrite - This mode assumes that the global IX lock is already held before writing to the change
+ * collection.
+ */
+class AutoGetChangeCollection {
+public:
+ enum class AccessMode { kWrite };
+
+ AutoGetChangeCollection(OperationContext* opCtx,
+ AccessMode mode,
+ boost::optional<TenantId> tenantId,
+ Date_t deadline = Date_t::max());
+
+ AutoGetChangeCollection(const AutoGetChangeCollection&) = delete;
+ AutoGetChangeCollection& operator=(const AutoGetChangeCollection&) = delete;
+
+ const Collection* operator->() const;
+ const CollectionPtr& operator*() const;
+ explicit operator bool() const;
+
+private:
+ boost::optional<AutoGetCollection> _coll;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 840e045ebd9..e2872e3d815 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -48,6 +48,75 @@ namespace {
const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
+/**
+ * Creates a Document object from the supplied oplog entry, performs necessary modifications to it
+ * and then returns it as a BSON object.
+ */
+BSONObj createChangeCollectionEntryFromOplog(const BSONObj& oplogEntry) {
+ Document oplogDoc(oplogEntry);
+ MutableDocument changeCollDoc(oplogDoc);
+ changeCollDoc["_id"] = Value(oplogDoc["ts"]);
+
+ auto readyChangeCollDoc = changeCollDoc.freeze();
+ return readyChangeCollDoc.toBson();
+}
+
+/**
+ * Helper to write insert statements to respective change collections based on tenant ids.
+ */
+class ChangeCollectionsWriter {
+public:
+ /**
+ * Adds the insert statement for the provided tenant that will be written to the change
+ * collection when the 'write()' method is called.
+ */
+ void add(const TenantId& tenantId, InsertStatement insertStatement) {
+ _tenantStatementsMap[tenantId].push_back(std::move(insertStatement));
+ }
+
+ /**
+ * Writes the batch of insert statements for each change collection. Bails out further writes if
+ * a failure is encountered in writing to a any change collection.
+ */
+ Status write(OperationContext* opCtx, OpDebug* opDebug) {
+ for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) {
+ AutoGetChangeCollection tenantChangeCollection(
+ opCtx, AutoGetChangeCollection::AccessMode::kWrite, boost::none /* tenantId */);
+
+ // The change collection does not exist for a particular tenant because either the
+ // change collection is not enabled or is in the process of enablement. Ignore this
+ // insert for now.
+ // TODO: SERVER-65950 move this check before inserting to the map
+ // 'tenantToInsertStatements'.
+ if (!tenantChangeCollection) {
+ continue;
+ }
+
+ // Writes to the change collection should not be replicated.
+ repl::UnreplicatedWritesBlock unReplBlock(opCtx);
+
+ Status status = tenantChangeCollection->insertDocuments(opCtx,
+ insertStatements.begin(),
+ insertStatements.end(),
+ opDebug,
+ false /* fromMigrate */);
+ if (!status.isOK()) {
+ return Status(status.code(),
+ str::stream()
+ << "Write to change collection: " << tenantChangeCollection->ns()
+ << "failed, reason: " << status.reason());
+ }
+ }
+
+ return Status::OK();
+ }
+
+private:
+ // Maps inserts statements for each tenant.
+ stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher>
+ _tenantStatementsMap;
+};
+
} // namespace
ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
@@ -115,58 +184,57 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// commiting the unit of work.
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- // Maps statements that should be inserted to the change collection for each tenant.
- stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher>
- tenantToInsertStatements;
+ ChangeCollectionsWriter changeCollectionsWriter;
for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
auto& record = oplogRecords[idx];
auto& ts = oplogTimestamps[idx];
- // Create a mutable document and update the '_id' field with the oplog entry timestamp. The
- // '_id' field will be use to order the change collection documents.
- Document oplogDoc(record.data.toBson());
- MutableDocument changeCollDoc(oplogDoc);
- changeCollDoc["_id"] = Value(ts);
-
// Create an insert statement that should be written at the timestamp 'ts' for a particular
// tenant.
- auto readyChangeCollDoc = changeCollDoc.freeze();
- tenantToInsertStatements[TenantId::kSystemTenantId].push_back(
- InsertStatement{readyChangeCollDoc.toBson(), ts, repl::OpTime::kUninitializedTerm});
+ auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson());
+
+ // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
+ changeCollectionsWriter.add(
+ TenantId::kSystemTenantId,
+ InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm});
}
- for (auto&& [tenantId, insertStatements] : tenantToInsertStatements) {
- // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
- // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- AutoGetCollection tenantChangeCollection(
- opCtx, NamespaceString::makeChangeCollectionNSS(tenantId), LockMode::MODE_IX);
-
- // The change collection does not exist for a particular tenant because either the change
- // collection is not enabled or is in the process of enablement. Ignore this insert for now.
- // TODO: SERVER-65950 move this check before inserting to the map
- // 'tenantToInsertStatements'.
- if (!tenantChangeCollection) {
- continue;
- }
+ // Write documents to change collections and throw exception in case of any failure.
+ Status status = changeCollectionsWriter.write(opCtx, nullptr /* opDebug */);
+ if (!status.isOK()) {
+ LOGV2_FATAL(
+ 6612300, "Failed to write to change collection", "reason"_attr = status.reason());
+ }
+}
- // Writes to the change collection should not be replicated.
- repl::UnreplicatedWritesBlock unReplBlock(opCtx);
-
- Status status = tenantChangeCollection->insertDocuments(opCtx,
- insertStatements.begin(),
- insertStatements.end(),
- nullptr /* opDebug */,
- false /* fromMigrate */);
- if (!status.isOK()) {
- LOGV2_FATAL(6612300,
- "Write to change collection: {ns} failed: {error}",
- "Write to change collection failed",
- "ns"_attr = tenantChangeCollection->ns().toString(),
- "error"_attr = status.toString());
- }
+Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug) {
+ ChangeCollectionsWriter changeCollectionsWriter;
+
+ // Transform oplog entries to change collections entries and group them by tenant id.
+ for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries;
+ oplogEntryIter++) {
+ auto& oplogDoc = oplogEntryIter->doc;
+
+ // The initial seed oplog insertion is not timestamped as such the 'oplogSlot' is not
+ // initialized. The corresponding change collection insertion will not be timestamped.
+ auto oplogSlot = oplogEntryIter->oplogSlot;
+
+ auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc);
+
+ // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
+ changeCollectionsWriter.add(TenantId::kSystemTenantId,
+ InsertStatement{std::move(changeCollDoc),
+ oplogSlot.getTimestamp(),
+ oplogSlot.getTerm()});
}
+
+ // Write documents to change collections.
+ return changeCollectionsWriter.write(opCtx, opDebug);
}
} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 2006b8c9426..f9fe6d6f414 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -31,6 +31,7 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
namespace mongo {
@@ -86,9 +87,9 @@ public:
Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
/**
- * Inserts documents to change collections. The parameter 'oplogRecords'
- * is a vector of oplog records and the parameter 'oplogTimestamps' is a vector for respective
- * timestamp for each oplog record.
+ * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
+ * records and the parameter 'oplogTimestamps' is a vector for respective timestamp for each
+ * oplog record.
*
* The method fetches the tenant-id from the oplog entry, performs necessary modification to the
* document and then write to the tenant's change collection at the specified oplog timestamp.
@@ -101,6 +102,20 @@ public:
void insertDocumentsToChangeCollection(OperationContext* opCtx,
const std::vector<Record>& oplogRecords,
const std::vector<Timestamp>& oplogTimestamps);
+
+
+ /**
+ * Performs a range inserts on respective change collections using the oplog entries as
+ * specified by 'beginOplogEntries' and 'endOplogEntries'.
+ *
+ * Bails out if a failure is encountered in inserting documents to a particular change
+ * collection.
+ */
+ Status insertDocumentsToChangeCollection(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug);
};
} // namespace mongo
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e9bcecfbdbf..4fa9735e923 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -261,6 +261,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
@@ -619,6 +620,7 @@ env.Library(
'storage_interface',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/storage/storage_control',
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 1a90e3a57c8..12240e945ba 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -347,6 +348,8 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
boost::optional<AutoGetOplog> autoOplog;
const CollectionPtr* collection;
+ bool shouldWriteToChangeCollections = false;
+
auto nss = nsOrUUID.nss();
if (nss && nss->isOplog()) {
// Simplify locking rules for oplog collection.
@@ -355,6 +358,9 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!*collection) {
return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
}
+
+ shouldWriteToChangeCollections =
+ ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive();
} else {
autoColl.emplace(opCtx, nsOrUUID, MODE_IX);
auto collectionResult = getCollection(
@@ -371,6 +377,18 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!status.isOK()) {
return status;
}
+
+ // Insert oplog entries to change collections if we are running in the serverless and the 'nss'
+ // is 'local.oplog.rs'.
+ if (shouldWriteToChangeCollections) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ status = changeCollectionManager.insertDocumentsToChangeCollection(
+ opCtx, begin, end, nullOpDebug);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
wunit.commit();
return Status::OK();