summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-05-21 16:49:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-21 17:17:31 +0000
commit85573be23d02bb95a38cf314294abbfc57b71c38 (patch)
treed97588780ff9a64c327ccb54e40bc8f9fe659ff6 /src
parent18ec8376222bd7afe8485441af2c3aba3130ea2e (diff)
downloadmongo-85573be23d02bb95a38cf314294abbfc57b71c38.tar.gz
SERVER-65209 Skeleton code to create change collection.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript14
-rw-r--r--src/mongo/db/catalog/create_collection.cpp77
-rw-r--r--src/mongo/db/catalog/create_collection.h9
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp102
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h90
-rw-r--r--src/mongo/db/mongod_main.cpp3
-rw-r--r--src/mongo/db/namespace_string.cpp11
-rw-r--r--src/mongo/db/namespace_string.h8
-rw-r--r--src/mongo/db/query/query_feature_flags.idl5
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp12
11 files changed, 291 insertions, 41 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index ce6d1d91268..1cdea6aa03b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -522,6 +522,19 @@ env.Library(
)
env.Library(
+ target='change_stream_change_collection_manager',
+ source=[
+ 'change_stream_change_collection_manager.cpp'
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
+ "$BUILD_DIR/mongo/db/catalog/clustered_collection_options",
+ '$BUILD_DIR/mongo/db/dbhelpers',
+ '$BUILD_DIR/mongo/db/service_context',
+ ]
+)
+
+env.Library(
target='write_block_bypass',
source=[
'write_block_bypass.cpp',
@@ -2501,6 +2514,7 @@ env.Library(
# please add that library as a private libdep of
# mongod_initializers.
'$BUILD_DIR/mongo/client/clientdriver_minimal',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
'$BUILD_DIR/mongo/idl/cluster_server_parameter',
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index ffa70dc447a..de921ee027b 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -540,46 +540,6 @@ Status _createCollection(OperationContext* opCtx,
});
}
-/**
- * Creates the collection or the view as described by 'options'.
- */
-Status createCollection(OperationContext* opCtx,
- const NamespaceString& ns,
- const CollectionOptions& options,
- const boost::optional<BSONObj>& idIndex) {
- auto status = userAllowedCreateNS(opCtx, ns);
- if (!status.isOK()) {
- return status;
- }
-
- if (options.isView()) {
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Cannot create a view in a multi-document "
- "transaction.",
- !opCtx->inMultiDocumentTransaction());
- uassert(ErrorCodes::Error(6026500),
- "The 'clusteredIndex' option is not supported with views",
- !options.clusteredIndex);
-
- return _createView(opCtx, ns, options);
- } else if (options.timeseries && !ns.isTimeseriesBucketsCollection()) {
- // This helper is designed for user-created time-series collections on primaries. If a
- // time-series buckets collection is created explicitly or during replication, treat this as
- // a normal collection creation.
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- str::stream()
- << "Cannot create a time-series collection in a multi-document transaction.",
- !opCtx->inMultiDocumentTransaction());
- return _createTimeseries(opCtx, ns, options);
- } else {
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Cannot create system collection " << ns
- << " within a transaction.",
- !opCtx->inMultiDocumentTransaction() || !ns.isSystem());
- return _createCollection(opCtx, ns, options, idIndex);
- }
-}
-
CollectionOptions clusterByDefaultIfNecessary(const NamespaceString& nss,
CollectionOptions collectionOptions,
const boost::optional<BSONObj>& idIndex) {
@@ -853,4 +813,41 @@ Status createCollectionForApplyOps(OperationContext* opCtx,
opCtx, newCollName, newCmd, idIndex, CollectionOptions::parseForStorage);
}
+Status createCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const CollectionOptions& options,
+ const boost::optional<BSONObj>& idIndex) {
+ auto status = userAllowedCreateNS(opCtx, ns);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ if (options.isView()) {
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Cannot create a view in a multi-document "
+ "transaction.",
+ !opCtx->inMultiDocumentTransaction());
+ uassert(ErrorCodes::Error(6026500),
+ "The 'clusteredIndex' option is not supported with views",
+ !options.clusteredIndex);
+
+ return _createView(opCtx, ns, options);
+ } else if (options.timeseries && !ns.isTimeseriesBucketsCollection()) {
+ // This helper is designed for user-created time-series collections on primaries. If a
+ // time-series buckets collection is created explicitly or during replication, treat this as
+ // a normal collection creation.
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream()
+ << "Cannot create a time-series collection in a multi-document transaction.",
+ !opCtx->inMultiDocumentTransaction());
+ return _createTimeseries(opCtx, ns, options);
+ } else {
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Cannot create system collection " << ns
+ << " within a transaction.",
+ !opCtx->inMultiDocumentTransaction() || !ns.isSystem());
+ return _createCollection(opCtx, ns, options, idIndex);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/catalog/create_collection.h b/src/mongo/db/catalog/create_collection.h
index ee373c7ef7b..694f4c6ed36 100644
--- a/src/mongo/db/catalog/create_collection.h
+++ b/src/mongo/db/catalog/create_collection.h
@@ -55,6 +55,15 @@ Status createCollection(OperationContext* opCtx,
const NamespaceString& ns,
const CreateCommand& cmd);
+
+/**
+ * Creates the collection or the view as described by 'options'.
+ */
+Status createCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const CollectionOptions& options,
+ const boost::optional<BSONObj>& idIndex);
+
/**
* Creates the change stream pre-images collection. The collection is clustered by the primary key,
* _id.
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
new file mode 100644
index 00000000000..e27c5af2af2
--- /dev/null
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/change_stream_change_collection_manager.h"
+
+#include "mongo/db/catalog/clustered_collection_util.h"
+#include "mongo/db/catalog/coll_mod.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/drop_collection.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace {
+const auto getChangeCollectionManager =
+ ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
+} // namespace
+
+ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
+ ServiceContext* service) {
+ return *getChangeCollectionManager(service);
+}
+
+ChangeStreamChangeCollectionManager& ChangeStreamChangeCollectionManager::get(
+ OperationContext* opCtx) {
+ return *getChangeCollectionManager(opCtx->getServiceContext());
+}
+
+void ChangeStreamChangeCollectionManager::create(ServiceContext* service) {
+ getChangeCollectionManager(service).emplace(service);
+}
+
+Status ChangeStreamChangeCollectionManager::createChangeCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ // TODO: SERVER-65950 create or update the change collection for a particular tenant.
+ const NamespaceString nss{NamespaceString::kConfigDb,
+ NamespaceString::kChangeStreamChangeCollection};
+
+ // Make the change collection clustered by '_id'. The '_id' field will have the same value as
+ // the 'ts' field of the oplog.
+ CollectionOptions changeCollectionOptions;
+ changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex());
+ changeCollectionOptions.capped = true;
+
+ auto status = createCollection(opCtx, nss, changeCollectionOptions, BSONObj());
+ if (status.code() == ErrorCodes::NamespaceExists) {
+ return Status(ErrorCodes::Error::OK, "");
+ }
+
+ return status;
+}
+
+Status ChangeStreamChangeCollectionManager::dropChangeCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ // TODO: SERVER-65950 remove the change collection for a particular tenant.
+ const NamespaceString nss{NamespaceString::kConfigDb,
+ NamespaceString::kChangeStreamChangeCollection};
+ DropReply dropReply;
+ return dropCollection(
+ opCtx, nss, &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+}
+
+Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
+ OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) {
+ // TODO SERVER-65210 add code to insert to the change collection in the primaries.
+ return Status(ErrorCodes::OK, "");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
new file mode 100644
index 00000000000..b4ad0e25c50
--- /dev/null
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+
+namespace mongo {
+
+/**
+ * Manages the creation, deletion and insertion lifecycle of the change collection.
+ */
+class ChangeStreamChangeCollectionManager {
+public:
+ explicit ChangeStreamChangeCollectionManager(ServiceContext* service) {}
+
+ ~ChangeStreamChangeCollectionManager() = default;
+
+ /**
+ * Creates an instance of the class using the service-context.
+ */
+ static void create(ServiceContext* service);
+
+ /**
+ * Gets the instance of the class using the service context.
+ */
+ static ChangeStreamChangeCollectionManager& get(ServiceContext* service);
+
+ /**
+ * Gets the instance of the class using the operation context.
+ */
+ static ChangeStreamChangeCollectionManager& get(OperationContext* opCtx);
+
+ /**
+ * Creates a change collection for the specified tenant, if it doesn't exist. Returns Status::OK
+ * if the change collection already exists.
+ *
+ * TODO: SERVER-65950 make tenantId field mandatory.
+ */
+ Status createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+
+ /**
+ * Deletes the change collection for the specified tenant, if it already exist.
+ *
+ * TODO: SERVER-65950 make tenantId field mandatory.
+ */
+ Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+
+ /**
+ * Inserts documents to the change collection for the specified tenant. The parameter 'records'
+ * is a vector of oplog records and the parameter 'timestamps' is a vector for respective
+ * timestamp for each oplog record.
+ *
+ * TODO: SERVER-65950 make tenantId field mandatory.
+ */
+ Status insertDocumentsToChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps);
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 2f8a439bb10..be1bd6fc294 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/catalog/health_log.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_key_validate.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/client_metadata_propagation_egress_hook.h"
@@ -1542,6 +1543,8 @@ int mongod_main(int argc, char* argv[]) {
ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD);
ChangeStreamOptionsManager::create(service);
+ ChangeStreamChangeCollectionManager::create(service);
+
#if defined(_WIN32)
if (ntservice::shouldStartService()) {
ntservice::startService();
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 26f9249996e..525dd9c6724 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -243,6 +243,10 @@ bool NamespaceString::isLegalClientSystemNS(
return true;
}
+ if (isChangeStreamChangeCollection()) {
+ return true;
+ }
+
return false;
}
@@ -400,6 +404,10 @@ bool NamespaceString::isChangeStreamPreImagesCollection() const {
return ns() == kChangeStreamPreImagesNamespace.ns();
}
+bool NamespaceString::isChangeStreamChangeCollection() const {
+ return db() == kConfigDb && coll() == kChangeStreamChangeCollection;
+}
+
bool NamespaceString::isConfigImagesCollection() const {
return ns() == kConfigImagesNamespace.ns();
}
@@ -424,7 +432,8 @@ NamespaceString NamespaceString::getTimeseriesViewNamespace() const {
}
bool NamespaceString::isImplicitlyReplicated() const {
- if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection()) {
+ if (isChangeStreamPreImagesCollection() || isConfigImagesCollection() || isChangeCollection() ||
+ isChangeStreamChangeCollection()) {
// Implicitly replicated namespaces are replicated, although they only replicate a subset of
// writes.
invariant(isReplicated());
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 38606ae3d4f..692d768dcca 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -76,6 +76,9 @@ public:
// Name for the system views collection
static constexpr StringData kSystemDotViewsCollectionName = "system.views"_sd;
+ // Name for the change stream change collection.
+ static constexpr StringData kChangeStreamChangeCollection = "system.change_collection"_sd;
+
// Names of privilege document collections
static constexpr StringData kSystemUsers = "system.users"_sd;
static constexpr StringData kSystemRoles = "system.roles"_sd;
@@ -456,6 +459,11 @@ public:
bool isChangeStreamPreImagesCollection() const;
/**
+ * Returns whether the specified namespace is config.system.changeCollection.
+ */
+ bool isChangeStreamChangeCollection() const;
+
+ /**
* Returns whether the specified namespace is config.image_collection.
*/
bool isConfigImagesCollection() const;
diff --git a/src/mongo/db/query/query_feature_flags.idl b/src/mongo/db/query/query_feature_flags.idl
index 87379efcf87..f9b7ddb8ec8 100644
--- a/src/mongo/db/query/query_feature_flags.idl
+++ b/src/mongo/db/query/query_feature_flags.idl
@@ -146,3 +146,8 @@ feature_flags:
description: "Enables creation of a new columnstore index type"
cpp_varname: gFeatureFlagColumnstoreIndexes
default: false
+
+ featureFlagServerlessChangeStreams:
+ description: "Feature flag to enable reading change events from the change collection rather than the oplog"
+ cpp_varname: gFeatureFlagServerlessChangeStreams
+ default: false \ No newline at end of file
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b7c4f4c2a88..f12237cf395 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1506,6 +1506,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
'$BUILD_DIR/mongo/db/commands/rwc_defaults_commands',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index a46671d6c62..25982c4e638 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/local_oplog_info.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/rwc_defaults_commands_gen.h"
@@ -556,6 +557,17 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
createChangeStreamPreImagesCollection(opCtx);
}
+ // TODO: SERVER-65948 move the change collection creation logic from here to the PM-2502 hooks.
+ // The change collection will be created when the change stream is enabled.
+ if (::mongo::feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ auto status = changeCollectionManager.createChangeCollection(opCtx, boost::none);
+ if (!status.isOK()) {
+ fassert(6520900, status);
+ }
+ }
+
serverGlobalParams.validateFeaturesAsPrimary.store(true);
return opTimeToReturn;