diff options
author | Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com> | 2021-03-23 16:34:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-25 16:16:34 +0000 |
commit | 22cd5303336353f6951ff80f9d454b98bb7485c1 (patch) | |
tree | addde9d305788a07ab0fd769b52f3884e2169c64 /src/mongo/db/s/shard_server_op_observer.cpp | |
parent | c54afe747a045fb20afe97d4d8012964fddddaba (diff) | |
download | mongo-22cd5303336353f6951ff80f9d454b98bb7485c1.tar.gz |
SERVER-55148 Extend DDL Utils that acq/rel the collection critical
section to add/rm a doc on config.collectionCriticalSections
Diffstat (limited to 'src/mongo/db/s/shard_server_op_observer.cpp')
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 9bee1efa916..9e8bf42a6b5 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -38,6 +38,7 @@ #include "mongo/db/op_observer_impl.h" #include "mongo/db/s/chunk_split_state_driver.h" #include "mongo/db/s/chunk_splitter.h" +#include "mongo/db/s/collection_critical_section_document_gen.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/migration_util.h" @@ -271,6 +272,20 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, } } + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), insertedDoc); + if (isStandaloneOrPrimary(opCtx)) { + opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = collCSDoc.getNss()]( + boost::optional<Timestamp>) { + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss); + auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); + csr->enterCriticalSectionCatchUpPhase(csrLock); + }); + } + } + if (metadata && metadata->isSharded()) { incrementChunkOnInsertOrUpdate(opCtx, nss, @@ -385,6 +400,28 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } } + if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace) { + + const auto collCSDoc = CollectionCriticalSectionDocument::parse( + IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc); + + const auto& updatedNss = collCSDoc.getNss(); + + if (isStandaloneOrPrimary(opCtx)) { + opCtx->recoveryUnit()->onCommit([opCtx, updatedNss](boost::optional<Timestamp>) { + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss); + auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr); + csr->enterCriticalSectionCommitPhase(csrLock); + }); + } else { + // Force subsequent uses of the namespace to refresh the filtering metadata so they + // can synchronize with any work happening on the primary (e.g., migration critical + // section). + CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx); + } + } + auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss); const auto metadata = csr->getCurrentMetadataIfKnown(); if (metadata && metadata->isSharded()) { @@ -450,6 +487,24 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx, } } } + + if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) { + if (isStandaloneOrPrimary(opCtx)) { + const auto deletedNss([&] { + std::string coll; + fassert(5514801, + bsonExtractStringField( + documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll)); + return NamespaceString(coll); + }()); + opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) { + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss); + // The CSRLock is taken in exclusive mode by exitCriticalSection + csr->exitCriticalSection(opCtx); + }); + } + } } void ShardServerOpObserver::onCreateCollection(OperationContext* opCtx, |