diff options
Diffstat (limited to 'src/mongo/db/set_change_stream_state_coordinator.cpp')
-rw-r--r-- | src/mongo/db/set_change_stream_state_coordinator.cpp | 50 |
1 files changed, 40 insertions, 10 deletions
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp index a0ee72fac51..9191769214c 100644 --- a/src/mongo/db/set_change_stream_state_coordinator.cpp +++ b/src/mongo/db/set_change_stream_state_coordinator.cpp @@ -34,6 +34,8 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/op_observer/op_observer.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/logv2/log.h" @@ -77,15 +79,15 @@ public: const auto setChangeStreamParameter = ChangeStreamStateParameters::parse( IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand()); - invariant(_stateDoc.getId().getTenantId()); - - // TODO SERVER-65950 replace 'tenantId' with the provided tenant id. - auto tenantId = boost::none; + // A tenant's change collection and the pre-images collection are always associated with a + // tenant id. + const auto tenantId = _stateDoc.getId().getTenantId(); + tassert(6664100, "Tenant id is missing", tenantId); if (setChangeStreamParameter.getEnabled()) { - _enableChangeStream(opCtx, tenantId); + _enableChangeStream(opCtx, *tenantId); } else { - _disableChangeStream(opCtx, tenantId); + _disableChangeStream(opCtx, *tenantId); } } @@ -94,11 +96,38 @@ private: * Enables the change stream in the serverless by creating the change collection and the * pre-image collection. */ - void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.createChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for + // the change stream to advance. As such artifically create any oplog entry such that it + // will be captured by the change collection. With SERVER-66643, the pre-images collection + // 'create' oplog entry will be auto captured by the change collection and hence writing + // this entry will not be required. Also remove the necessary header and linked library + // after removing this code. + [&]() { + writeConflictRetry( + opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] { + Lock::GlobalLock lock(opCtx, MODE_IX); + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + boost::none, + BSON("msg" + << "enable change stream"), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + }); + }(); + + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none); // Wait until the create requests are majority committed. waitForMajority(opCtx); @@ -108,11 +137,12 @@ private: * Disables the change stream in the serverless by dropping the change collection and the * pre-image collection. */ - void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) { + void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none); // Wait until the drop requests are majority committed. waitForMajority(opCtx); |