summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-08-11 17:00:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-11 18:34:34 +0000
commit7be22a4e693ed231f217c7670de076dc1960b238 (patch)
tree5e1b3e5901dd9f05e09cf85a6061d4aa3c3f34a9 /src/mongo
parentf387fa8cc75b9129fb1d57d302ed424349990d5e (diff)
downloadmongo-7be22a4e693ed231f217c7670de076dc1960b238.tar.gz
SERVER-66631 Implement command to enable and disable the change stream.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript34
-rw-r--r--src/mongo/db/auth/action_type.idl2
-rw-r--r--src/mongo/db/auth/builtin_roles.yml2
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp45
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h12
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp7
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.h7
-rw-r--r--src/mongo/db/change_stream_state.idl62
-rw-r--r--src/mongo/db/commands/SConscript4
-rw-r--r--src/mongo/db/commands/change_stream_state_command.cpp179
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp7
-rw-r--r--src/mongo/db/exec/collection_scan.cpp5
-rw-r--r--src/mongo/db/mongod_main.cpp5
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp9
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/primary_only_service_util.cpp128
-rw-r--r--src/mongo/db/repl/primary_only_service_util.h146
-rw-r--r--src/mongo/db/repl/primary_only_service_util_test.cpp211
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp16
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.cpp232
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.h101
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.idl61
-rw-r--r--src/mongo/platform/mutex.h4
25 files changed, 1246 insertions, 41 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index d8068bdd0e9..6f2cc3a531f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -520,13 +520,44 @@ env.Library(
)
env.Library(
+ target='change_stream_state',
+ source=['change_stream_state.idl'],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ ],
+)
+
+env.Library(
+ target='set_change_stream_state_coordinator',
+ source=[
+ 'set_change_stream_state_coordinator.cpp',
+ 'set_change_stream_state_coordinator.idl',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_state',
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/logical_session_id',
+ '$BUILD_DIR/mongo/db/logical_session_id_helpers',
+ '$BUILD_DIR/mongo/db/repl/primary_only_service',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
+ '$BUILD_DIR/mongo/db/rw_concern_d',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ ],
+)
+
+env.Library(
target='change_stream_change_collection_manager',
source=[
'change_stream_change_collection_manager.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
- "$BUILD_DIR/mongo/db/catalog/clustered_collection_options",
+ '$BUILD_DIR/mongo/db/catalog/clustered_collection_options',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/service_context',
],
@@ -2367,6 +2398,7 @@ env.Library(
'$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
+ '$BUILD_DIR/mongo/db/set_change_stream_state_coordinator',
'$BUILD_DIR/mongo/idl/cluster_server_parameter',
'$BUILD_DIR/mongo/idl/cluster_server_parameter_op_observer',
'$BUILD_DIR/mongo/s/grid',
diff --git a/src/mongo/db/auth/action_type.idl b/src/mongo/db/auth/action_type.idl
index c5328ec0617..ad1fb2cb0f9 100644
--- a/src/mongo/db/auth/action_type.idl
+++ b/src/mongo/db/auth/action_type.idl
@@ -186,6 +186,8 @@ enums:
viewRole : "viewRole"
viewUser : "viewUser"
applyOps : "applyOps"
+ setChangeStreamState: "setChangeStreamState"
+ getChangeStreamState: "getChangeStreamState"
# In 'MatchType' the extra_data field "serverlessActionTypes" is used
# by the AuthorizationSession while in multitenancy mode to determine
diff --git a/src/mongo/db/auth/builtin_roles.yml b/src/mongo/db/auth/builtin_roles.yml
index ae2c1aa57dd..64202331747 100644
--- a/src/mongo/db/auth/builtin_roles.yml
+++ b/src/mongo/db/auth/builtin_roles.yml
@@ -334,6 +334,8 @@ roles:
- setFreeMonitoring
- setClusterParameter
- getClusterParameter
+ - setChangeStreamState
+ - getChangeStreamState
- matchType: any_normal
actions: &clusterManagerRoleDatabaseActions
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 5a79d14953f..9dbf6937b49 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/oplog.h"
@@ -187,32 +188,44 @@ bool ChangeStreamChangeCollectionManager::hasChangeCollection(
opCtx, NamespaceString::makeChangeCollectionNSS(tenantId)));
}
-Status ChangeStreamChangeCollectionManager::createChangeCollection(
+bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
+ // A change stream in the serverless is declared as enabled if both the change collection and
+ // the pre-images collection exist for the provided tenant.
+ return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) &&
+ ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId);
+}
+
+void ChangeStreamChangeCollectionManager::createChangeCollection(
OperationContext* opCtx, boost::optional<TenantId> tenantId) {
// Make the change collection clustered by '_id'. The '_id' field will have the same value as
// the 'ts' field of the oplog.
CollectionOptions changeCollectionOptions;
changeCollectionOptions.clusteredIndex.emplace(clustered_util::makeDefaultClusteredIdIndex());
changeCollectionOptions.capped = true;
+ const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId);
- auto status = createCollection(opCtx,
- NamespaceString::makeChangeCollectionNSS(tenantId),
- changeCollectionOptions,
- BSONObj());
- if (status.code() == ErrorCodes::NamespaceExists) {
- return Status::OK();
- }
-
- return status;
+ const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj());
+ uassert(status.code(),
+ str::stream() << "Failed to create change collection: " << changeCollNss
+ << causedBy(status.reason()),
+ status.isOK() || status.code() == ErrorCodes::NamespaceExists);
}
-Status ChangeStreamChangeCollectionManager::dropChangeCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx,
+ boost::optional<TenantId> tenantId) {
DropReply dropReply;
- return dropCollection(opCtx,
- NamespaceString::makeChangeCollectionNSS(tenantId),
- &dropReply,
- DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+ const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId);
+
+ const auto status =
+ dropCollection(opCtx,
+ changeCollNss,
+ &dropReply,
+ DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+ uassert(status.code(),
+ str::stream() << "Failed to drop change collection: " << changeCollNss
+ << causedBy(status.reason()),
+ status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
}
void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 37a9dbaef27..b6836ac0c1e 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -72,19 +72,23 @@ public:
bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
/**
- * Creates a change collection for the specified tenant, if it doesn't exist. Returns Status::OK
- * if the change collection already exists.
+ * Returns true if the change stream is enabled for the provided tenant, false otherwise.
+ */
+ bool isChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
+
+ /**
+ * Creates a change collection for the specified tenant, if it doesn't exist.
*
* TODO: SERVER-65950 make tenantId field mandatory.
*/
- Status createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
/**
* Deletes the change collection for the specified tenant, if it already exist.
*
* TODO: SERVER-65950 make tenantId field mandatory.
*/
- Status dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
/**
* Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index afae70d7da4..a83aa9645f6 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -172,6 +172,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
uassertStatusOK(insertionStatus);
}
+bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(
+ OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ auto catalog = CollectionCatalog::get(opCtx);
+ return static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makePreImageCollectionNSS(tenantId)));
+}
+
namespace {
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h
index 75efb28c22d..dede0e38c96 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.h
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.h
@@ -89,6 +89,13 @@ public:
const ChangeStreamPreImage& preImage);
/**
+ * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided
+ * then the pre-images collection associated with that tenant will be checked for existence,
+ * otherwise the default pre-images collection will be checked.
+ */
+ static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+
+ /**
* Scans the system pre-images collection and deletes the expired pre-images from it.
*/
static void performExpiredChangeStreamPreImagesRemovalPass(Client* client);
diff --git a/src/mongo/db/change_stream_state.idl b/src/mongo/db/change_stream_state.idl
new file mode 100644
index 00000000000..cec05b86bc1
--- /dev/null
+++ b/src/mongo/db/change_stream_state.idl
@@ -0,0 +1,62 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ ChangeStreamStateParameters:
+ description: "The parameters associated with 'setChangeStreamState' and 'getChangeStreamState' commands."
+ fields:
+ enabled:
+ description: "Represents the state of the change stream of a tenant. If true, then
+ the change stream should be enabled, false otherwise. The corresponding
+ value is also returned by the 'getChangeStreamState' command"
+ type: bool
+
+commands:
+ setChangeStreamState:
+ description: "The command to set the state of the change stream in the serverless for a
+ particular tenant."
+ command_name: setChangeStreamState
+ cpp_name: SetChangeStreamStateCommandRequest
+ api_version: ""
+ namespace: ignored
+ chained_structs:
+ ChangeStreamStateParameters: ChangeStreamStateParameters
+ getChangeStreamState:
+ description: "The command to get the state of the change stream in the serverless for a
+ particular tenant."
+ command_name: getChangeStreamState
+ cpp_name: GetChangeStreamStateCommandRequest
+ api_version: ""
+ namespace: ignored
+ reply_type: ChangeStreamStateParameters \ No newline at end of file
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index c995e5eccbf..e9772f9b61f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -372,6 +372,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_key_validate',
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
'$BUILD_DIR/mongo/db/command_can_run_here',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
@@ -513,6 +514,7 @@ env.Library(
"apply_ops_cmd.cpp",
"collection_to_capped.cpp",
"compact.cpp",
+ "change_stream_state_command.cpp",
"cpuload.cpp",
"dbcheck.cpp",
"dbcommands_d.cpp",
@@ -565,6 +567,7 @@ env.Library(
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_state',
'$BUILD_DIR/mongo/db/cluster_transaction_api',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
@@ -592,6 +595,7 @@ env.Library(
'$BUILD_DIR/mongo/db/s/user_writes_recoverable_critical_section',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/serverless/shard_split_donor_service',
+ '$BUILD_DIR/mongo/db/set_change_stream_state_coordinator',
'$BUILD_DIR/mongo/db/tenant_id',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
'$BUILD_DIR/mongo/db/transaction/transaction_api',
diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp
new file mode 100644
index 00000000000..de8d98083ea
--- /dev/null
+++ b/src/mongo/db/commands/change_stream_state_command.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_state_gen.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/set_change_stream_state_coordinator.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+namespace mongo {
+
+namespace {
+
+/**
+ * The command that should run in the replica-set to set the state of the change stream in the
+ * serverless.
+ */
+class SetChangeStreamStateCommand final : public TypedCommand<SetChangeStreamStateCommand> {
+public:
+ using Request = SetChangeStreamStateCommandRequest;
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ std::string help() const override {
+ return "Sets the change stream state in the serverless\n"
+ "Usage:\n"
+ " {setChangeStreamState: 1, enabled: <true|false>}\n"
+ "Fields:\n"
+ " enabled: enable or disable the change stream";
+ }
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ uassert(ErrorCodes::CommandNotSupported,
+ str::stream() << SetChangeStreamStateCommandRequest::kCommandName
+ << " is only supported in the serverless",
+ ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+
+ // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
+ // present. Remove 'getDollarTenant()' and fetch tenant from dbName().
+ const std::string tenantId = request().getDollarTenant()
+ ? request().getDollarTenant()->toString()
+ : TenantId::kSystemTenantId.toString();
+
+ // Prepare the payload for the 'SetChangeStreamStateCoordinator'.
+ SetChangeStreamStateCoordinatorId coordinatorId;
+ coordinatorId.setTenantId({TenantId{OID(tenantId)}});
+ SetChangeStreamStateCoordinatorDocument coordinatorDoc{
+ coordinatorId, request().getChangeStreamStateParameters().toBSON()};
+
+ // Dispatch the request to the 'SetChangeStreamStateCoordinatorService'.
+ const auto service = SetChangeStreamStateCoordinatorService::getService(opCtx);
+ const auto instance = service->getOrCreateInstance(opCtx, coordinatorDoc.toBSON());
+
+ const auto coordinatorCompletionFuture = instance->getCompletionFuture();
+ coordinatorCompletionFuture.get(opCtx);
+ }
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ NamespaceString ns() const override {
+ return NamespaceString();
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
+ ActionType::setChangeStreamState}));
+ }
+ };
+} setChangeStreamStateCommand;
+
+class GetChangeStreamStateCommand final : public TypedCommand<GetChangeStreamStateCommand> {
+public:
+ using Request = GetChangeStreamStateCommandRequest;
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ std::string help() const override {
+ return "Gets the change stream state in the serverless\n"
+ "Usage:\n"
+ " {getChangeStreamState: 1}";
+ }
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ auto typedRun(OperationContext* opCtx) {
+ uassert(ErrorCodes::CommandNotSupported,
+ str::stream() << GetChangeStreamStateCommandRequest::kCommandName
+ << " is only supported in the serverless",
+ ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+
+ // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
+ // present.
+ boost::optional<TenantId> tenantId = boost::none;
+
+ // Set the change stream enablement state in the 'reply' object.
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ GetChangeStreamStateCommandRequest::Reply reply{
+ changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)};
+
+ return reply;
+ }
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ NamespaceString ns() const override {
+ return NamespaceString();
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(),
+ ActionType::getChangeStreamState}));
+ }
+ };
+} getChangeStreamStateCommand;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 36a23574a71..01ed5a70a0a 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/commands/cqf/cqf_aggregate.h"
#include "mongo/db/commands/cqf/cqf_command_utils.h"
#include "mongo/db/curop.h"
@@ -752,14 +753,12 @@ Status runAggregate(OperationContext* opCtx,
// Replace the execution namespace with the oplog.
nss = NamespaceString::kRsOplogNamespace;
- // In case of serverless the change stream will be opened on the change collection. We
- // should first check if the change collection for the particular tenant exists and then
- // replace the namespace with the change collection.
+ // In case of serverless the change stream will be opened on the change collection.
if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
uassert(ErrorCodes::ChangeStreamNotEnabled,
"Change streams must be enabled before being used.",
- changeCollectionManager.hasChangeCollection(opCtx, origNss.tenantId()));
+ changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId()));
nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId());
}
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index edea7336f3e..b9910a7a6c8 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -302,7 +302,10 @@ void CollectionScan::assertTsHasNotFallenOff(const Record& record) {
const bool tsHasNotFallenOff = oplogEntry.getTimestamp() <= *_params.assertTsHasNotFallenOff;
uassert(ErrorCodes::OplogQueryMinTsMissing,
- "Specified timestamp has already fallen off the oplog",
+ str::stream()
+ << "Specified timestamp has already fallen off the oplog for the input timestamp: "
+ << *_params.assertTsHasNotFallenOff
+ << ", oplog entry: " << oplogEntry.getEntry().toString(),
isNewRS || tsHasNotFallenOff);
// We don't need to check this assertion again after we've confirmed the first oplog event.
_params.assertTsHasNotFallenOff = boost::none;
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index eb212db69e0..80fe723c81e 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -161,6 +161,7 @@
#include "mongo/db/service_entry_point_mongod.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_killer.h"
+#include "mongo/db/set_change_stream_state_coordinator.h"
#include "mongo/db/startup_recovery.h"
#include "mongo/db/startup_warnings_mongod.h"
#include "mongo/db/stats/counters.h"
@@ -349,6 +350,9 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
}
}
+ // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless.
+ services.push_back(std::make_unique<SetChangeStreamStateCoordinatorService>(serviceContext));
+
for (auto& service : services) {
registry->registerService(std::move(service));
}
@@ -1549,6 +1553,7 @@ int mongod_main(int argc, char* argv[]) {
ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD);
ChangeStreamOptionsManager::create(service);
+ // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless.
ChangeStreamChangeCollectionManager::create(service);
#if defined(_WIN32)
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 639694dc4af..9a79fc2f4ca 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -182,6 +182,9 @@ const NamespaceString NamespaceString::kShardCollectionCatalogNamespace(Namespac
const NamespaceString NamespaceString::kLockpingsNamespace(NamespaceString::kConfigDb, "lockpings");
const NamespaceString NamespaceString::kDistLocksNamepsace(NamespaceString::kConfigDb, "locks");
+const NamespaceString NamespaceString::kSetChangeStreamStateCoordinatorNamespace(
+ NamespaceString::kConfigDb, "change_stream_coordinator");
+
NamespaceString NamespaceString::parseFromStringExpectTenantIdInMultitenancyMode(StringData ns) {
if (!gMultitenancySupport) {
return NamespaceString(ns, boost::none);
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 3c54837625e..18cf434bd86 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -245,6 +245,9 @@ public:
// TODO SERVER-68551: remove once 7.0 becomes last-lts
static const NamespaceString kDistLocksNamepsace;
+ // Namespace used to store the state document of 'SetChangeStreamStateCoordinator'.
+ static const NamespaceString kSetChangeStreamStateCoordinatorNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index d95ff78f58e..ee473aada28 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -33,8 +33,12 @@
#include "mongo/db/pipeline/document_source_change_stream_check_resumability.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/logv2/log.h"
using boost::intrusive_ptr;
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
namespace mongo {
namespace {
@@ -162,7 +166,10 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGet
auto nextInput = [this]() {
try {
return pSource->getNext();
- } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+ } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>& ex) {
+ LOGV2_ERROR(6663107,
+ "Resume of change stream was not possible",
+ "reason"_attr = ex.reason());
uasserted(ErrorCodes::ChangeStreamHistoryLost,
"Resume of change stream was not possible, as the resume point may no "
"longer be in the oplog.");
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 9c63c0a8151..40dcaa49c6e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1669,6 +1669,7 @@ if wiredtiger:
'oplog_test.cpp',
'optime_extract_test.cpp',
'primary_only_service_test.cpp',
+ 'primary_only_service_util_test.cpp',
'read_concern_args_test.cpp',
'repl_set_config_checks_test.cpp',
'repl_set_config_test.cpp',
@@ -1965,6 +1966,7 @@ env.Library(
target='primary_only_service',
source=[
'primary_only_service.cpp',
+ 'primary_only_service_util.cpp',
'primary_only_service_op_observer.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/repl/primary_only_service_util.cpp b/src/mongo/db/repl/primary_only_service_util.cpp
new file mode 100644
index 00000000000..d845ac2a262
--- /dev/null
+++ b/src/mongo/db/repl/primary_only_service_util.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/primary_only_service_util.h"
+
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/future_util.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+namespace mongo {
+
+namespace {
+
+// Set the exponential delay after evaluating the condition 'until' within the 'run' method.
+const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+
+} // namespace
+
+DefaultPrimaryOnlyServiceInstance::~DefaultPrimaryOnlyServiceInstance() {
+ invariant(_completionPromise.getFuture().isReady());
+}
+
+SharedSemiFuture<void> DefaultPrimaryOnlyServiceInstance::getCompletionFuture() {
+ return _completionPromise.getFuture();
+}
+
+void DefaultPrimaryOnlyServiceInstance::interrupt(Status status) noexcept {
+ LOGV2_DEBUG(6663100,
+ 1,
+ "DefaultPrimaryOnlyServiceInstance interrupted",
+ "instance"_attr = getInstanceName(),
+ "reason"_attr = redact(status));
+
+ // Resolve any unresolved promises to avoid hanging.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
+}
+
+SemiFuture<void> DefaultPrimaryOnlyServiceInstance::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then([this, executor, token, anchor = shared_from_this()] {
+ return AsyncTry([this, executor, token] { return _runImpl(executor, token); })
+ .until([this, token](Status status) { return status.isOK() || token.isCanceled(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, CancellationToken::uncancelable());
+ })
+ .onCompletion([this, executor, token, anchor = shared_from_this()](const Status& status) {
+ if (!status.isOK()) {
+ LOGV2_ERROR(6663101,
+ "Error executing DefaultPrimaryOnlyServiceInstance",
+ "instance"_attr = getInstanceName(),
+ "error"_attr = redact(status));
+
+ // Nothing else to do, the _completionPromise will be cancelled once the coordinator
+ // is interrupted, because the only reasons to stop forward progress in this node is
+ // because of a stepdown happened or the coordinator was canceled.
+ dassert((token.isCanceled() && status.isA<ErrorCategory::CancellationError>()) ||
+ status.isA<ErrorCategory::NotPrimaryError>());
+ return status;
+ }
+
+ try {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ _removeStateDocument(opCtx);
+ } catch (DBException& ex) {
+ LOGV2_WARNING(
+ 6663103,
+ "Failed to remove state document from DefaultPrimaryOnlyServiceInstance",
+ "instance"_attr = getInstanceName(),
+ "error"_attr = redact(ex));
+ ex.addContext(str::stream()
+ << "Failed to remove state document from " << getInstanceName());
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(ex.toStatus());
+ }
+
+ throw;
+ }
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.emplaceValue();
+ }
+
+ return status;
+ })
+ .semi();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/primary_only_service_util.h b/src/mongo/db/repl/primary_only_service_util.h
new file mode 100644
index 00000000000..64aea0ac8d2
--- /dev/null
+++ b/src/mongo/db/repl/primary_only_service_util.h
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/primary_only_service.h"
+
+namespace mongo {
+
+/**
+ * A helper to CRUD the state document for the 'PrimaryOnlyService' using the provided namespace
+ * 'stateDocumentNs'.
+ */
+template <typename StateDocumentType>
+class PrimaryOnlyServiceStateStore {
+public:
+ explicit PrimaryOnlyServiceStateStore(const NamespaceString& stateDocumentNs)
+ : _stateDocumentNs{stateDocumentNs} {}
+
+ /**
+ * Stores the state document 'addDoc' on the provided namespace.
+ */
+ void add(
+ OperationContext* opCtx,
+ const StateDocumentType& addDoc,
+ const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) {
+ PersistentTaskStore<StateDocumentType> store(_stateDocumentNs);
+ store.add(opCtx, addDoc, writeConcern);
+ }
+
+ /**
+ * Removes the state document matching the criteria stated in the 'removeDoc'.
+ */
+ void remove(
+ OperationContext* opCtx,
+ const BSONObj& removeDoc,
+ const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) {
+ PersistentTaskStore<StateDocumentType> store(_stateDocumentNs);
+ store.remove(opCtx, removeDoc, writeConcern);
+ }
+
+ /**
+ * Updates the state document 'updateDoc' using the provided filter criteria 'filter'.
+ */
+ void update(
+ OperationContext* opCtx,
+ const BSONObj& filter,
+ const BSONObj& updateDoc,
+ const WriteConcernOptions& writeConcern = WriteConcerns::kMajorityWriteConcernNoTimeout) {
+ PersistentTaskStore<StateDocumentType> store(_stateDocumentNs);
+ store.update(opCtx, filter, updateDoc, writeConcern);
+ }
+
+ /**
+ * Gets the count of the documents matching the 'filter' criteria.
+ */
+ size_t count(OperationContext* opCtx, const BSONObj& filter = BSONObj{}) {
+ PersistentTaskStore<StateDocumentType> store(_stateDocumentNs);
+ return store.count(opCtx, filter);
+ }
+
+private:
+ // Namespace where the state document will be stored.
+ const NamespaceString _stateDocumentNs;
+};
+
+/**
+ * A helper that provides a default implementation for the set of methods to create the
+ * 'PrimaryOnlyService::Instance'.
+ */
+class DefaultPrimaryOnlyServiceInstance
+ : public repl::PrimaryOnlyService::TypedInstance<DefaultPrimaryOnlyServiceInstance> {
+public:
+ ~DefaultPrimaryOnlyServiceInstance();
+
+ /**
+ * The name of the 'PrimaryOnlyService::Instance'.
+ */
+ virtual const StringData getInstanceName() = 0;
+
+ /**
+ * Interrupts the current running 'DefaultPrimaryOnlyServiceInstance' instance.
+ */
+ void interrupt(Status status) noexcept override;
+
+ /**
+ * Returns the completion state of the running instance.
+ */
+ SharedSemiFuture<void> getCompletionFuture();
+
+private:
+ /**
+ * Invoked by the 'PrimaryOnlyService' to run the 'DefaultPrimaryOnlyServiceInstance' instance
+ * type.
+ */
+ SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept override;
+
+ /**
+ * The derived class must override this method to write the custom logic. This method is invoked
+ * within the context of the 'run' method.
+ */
+ virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept = 0;
+
+ /**
+ * Removes the state document after the 'DefaultPrimaryOnlyServiceInstance' has been completed.
+ * It is the responsibility of the derived class to store the state document.
+ */
+ virtual void _removeStateDocument(OperationContext* opCtx) = 0;
+
+ // Guards the access of the '_completionPromise'.
+ Mutex _mutex = MONGO_MAKE_LATCH("DefaultPrimaryOnlyServiceInstance::_mutex");
+
+ // Tracks the completion state of the instance.
+ SharedPromise<void> _completionPromise;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/repl/primary_only_service_util_test.cpp b/src/mongo/db/repl/primary_only_service_util_test.cpp
new file mode 100644
index 00000000000..0fc059d3d5a
--- /dev/null
+++ b/src/mongo/db/repl/primary_only_service_util_test.cpp
@@ -0,0 +1,211 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/base/string_data.h"
+#include "mongo/db/repl/primary_only_service_test_fixture.h"
+#include "mongo/db/repl/primary_only_service_util.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point.h"
+
+using namespace mongo;
+using namespace mongo::repl;
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+namespace {
+constexpr auto kTestPrimaryOnlyServiceName = "TestService"_sd;
+constexpr auto kTestPrimaryOnlyServiceInstanceName = "TestServiceInstance"_sd;
+constexpr auto kTestPrimaryOnlyServiceStateDocumentNss = "config.test_primary_only_service"_sd;
+
+// Hangs the 'TestDefaultPrimaryOnlyServiceInstance' after inserting the state document.
+MONGO_FAIL_POINT_DEFINE(HangTestPrimaryOnlyServiceAfterStateDocInsertion);
+
+/**
+ * Represents the state document class for the 'TestDefaultPrimaryOnlyService'.
+ */
+class TestStateDocument {
+public:
+ void setState(int state) {
+ _state = state;
+ }
+
+ const BSONObj toBSON() const {
+ return BSON("_id" << _state);
+ }
+
+private:
+ int _state = 0;
+};
+
+// Create a global instance of the 'PrimaryOnlyServiceStateStore' to store the state document.
+PrimaryOnlyServiceStateStore<TestStateDocument> gStateDocStore{
+ NamespaceString{kTestPrimaryOnlyServiceStateDocumentNss}};
+
+/**
+ * Test class for the 'DefaultPrimaryOnlyServiceInstance'.
+ */
+class TestDefaultPrimaryOnlyServiceInstance : public DefaultPrimaryOnlyServiceInstance {
+public:
+ const StringData getInstanceName() final {
+ return kTestPrimaryOnlyServiceInstanceName;
+ }
+
+ boost::optional<BSONObj> reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final {
+ return boost::none;
+ };
+
+ void checkIfOptionsConflict(const BSONObj& stateDoc) const final{};
+
+private:
+ ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept final {
+
+ return ExecutorFuture<void>(**executor).then([this, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ TestStateDocument testStateDoc;
+ testStateDoc.setState(1);
+ gStateDocStore.add(opCtx, testStateDoc);
+
+ if (MONGO_unlikely(HangTestPrimaryOnlyServiceAfterStateDocInsertion.shouldFail())) {
+ HangTestPrimaryOnlyServiceAfterStateDocInsertion.pauseWhileSet();
+ }
+ });
+ };
+
+ void _removeStateDocument(OperationContext* opCtx) final {
+ gStateDocStore.remove(opCtx, BSON("_id" << 1));
+ };
+};
+
+/**
+ * Test class for the 'PrimaryOnlyService'.
+ */
+class TestDefaultPrimaryOnlyService final : public repl::PrimaryOnlyService {
+public:
+ explicit TestDefaultPrimaryOnlyService(ServiceContext* serviceContext)
+ : repl::PrimaryOnlyService(serviceContext) {}
+
+ ~TestDefaultPrimaryOnlyService() = default;
+
+ StringData getServiceName() const final {
+ return kTestPrimaryOnlyServiceName;
+ }
+
+ NamespaceString getStateDocumentsNS() const final {
+ return NamespaceString(kTestPrimaryOnlyServiceStateDocumentNss);
+ }
+
+ ThreadPool::Limits getThreadPoolLimits() const final {
+ return ThreadPool::Limits();
+ }
+
+ static TestDefaultPrimaryOnlyService* getService(OperationContext* opCtx) {
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = registry->lookupServiceByName(kTestPrimaryOnlyServiceInstanceName);
+ return checked_cast<TestDefaultPrimaryOnlyService*>(std::move(service));
+ }
+
+ void checkIfConflictsWithOtherInstances(
+ OperationContext* opCtx,
+ BSONObj initialState,
+ const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) final{};
+
+ std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) final {
+ return std::make_shared<TestDefaultPrimaryOnlyServiceInstance>();
+ }
+};
+
+} // namespace
+
+class DefaultPrimaryOnlyServiceInstanceTest : public PrimaryOnlyServiceMongoDTest {
+public:
+ std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override {
+ return std::make_unique<TestDefaultPrimaryOnlyService>(serviceContext);
+ }
+
+ void setUp() override {
+ PrimaryOnlyServiceMongoDTest::setUp();
+
+ _service = _registry->lookupServiceByName(kTestPrimaryOnlyServiceName);
+ ASSERT(_service);
+ }
+
+ void tearDown() override {
+ // Ensure that even on test failures all failpoint state gets reset.
+ globalFailPointRegistry().disableAllFailpoints();
+
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ _registry->onShutdown();
+ _service = nullptr;
+
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ void stepUp() {
+ auto opCtx = cc().makeOperationContext();
+ PrimaryOnlyServiceMongoDTest::stepUp(opCtx.get());
+ }
+};
+
+TEST_F(DefaultPrimaryOnlyServiceInstanceTest, VerifyTaskExecuted) {
+ // Initialize the fail point to hang the 'TestDefaultPrimaryOnlyServiceInstance' at the
+ // '_runImpl' method.
+ auto timesEntered =
+ HangTestPrimaryOnlyServiceAfterStateDocInsertion.setMode(FailPoint::alwaysOn);
+
+ auto opCtx = makeOperationContext();
+
+ // Create an instance of the 'TestDefaultPrimaryOnlyServiceInstance'.
+ auto instance = TestDefaultPrimaryOnlyServiceInstance::getOrCreate(
+ opCtx.get(), _service, BSON("_id" << 0), true);
+ ASSERT(instance.get());
+
+ // Wait until the fail point is hit.
+ HangTestPrimaryOnlyServiceAfterStateDocInsertion.waitForTimesEntered(timesEntered + 1);
+
+ // Verify that the state document has been inserted. This verifies that the method
+ // 'TestDefaultPrimaryOnlyServiceInstance::_runImpl' has been executed.
+ ASSERT_EQ(gStateDocStore.count(opCtx.get(), BSON("_id" << 1)), 1);
+ HangTestPrimaryOnlyServiceAfterStateDocInsertion.setMode(FailPoint::off);
+
+ // Wait for the instance to complete.
+ instance->getCompletionFuture().get();
+
+ // Verify that the state document has now been removed. This verifies that the method
+ // 'DefaultPrimaryOnlyServiceInstanceTest::_removeStateDocument' has been executed.
+ ASSERT_EQ(gStateDocStore.count(opCtx.get(), BSON("_id" << 1)), 0);
+}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index df8655044d4..0039845afc7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -544,18 +544,10 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
}
});
- // Create the pre-images collection if it doesn't exist yet.
- ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx,
- boost::none /* tenantId */);
-
- // TODO: SERVER-66631 move the change collection creation logic from here to the PM-2502 hooks.
- // The change collection will be created when the change stream is enabled.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- auto status = ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollection(
- opCtx, boost::none);
- if (!status.isOK()) {
- fassert(6520900, status);
- }
+ // Create the pre-images collection if it doesn't exist yet in the non-serverless environment.
+ if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
+ opCtx, boost::none /* tenantId */);
}
serverGlobalParams.validateFeaturesAsPrimary.store(true);
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp
new file mode 100644
index 00000000000..a172b2e06fc
--- /dev/null
+++ b/src/mongo/db/set_change_stream_state_coordinator.cpp
@@ -0,0 +1,232 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/set_change_stream_state_coordinator.h"
+
+#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_state_gen.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+namespace mongo {
+
+// Hangs the 'SetChangeStreamStateCoordinator' before calling the
+// 'ChangeStreamStateCommandProcessor'.
+MONGO_FAIL_POINT_DEFINE(hangSetChangeStreamStateCoordinatorBeforeCommandProcessor);
+namespace {
+
+constexpr auto kSetChangeStreamStateCoordinatorServiceName =
+ "setChangeStreamStateCoordinatorService"_sd;
+constexpr auto kSetChangeStreamStateCoordinatorName = "setChangeStreamStateCoordinator"_sd;
+
+/**
+ * Waits until the oplog entry has been majority committed.
+ */
+void waitForMajority(OperationContext* opCtx) {
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto lastLocalOpTime = replCoord->getMyLastAppliedOpTime();
+ WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(lastLocalOpTime, opCtx->getCancellationToken())
+ .get(opCtx);
+}
+
+/**
+ * A helper that inspect the the 'SetChangeStreamStateCoordinatorDocument' state document and
+ * processes the provided change stream command accordingly.
+ */
+class ChangeStreamStateCommandProcessor {
+public:
+ explicit ChangeStreamStateCommandProcessor(SetChangeStreamStateCoordinatorDocument stateDoc)
+ : _stateDoc(stateDoc) {}
+
+ /**
+ * Retrieves the tenant id and processes the change stream command.
+ */
+ void process(OperationContext* opCtx) {
+ const auto setChangeStreamParameter = ChangeStreamStateParameters::parse(
+ IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand());
+
+ invariant(_stateDoc.getId().getTenantId());
+
+ // TODO SERVER-65950 replace 'tenantId' with the provided tenant id.
+ auto tenantId = boost::none;
+
+ if (setChangeStreamParameter.getEnabled()) {
+ _enableChangeStream(opCtx, tenantId);
+ } else {
+ _disableChangeStream(opCtx, tenantId);
+ }
+ }
+
+private:
+ /**
+ * Enables the change stream in the serverless by creating the change collection and the
+ * pre-image collection.
+ */
+ void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.createChangeCollection(opCtx, tenantId);
+
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId);
+
+ // Wait until the create requests are majority committed.
+ waitForMajority(opCtx);
+ }
+
+ /**
+ * Disables the change stream in the serverless by dropping the change collection and the
+ * pre-image collection.
+ */
+ void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ changeCollectionManager.dropChangeCollection(opCtx, tenantId);
+
+ ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId);
+
+ // Wait until the drop requests are majority committed.
+ waitForMajority(opCtx);
+ }
+
+ const SetChangeStreamStateCoordinatorDocument _stateDoc;
+};
+
+} // namespace
+
+const StringData SetChangeStreamStateCoordinator::getInstanceName() {
+ return kSetChangeStreamStateCoordinatorName;
+}
+
+SetChangeStreamStateCoordinator::SetChangeStreamStateCoordinator(const BSONObj& stateDoc)
+ : _stateDoc{SetChangeStreamStateCoordinatorDocument::parse(
+ IDLParserContext("SetChangeStreamStateCoordinatorDocument"), stateDoc)},
+ _stateDocStore{NamespaceString::kSetChangeStreamStateCoordinatorNamespace} {}
+
+boost::optional<BSONObj> SetChangeStreamStateCoordinator::reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept {
+ // Tenant id must always be present.
+ invariant(_stateDoc.getId().getTenantId());
+
+ BSONObjBuilder bob;
+ bob.append("tenantId", _stateDoc.getId().getTenantId()->toString());
+ bob.append("command", _stateDoc.getCommand());
+ return bob.obj();
+}
+
+void SetChangeStreamStateCoordinator::checkIfOptionsConflict(const BSONObj& otherDocBSON) const {
+ const auto otherDoc = SetChangeStreamStateCoordinatorDocument::parse(
+ IDLParserContext("SetChangeStreamStateCoordinatorDocument"), otherDocBSON);
+
+ // The '_id' field of the state document corresponds to the tenant id and hence if we are here
+ // then the current and the new request belongs to the same tenant. Reject the new request if it
+ // is not identical to the previous one.
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Another conflicting request is in progress",
+ SimpleBSONObjComparator::kInstance.evaluate(_stateDoc.toBSON() == otherDoc.toBSON()));
+}
+
+ExecutorFuture<void> SetChangeStreamStateCoordinator::_runImpl(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+
+ return ExecutorFuture<void>(**executor).then([this, anchor = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+
+ try {
+ _stateDocStore.add(opCtx, _stateDoc);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
+ // A series of retries, step-up and step-down events can cause a node to try and insert
+ // the document when it has already been persisted locally, but we must still wait for
+ // majority commit.
+ waitForMajority(opCtx);
+ }
+
+ hangSetChangeStreamStateCoordinatorBeforeCommandProcessor.pauseWhileSet(
+ Interruptible::notInterruptible());
+
+ // Dispatch the state document to be processed.
+ ChangeStreamStateCommandProcessor commandProcessor(_stateDoc);
+ commandProcessor.process(opCtx);
+ });
+}
+
+void SetChangeStreamStateCoordinator::_removeStateDocument(OperationContext* opCtx) {
+ _stateDocStore.remove(
+ opCtx,
+ BSON(SetChangeStreamStateCoordinatorDocument::kIdFieldName << _stateDoc.getId().toBSON()));
+}
+
+StringData SetChangeStreamStateCoordinatorService::getServiceName() const {
+ return kSetChangeStreamStateCoordinatorServiceName;
+}
+
+NamespaceString SetChangeStreamStateCoordinatorService::getStateDocumentsNS() const {
+ return NamespaceString::kSetChangeStreamStateCoordinatorNamespace;
+}
+
+ThreadPool::Limits SetChangeStreamStateCoordinatorService::getThreadPoolLimits() const {
+ return ThreadPool::Limits();
+}
+
+SetChangeStreamStateCoordinatorService* SetChangeStreamStateCoordinatorService::getService(
+ OperationContext* opCtx) {
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = registry->lookupServiceByName(kSetChangeStreamStateCoordinatorServiceName);
+ return checked_cast<SetChangeStreamStateCoordinatorService*>(std::move(service));
+}
+
+std::shared_ptr<repl::PrimaryOnlyService::Instance>
+SetChangeStreamStateCoordinatorService::constructInstance(BSONObj stateDoc) {
+ return std::make_shared<SetChangeStreamStateCoordinator>(std::move(stateDoc));
+}
+
+std::shared_ptr<SetChangeStreamStateCoordinator>
+SetChangeStreamStateCoordinatorService::getOrCreateInstance(OperationContext* opCtx,
+ BSONObj coorDoc) {
+ return [&] {
+ try {
+ auto [coordinator, _] = PrimaryOnlyService::getOrCreateInstance(opCtx, coorDoc, true);
+ return checked_pointer_cast<SetChangeStreamStateCoordinator>(std::move(coordinator));
+ } catch (const DBException& ex) {
+ LOGV2_ERROR(6663106,
+ "Failed to create coordinator instance",
+ "service"_attr = kSetChangeStreamStateCoordinatorServiceName,
+ "reason"_attr = ex);
+ throw;
+ }
+ }();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/set_change_stream_state_coordinator.h b/src/mongo/db/set_change_stream_state_coordinator.h
new file mode 100644
index 00000000000..aaa4d4305d6
--- /dev/null
+++ b/src/mongo/db/set_change_stream_state_coordinator.h
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/primary_only_service_util.h"
+#include "mongo/db/set_change_stream_state_coordinator_gen.h"
+
+namespace mongo {
+
+/**
+ * A 'PrimaryOnlyService::Instance' that runs in the replica sets and orchestrates the change stream
+ * requests in the serverless.
+ *
+ * At any time only one request is accepted, any subsequent request will be rejected by this
+ * instance type.
+ */
+class SetChangeStreamStateCoordinator : public DefaultPrimaryOnlyServiceInstance {
+public:
+ explicit SetChangeStreamStateCoordinator(const BSONObj& stateDoc);
+
+ const StringData getInstanceName() final;
+
+ boost::optional<BSONObj> reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final;
+
+ void checkIfOptionsConflict(const BSONObj& stateDoc) const final;
+
+private:
+ ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept;
+
+ void _removeStateDocument(OperationContext* opCtx) final;
+
+ // The state document of the 'PrimaryOnlyService'.
+ SetChangeStreamStateCoordinatorDocument _stateDoc;
+
+ // Stores the state document durably in the namespace 'config.change_stream_coordinator'.
+ PrimaryOnlyServiceStateStore<SetChangeStreamStateCoordinatorDocument> _stateDocStore;
+};
+
+/**
+ * A 'PrimaryOnlyService' that creates and gets an instance of the
+ * 'SetChangeStreamStateCoordinator'.
+ */
+class SetChangeStreamStateCoordinatorService final : public repl::PrimaryOnlyService {
+public:
+ explicit SetChangeStreamStateCoordinatorService(ServiceContext* serviceContext)
+ : repl::PrimaryOnlyService(serviceContext) {}
+
+ ~SetChangeStreamStateCoordinatorService() = default;
+
+ StringData getServiceName() const final;
+
+ NamespaceString getStateDocumentsNS() const final;
+
+ ThreadPool::Limits getThreadPoolLimits() const final;
+
+ std::shared_ptr<SetChangeStreamStateCoordinator> getOrCreateInstance(OperationContext* opCtx,
+ BSONObj coorDoc);
+
+ static SetChangeStreamStateCoordinatorService* getService(OperationContext* opCtx);
+
+ void checkIfConflictsWithOtherInstances(
+ OperationContext* opCtx,
+ BSONObj initialState,
+ const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) final{};
+
+ std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) final;
+};
+} // namespace mongo
diff --git a/src/mongo/db/set_change_stream_state_coordinator.idl b/src/mongo/db/set_change_stream_state_coordinator.idl
new file mode 100644
index 00000000000..b3861888cdc
--- /dev/null
+++ b/src/mongo/db/set_change_stream_state_coordinator.idl
@@ -0,0 +1,61 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/db/logical_session_id.idl"
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ SetChangeStreamStateCoordinatorId:
+ description: "A specification that uniquely identifies each state document of the
+ 'SetChangeStreamStateCoordinatorService'."
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ tenantId:
+ description: "Tenant id associated with the state document."
+ type: tenant_id
+ optional: true
+
+ SetChangeStreamStateCoordinatorDocument:
+ description: "A specification that defines the schema of the
+ 'SetChangeStreamStateCoordinatorService' state document."
+ generate_comparison_operators: false
+ strict: false
+ fields:
+ _id:
+ description: "Defines the schema of the '_id' field of the state document."
+ cpp_name: id
+ type: SetChangeStreamStateCoordinatorId
+ command:
+ description: "The command to be executed by the 'SetChangeStreamStateCoordinator'"
+ type: object_owned \ No newline at end of file
diff --git a/src/mongo/platform/mutex.h b/src/mongo/platform/mutex.h
index 4a094048ebe..ed9ddab42ad 100644
--- a/src/mongo/platform/mutex.h
+++ b/src/mongo/platform/mutex.h
@@ -381,7 +381,7 @@ using Mutex = stdx::mutex; // NOLINT
* Construct a mongo::Mutex using the result of MONGO_GET_LATCH_DATA with all arguments forwarded
*/
#ifndef MONGO_CONFIG_USE_RAW_LATCHES
-#define MONGO_MAKE_LATCH(...) ::mongo::Mutex(MONGO_GET_LATCH_DATA(__VA_ARGS__));
+#define MONGO_MAKE_LATCH(...) ::mongo::Mutex(MONGO_GET_LATCH_DATA(__VA_ARGS__))
#else
-#define MONGO_MAKE_LATCH(...) ::mongo::stdx::mutex(); // NOLINT
+#define MONGO_MAKE_LATCH(...) ::mongo::stdx::mutex() // NOLINT
#endif