summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/shard_server_op_observer.cpp
diff options
context:
space:
mode:
authorSergi Mateo Bellido <sergi.mateo-bellido@mongodb.com>2021-03-23 16:34:28 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-25 16:16:34 +0000
commit22cd5303336353f6951ff80f9d454b98bb7485c1 (patch)
treeaddde9d305788a07ab0fd769b52f3884e2169c64 /src/mongo/db/s/shard_server_op_observer.cpp
parentc54afe747a045fb20afe97d4d8012964fddddaba (diff)
downloadmongo-22cd5303336353f6951ff80f9d454b98bb7485c1.tar.gz
SERVER-55148 Extend DDL Utils that acq/rel the collection critical
section to add/rm a doc on config.collectionCriticalSections
Diffstat (limited to 'src/mongo/db/s/shard_server_op_observer.cpp')
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp55
1 files changed, 55 insertions, 0 deletions
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 9bee1efa916..9e8bf42a6b5 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/s/chunk_split_state_driver.h"
#include "mongo/db/s/chunk_splitter.h"
+#include "mongo/db/s/collection_critical_section_document_gen.h"
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/migration_util.h"
@@ -271,6 +272,20 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
}
}
+ if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ShardServerOpObserver"), insertedDoc);
+ if (isStandaloneOrPrimary(opCtx)) {
+ opCtx->recoveryUnit()->onCommit([opCtx, insertedNss = collCSDoc.getNss()](
+ boost::optional<Timestamp>) {
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, insertedNss);
+ auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
+ csr->enterCriticalSectionCatchUpPhase(csrLock);
+ });
+ }
+ }
+
if (metadata && metadata->isSharded()) {
incrementChunkOnInsertOrUpdate(opCtx,
nss,
@@ -385,6 +400,28 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
}
+ if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
+
+ const auto collCSDoc = CollectionCriticalSectionDocument::parse(
+ IDLParserErrorContext("ShardServerOpObserver"), args.updateArgs.updatedDoc);
+
+ const auto& updatedNss = collCSDoc.getNss();
+
+ if (isStandaloneOrPrimary(opCtx)) {
+ opCtx->recoveryUnit()->onCommit([opCtx, updatedNss](boost::optional<Timestamp>) {
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, updatedNss);
+ auto csrLock = CollectionShardingRuntime ::CSRLock::lockExclusive(opCtx, csr);
+ csr->enterCriticalSectionCommitPhase(csrLock);
+ });
+ } else {
+ // Force subsequent uses of the namespace to refresh the filtering metadata so they
+ // can synchronize with any work happening on the primary (e.g., migration critical
+ // section).
+ CollectionShardingRuntime::get(opCtx, updatedNss)->clearFilteringMetadata(opCtx);
+ }
+ }
+
auto* const csr = CollectionShardingRuntime::get(opCtx, args.nss);
const auto metadata = csr->getCurrentMetadataIfKnown();
if (metadata && metadata->isSharded()) {
@@ -450,6 +487,24 @@ void ShardServerOpObserver::onDelete(OperationContext* opCtx,
}
}
}
+
+ if (nss == NamespaceString::kCollectionCriticalSectionsNamespace) {
+ if (isStandaloneOrPrimary(opCtx)) {
+ const auto deletedNss([&] {
+ std::string coll;
+ fassert(5514801,
+ bsonExtractStringField(
+ documentId, CollectionCriticalSectionDocument::kNssFieldName, &coll));
+ return NamespaceString(coll);
+ }());
+ opCtx->recoveryUnit()->onCommit([opCtx, deletedNss](boost::optional<Timestamp>) {
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ auto* const csr = CollectionShardingRuntime::get(opCtx, deletedNss);
+ // The CSRLock is taken in exclusive mode by exitCriticalSection
+ csr->exitCriticalSection(opCtx);
+ });
+ }
+ }
}
void ShardServerOpObserver::onCreateCollection(OperationContext* opCtx,