From 6d2e673bd7c69aa0de24ba3ce1ac3aa1c71343be Mon Sep 17 00:00:00 2001 From: Sergi Mateo Bellido Date: Wed, 12 May 2021 12:29:17 +0000 Subject: SERVER-56788 Extend the in-memory collection critical section to keep track of the ongoing operation --- src/mongo/db/s/shard_server_op_observer.cpp | 59 +++++++++++++++-------------- 1 file changed, 31 insertions(+), 28 deletions(-) (limited to 'src/mongo/db/s/shard_server_op_observer.cpp') diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index c1e0e01da88..0c84dabb36a 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -280,9 +280,9 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, !replCoord->getMemberState().rollback())) { const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); - - - opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = collCSDoc.getNss()]( + opCtx->recoveryUnit()->onCommit([opCtx, + insertedNss = collCSDoc.getNss(), + reason = collCSDoc.getReason().getOwned()]( boost::optional) { boost::optional lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) @@ -291,7 +291,7 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCatchUpPhase(csrLock); + csr->enterCriticalSectionCatchUpPhase(csrLock, reason); }); } } @@ -415,12 +415,12 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE if (!replCoord->isReplEnabled() || (!replCoord->getMemberState().recovering() && !replCoord->getMemberState().rollback())) { - const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc); opCtx->recoveryUnit()->onCommit( - [opCtx, updatedNss = collCSDoc.getNss()](boost::optional) { + [opCtx, updatedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional) { boost::optional lockCollectionIfNotPrimary; if (!isStandaloneOrPrimary(opCtx)) lockCollectionIfNotPrimary.emplace(opCtx, updatedNss, MODE_IX); @@ -428,7 +428,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); - csr->enterCriticalSectionCommitPhase(csrLock); + csr->enterCriticalSectionCommitPhase(csrLock, reason); }); } } @@ -448,9 +448,14 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE void ShardServerOpObserver::aboutToDelete(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& doc) { - // Extract the _id field from the document. If it does not have an _id, use the - // document itself as the _id. - documentIdDecoration(opCtx) = doc["_id"] ? doc["_id"].wrap() : doc; + + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { + documentIdDecoration(opCtx) = doc; + } else { + // Extract the _id field from the document. If it does not have an _id, use the + // document itself as the _id. + documentIdDecoration(opCtx) = doc["_id"] ? doc["_id"].wrap() : doc; + } } void ShardServerOpObserver::onDelete(OperationContext* opCtx, @@ -503,24 +508,22 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, if (!replCoord->isReplEnabled() || (!replCoord->getMemberState().recovering() && !replCoord->getMemberState().rollback())) { - const auto deletedNss([&] { - std::string coll; - fassert(5514801, - bsonExtractStringField( - documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll)); - return NamespaceString(coll); - }()); - - opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional) { - boost::optional lockCollectionIfNotPrimary; - if (!isStandaloneOrPrimary(opCtx)) - lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); - - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); - csr->exitCriticalSection(csrLock); - }); + const auto& deletedDoc = documentId; + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), deletedDoc); + + opCtx->recoveryUnit()->onCommit( + [opCtx, deletedNss = collCSDoc.getNss(), reason = collCSDoc.getReason().getOwned()]( + boost::optional) { + boost::optional lockCollectionIfNotPrimary; + if (!isStandaloneOrPrimary(opCtx)) + lockCollectionIfNotPrimary.emplace(opCtx, deletedNss, MODE_IX); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + csr->exitCriticalSection(csrLock, reason); + }); } } } -- cgit v1.2.1