summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/storage_interface_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/storage_interface_impl.cpp')
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp24
1 files changed, 18 insertions, 6 deletions
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 1a90e3a57c8..22d7c7648e4 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -323,12 +324,6 @@ template <typename AutoGetCollectionType>
StatusWith<const CollectionPtr*> getCollection(const AutoGetCollectionType& autoGetCollection,
const NamespaceStringOrUUID& nsOrUUID,
const std::string& message) {
- if (!autoGetCollection.getDb()) {
- StringData dbName = nsOrUUID.nss() ? nsOrUUID.nss()->db() : nsOrUUID.dbname();
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "Database [" << dbName << "] not found. " << message};
- }
-
const auto& collection = autoGetCollection.getCollection();
if (!collection) {
return {ErrorCodes::NamespaceNotFound,
@@ -347,6 +342,8 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
boost::optional<AutoGetOplog> autoOplog;
const CollectionPtr* collection;
+ bool shouldWriteToChangeCollections = false;
+
auto nss = nsOrUUID.nss();
if (nss && nss->isOplog()) {
// Simplify locking rules for oplog collection.
@@ -355,6 +352,9 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!*collection) {
return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
}
+
+ shouldWriteToChangeCollections =
+ ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive();
} else {
autoColl.emplace(opCtx, nsOrUUID, MODE_IX);
auto collectionResult = getCollection(
@@ -371,6 +371,18 @@ Status insertDocumentsSingleBatch(OperationContext* opCtx,
if (!status.isOK()) {
return status;
}
+
+ // Insert oplog entries to change collections if we are running in the serverless and the 'nss'
+ // is 'local.oplog.rs'.
+ if (shouldWriteToChangeCollections) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ status = changeCollectionManager.insertDocumentsToChangeCollection(
+ opCtx, begin, end, nullOpDebug);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
wunit.commit();
return Status::OK();