diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-09-15 10:27:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-15 11:29:18 +0000 |
commit | e6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch) | |
tree | 27410d5d07867ef6be3026cb69a9a9821e03e254 /src/mongo/db/set_change_stream_state_coordinator.cpp | |
parent | 0797ff28efcd7cb954b88658425b7b38c980b605 (diff) | |
download | mongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz |
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'src/mongo/db/set_change_stream_state_coordinator.cpp')
-rw-r--r-- | src/mongo/db/set_change_stream_state_coordinator.cpp | 50 |
1 files changed, 40 insertions, 10 deletions
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp index a0ee72fac51..9191769214c 100644 --- a/src/mongo/db/set_change_stream_state_coordinator.cpp +++ b/src/mongo/db/set_change_stream_state_coordinator.cpp @@ -34,6 +34,8 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/op_observer/op_observer.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/logv2/log.h" @@ -77,15 +79,15 @@ public: const auto setChangeStreamParameter = ChangeStreamStateParameters::parse( IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand()); - invariant(_stateDoc.getId().getTenantId()); - - // TODO SERVER-65950 replace 'tenantId' with the provided tenant id. - auto tenantId = boost::none; + // A tenant's change collection and the pre-images collection are always associated with a + // tenant id. + const auto tenantId = _stateDoc.getId().getTenantId(); + tassert(6664100, "Tenant id is missing", tenantId); if (setChangeStreamParameter.getEnabled()) { - _enableChangeStream(opCtx, tenantId); + _enableChangeStream(opCtx, *tenantId); } else { - _disableChangeStream(opCtx, tenantId); + _disableChangeStream(opCtx, *tenantId); } } @@ -94,11 +96,38 @@ private: * Enables the change stream in the serverless by creating the change collection and the * pre-image collection. */ - void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.createChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for + // the change stream to advance. As such artifically create any oplog entry such that it + // will be captured by the change collection. With SERVER-66643, the pre-images collection + // 'create' oplog entry will be auto captured by the change collection and hence writing + // this entry will not be required. Also remove the necessary header and linked library + // after removing this code. + [&]() { + writeConflictRetry( + opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] { + Lock::GlobalLock lock(opCtx, MODE_IX); + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + boost::none, + BSON("msg" + << "enable change stream"), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + }); + }(); + + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none); // Wait until the create requests are majority committed. waitForMajority(opCtx); @@ -108,11 +137,12 @@ private: * Disables the change stream in the serverless by dropping the change collection and the * pre-image collection. */ - void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none); // Wait until the drop requests are majority committed. waitForMajority(opCtx); |