summaryrefslogtreecommitdiff
path: root/src/mongo/db/set_change_stream_state_coordinator.cpp
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-09-15 10:27:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 11:29:18 +0000
commite6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch)
tree27410d5d07867ef6be3026cb69a9a9821e03e254 /src/mongo/db/set_change_stream_state_coordinator.cpp
parent0797ff28efcd7cb954b88658425b7b38c980b605 (diff)
downloadmongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'src/mongo/db/set_change_stream_state_coordinator.cpp')
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.cpp50
1 files changed, 40 insertions, 10 deletions
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp
index a0ee72fac51..9191769214c 100644
--- a/src/mongo/db/set_change_stream_state_coordinator.cpp
+++ b/src/mongo/db/set_change_stream_state_coordinator.cpp
@@ -34,6 +34,8 @@
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/change_stream_state_gen.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/logv2/log.h"
@@ -77,15 +79,15 @@ public:
const auto setChangeStreamParameter = ChangeStreamStateParameters::parse(
IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand());
- invariant(_stateDoc.getId().getTenantId());
-
- // TODO SERVER-65950 replace 'tenantId' with the provided tenant id.
- auto tenantId = boost::none;
+ // A tenant's change collection and the pre-images collection are always associated with a
+ // tenant id.
+ const auto tenantId = _stateDoc.getId().getTenantId();
+ tassert(6664100, "Tenant id is missing", tenantId);
if (setChangeStreamParameter.getEnabled()) {
- _enableChangeStream(opCtx, tenantId);
+ _enableChangeStream(opCtx, *tenantId);
} else {
- _disableChangeStream(opCtx, tenantId);
+ _disableChangeStream(opCtx, *tenantId);
}
}
@@ -94,11 +96,38 @@ private:
* Enables the change stream in the serverless by creating the change collection and the
* pre-image collection.
*/
- void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for
+ // the change stream to advance. As such artifically create any oplog entry such that it
+ // will be captured by the change collection. With SERVER-66643, the pre-images collection
+ // 'create' oplog entry will be auto captured by the change collection and hence writing
+ // this entry will not be required. Also remove the necessary header and linked library
+ // after removing this code.
+ [&]() {
+ writeConflictRetry(
+ opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ Lock::GlobalLock lock(opCtx, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx,
+ NamespaceString::makeChangeCollectionNSS(tenantId),
+ boost::none,
+ BSON("msg"
+ << "enable change stream"),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ });
+ }();
+
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none);
// Wait until the create requests are majority committed.
waitForMajority(opCtx);
@@ -108,11 +137,12 @@ private:
* Disables the change stream in the serverless by dropping the change collection and the
* pre-image collection.
*/
- void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none);
// Wait until the drop requests are majority committed.
waitForMajority(opCtx);