summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorauto-revert-processor <dev-prod-dag@mongodb.com>2022-05-28 10:44:15 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-28 11:11:32 +0000
commitb154f8ea0332b8829f2c7f46a8ea84c2b86f3a45 (patch)
treec0fc89a75b5c27a82d3122f0c0caacdb65c8d96c /src
parentc9e3facf5dd94fd39066f54d44ee846f750c988b (diff)
downloadmongo-b154f8ea0332b8829f2c7f46a8ea84c2b86f3a45.tar.gz
Revert "SERVER-66123 Introduce logic to write to the change collection in the primary."
This reverts commit 1a5a5419426d512a8648914206776025beb496c4.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp7
-rw-r--r--src/mongo/db/catalog/collection_validation.cpp1
-rw-r--r--src/mongo/db/catalog/validate_state.cpp4
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp93
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h17
-rw-r--r--src/mongo/db/namespace_string.cpp9
-rw-r--r--src/mongo/db/namespace_string.h7
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp6
-rw-r--r--src/mongo/db/repl/oplog.cpp8
10 files changed, 40 insertions, 113 deletions
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index ecb86d77238..a920e11d85c 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -1046,11 +1046,8 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
}
}
- // TODO SERVER-66813 fix issue with batch deletion.
- if (!ns().isChangeCollection()) {
- opCtx->getServiceContext()->getOpObserver()->onInserts(
- opCtx, ns(), uuid(), begin, end, fromMigrate);
- }
+ opCtx->getServiceContext()->getOpObserver()->onInserts(
+ opCtx, ns(), uuid(), begin, end, fromMigrate);
_cappedDeleteAsNeeded(opCtx, records.begin()->id);
diff --git a/src/mongo/db/catalog/collection_validation.cpp b/src/mongo/db/catalog/collection_validation.cpp
index 7a3ad0c5dd2..610d7c39ca7 100644
--- a/src/mongo/db/catalog/collection_validation.cpp
+++ b/src/mongo/db/catalog/collection_validation.cpp
@@ -566,6 +566,7 @@ Status validate(OperationContext* opCtx,
BSONObjBuilder* output,
bool turnOnExtraLoggingForTest) {
invariant(!opCtx->lockState()->isLocked() || storageGlobalParams.repair);
+
// This is deliberately outside of the try-catch block, so that any errors thrown in the
// constructor fail the cmd, as opposed to returning OK with valid:false.
ValidateState validateState(opCtx, nss, mode, repairMode, turnOnExtraLoggingForTest);
diff --git a/src/mongo/db/catalog/validate_state.cpp b/src/mongo/db/catalog/validate_state.cpp
index 09b597752cf..88d66867751 100644
--- a/src/mongo/db/catalog/validate_state.cpp
+++ b/src/mongo/db/catalog/validate_state.cpp
@@ -107,13 +107,11 @@ ValidateState::ValidateState(OperationContext* opCtx,
bool ValidateState::shouldEnforceFastCount() const {
if (_mode == ValidateMode::kForegroundFullEnforceFastCount) {
- if (_nss.isOplog() || _nss.isChangeCollection()) {
+ if (_nss.isOplog()) {
// Oplog writers only take a global IX lock, so the oplog can still be written to even
// during full validation despite its collection X lock. This can cause validate to
// incorrectly report an incorrect fast count on the oplog when run in enforceFastCount
// mode.
- // The oplog entries are also written to the change collections and are prone to fast
- // count failures.
return false;
} else if (_nss == NamespaceString::kIndexBuildEntryNamespace) {
// Do not enforce fast count on the 'config.system.indexBuilds' collection. This is an
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 0954a7af635..e27c5af2af2 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -37,21 +37,13 @@
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
-#include "mongo/db/catalog_raii.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/logv2/log.h"
namespace mongo {
namespace {
const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
-
-// TODO: SERVER-65950 create or update the change collection for a particular tenant.
-NamespaceString getTenantChangeCollectionNamespace(boost::optional<TenantId> tenantId) {
- return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
-}
-
} // namespace
ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
@@ -70,16 +62,19 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) {
Status ChangeStreamChangeCollectionManager::createChangeCollection(
OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ // TODO: SERVER-65950 create or update the change collection for a particular tenant.
+ const NamespaceString nss{NamespaceString::kConfigDb,
+ NamespaceString::kChangeStreamChangeCollection};
+
// Make the change collection clustered by '_id'. The '_id' field will have the same value as
// the 'ts' field of the oplog.
CollectionOptions changeCollectionOptions;
changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex());
changeCollectionOptions.capped = true;
- auto status = createCollection(
- opCtx, getTenantChangeCollectionNamespace(tenantId), changeCollectionOptions, BSONObj());
+ auto status = createCollection(opCtx, nss, changeCollectionOptions, BSONObj());
if (status.code() == ErrorCodes::NamespaceExists) {
- return Status::OK();
+ return Status(ErrorCodes::Error::OK, "");
}
return status;
@@ -87,75 +82,21 @@ Status ChangeStreamChangeCollectionManager::createChangeCollection(
Status ChangeStreamChangeCollectionManager::dropChangeCollection(
OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ // TODO: SERVER-65950 remove the change collection for a particular tenant.
+ const NamespaceString nss{NamespaceString::kConfigDb,
+ NamespaceString::kChangeStreamChangeCollection};
DropReply dropReply;
- return dropCollection(opCtx,
- getTenantChangeCollectionNamespace(tenantId),
- &dropReply,
- DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+ return dropCollection(
+ opCtx, nss, &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
}
-void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
+Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
OperationContext* opCtx,
- const std::vector<Record>& oplogRecords,
- const std::vector<Timestamp>& oplogTimestamps) {
- invariant(oplogRecords.size() == oplogTimestamps.size());
-
- // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for
- // commiting the unit of work.
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
-
- // Maps statements that should be inserted to the change collection for each tenant.
- stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher>
- tenantToInsertStatements;
-
- for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
- auto& record = oplogRecords[idx];
- auto& ts = oplogTimestamps[idx];
-
- // Create a mutable document and update the '_id' field with the oplog entry timestamp. The
- // '_id' field will be use to order the change collection documents.
- Document oplogDoc(record.data.toBson());
- MutableDocument changeCollDoc(oplogDoc);
- changeCollDoc["_id"] = Value(ts);
-
- // Create an insert statement that should be written at the timestamp 'ts' for a particular
- // tenant.
- auto readyChangeCollDoc = changeCollDoc.freeze();
- tenantToInsertStatements[TenantId::kSystemTenantId].push_back(
- InsertStatement{readyChangeCollDoc.toBson(), ts, repl::OpTime::kUninitializedTerm});
- }
-
- for (auto&& [tenantId, insertStatements] : tenantToInsertStatements) {
- // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
- // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- AutoGetCollection tenantChangeCollection(
- opCtx, getTenantChangeCollectionNamespace(tenantId), LockMode::MODE_IX);
-
- // The change collection does not exist for a particular tenant because either the change
- // collection is not enabled or is in the process of enablement. Ignore this insert for now.
- // TODO: SERVER-65950 move this check before inserting to the map
- // 'tenantToInsertStatements'.
- if (!tenantChangeCollection) {
- continue;
- }
-
- // Writes to the change collection should not be replicated.
- repl::UnreplicatedWritesBlock unReplBlock(opCtx);
-
- Status status = tenantChangeCollection->insertDocuments(opCtx,
- insertStatements.begin(),
- insertStatements.end(),
- nullptr /* opDebug */,
- false /* fromMigrate */);
- if (!status.isOK()) {
- LOGV2_FATAL(6612300,
- "Write to change collection: {ns} failed: {error}",
- "Write to change collection failed",
- "ns"_attr = tenantChangeCollection->ns().toString(),
- "error"_attr = status.toString());
- }
- }
+ boost::optional<TenantId> tenantId,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) {
+ // TODO SERVER-65210 add code to insert to the change collection in the primaries.
+ return Status(ErrorCodes::OK, "");
}
} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 6b7de7da109..b4ad0e25c50 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -75,21 +75,16 @@ public:
Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
/**
- * Inserts documents to change collections. The parameter 'oplogRecords'
- * is a vector of oplog records and the parameter 'oplogTimestamps' is a vector for respective
+ * Inserts documents to the change collection for the specified tenant. The parameter 'records'
+ * is a vector of oplog records and the parameter 'timestamps' is a vector for respective
* timestamp for each oplog record.
*
- * The method fetches the tenant-id from the oplog entry, performs necessary modification to the
- * document and then write to the tenant's change collection at the specified oplog timestamp.
- *
- * Failure in insertion to any change collection will result in a fatal exception and will bring
- * down the node.
- *
* TODO: SERVER-65950 make tenantId field mandatory.
*/
- void insertDocumentsToChangeCollection(OperationContext* opCtx,
- const std::vector<Record>& oplogRecords,
- const std::vector<Timestamp>& oplogTimestamps);
+ Status insertDocumentsToChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps);
};
} // namespace mongo
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index d16370da310..525dd9c6724 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -243,7 +243,7 @@ bool NamespaceString::isLegalClientSystemNS(
return true;
}
- if (isChangeCollection()) {
+ if (isChangeStreamChangeCollection()) {
return true;
}
@@ -404,8 +404,8 @@ bool NamespaceString::isChangeStreamPreImagesCollection() const {
return ns() == kChangeStreamPreImagesNamespace.ns();
}
-bool NamespaceString::isChangeCollection() const {
- return db() == kConfigDb && coll() == kChangeCollectionName;
+bool NamespaceString::isChangeStreamChangeCollection() const {
+ return db() == kConfigDb && coll() == kChangeStreamChangeCollection;
}
bool NamespaceString::isConfigImagesCollection() const {
@@ -432,7 +432,8 @@ NamespaceString NamespaceString::getTimeseriesViewNamespace() const {
}
bool NamespaceString::isImplicitlyReplicated() const {
- if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection()) {
+ if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection() ||
+ isChangeStreamChangeCollection()) {
// Implicitly replicated namespaces are replicated, although they only replicate a subset of
// writes.
invariant(isReplicated());
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index dd6842974f0..687fb431bb1 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -77,7 +77,7 @@ public:
static constexpr StringData kSystemDotViewsCollectionName = "system.views"_sd;
// Name for the change stream change collection.
- static constexpr StringData kChangeCollectionName = "system.change_collection"_sd;
+ static constexpr StringData kChangeStreamChangeCollection = "system.change_collection"_sd;
// Names of privilege document collections
static constexpr StringData kSystemUsers = "system.users"_sd;
@@ -384,6 +384,9 @@ public:
bool isSystemDotProfile() const {
return coll() == "system.profile";
}
+ bool isChangeCollection() const {
+ return (db() == kConfigDb) && coll().startsWith("changes.");
+ }
bool isSystemDotViews() const {
return coll() == kSystemDotViewsCollectionName;
}
@@ -458,7 +461,7 @@ public:
/**
* Returns whether the specified namespace is config.system.changeCollection.
*/
- bool isChangeCollection() const;
+ bool isChangeStreamChangeCollection() const;
/**
* Returns whether the specified namespace is config.image_collection.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e9bcecfbdbf..3379a0d34d1 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -73,7 +73,6 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_build_oplog_entry',
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
- '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/db_raii',
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index bde00eef906..8800297cb4d 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -108,9 +108,9 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss,
}
BaseCloner::ClonerStages CollectionCloner::getStages() {
- if (_sourceNss.isChangeStreamPreImagesCollection() || _sourceNss.isChangeCollection()) {
- // Only the change stream pre-images collection and change collection needs to be created -
- // its documents should not be copied.
+ if (_sourceNss.isChangeStreamPreImagesCollection()) {
+ // Only the change stream pre-images collection needs to be created - its documents should
+ // not be copied.
return {&_listIndexesStage,
&_createCollectionStage,
&_setupIndexBuildersForUnfinishedIndexesStage};
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 48a9ab150da..7f521591c0c 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -58,7 +58,6 @@
#include "mongo/db/catalog/local_oplog_info.h"
#include "mongo/db/catalog/multi_index_block.h"
#include "mongo/db/catalog/rename_collection.h"
-#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/coll_mod_gen.h"
#include "mongo/db/commands.h"
@@ -388,13 +387,6 @@ void _logOpsInner(OperationContext* opCtx,
"error"_attr = result.toString());
}
- // Insert the oplog records to the respective tenants change collections.
- if (::mongo::feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
- serverGlobalParams.featureCompatibility)) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
- changeCollectionManager.insertDocumentsToChangeCollection(opCtx, *records, timestamps);
- }
-
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
opCtx->recoveryUnit()->onCommit(
[opCtx, replCoord, finalOpTime, wallTime](boost::optional<Timestamp> commitTime) {