summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2019-02-11 13:37:53 -0500
committerBlake Oler <blake.oler@mongodb.com>2019-02-21 10:26:13 -0500
commit5cbcc18f3c1f100deb7124d3d665901e473134b1 (patch)
tree4c9653b45ca40afecf6990d9b6fc6101b27ba7bb /src/mongo/db
parent5bd904dff90a0e6332d6d4630053141e6617c5de (diff)
downloadmongo-5cbcc18f3c1f100deb7124d3d665901e473134b1.tar.gz
SERVER-38828 Introduce CollectionShardingRuntimeLock usage to collection critical sections.
Allows us to reduce MODE_X collection locks to MODE_IX when under UninterruptibleLockGuards
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.cpp18
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.h9
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp21
-rw-r--r--src/mongo/db/s/collection_sharding_state.h31
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp59
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h47
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp2
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp13
8 files changed, 164 insertions, 36 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 9836cb116b7..38999e3d7f3 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -78,7 +78,6 @@ CollectionShardingRuntime::CollectionShardingRuntime(ServiceContext* sc,
NamespaceString nss,
executor::TaskExecutor* rangeDeleterExecutor)
: CollectionShardingState(nss),
- _stateChangeMutex(nss.toString()),
_nss(std::move(nss)),
_metadataManager(std::make_shared<MetadataManager>(sc, _nss, rangeDeleterExecutor)) {
if (isNamespaceAlwaysUnsharded(_nss)) {
@@ -193,13 +192,19 @@ CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, Na
AutoGetCollection::ViewMode::kViewsForbidden,
opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
- CollectionShardingState::get(opCtx, _nss)->enterCriticalSectionCatchUpPhase(_opCtx);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss);
+ auto csrLock = CollectionShardingState::CSRLock::lockExclusive(_opCtx, csr);
+
+ csr->enterCriticalSectionCatchUpPhase(_opCtx, csrLock);
}
CollectionCriticalSection::~CollectionCriticalSection() {
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _nss, MODE_IX, MODE_X);
- CollectionShardingState::get(_opCtx, _nss)->exitCriticalSection(_opCtx);
+ AutoGetCollection autoColl(_opCtx, _nss, MODE_IX, MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+
+ csr->exitCriticalSection(_opCtx, csrLock);
}
void CollectionCriticalSection::enterCommitPhase() {
@@ -210,7 +215,10 @@ void CollectionCriticalSection::enterCommitPhase() {
AutoGetCollection::ViewMode::kViewsForbidden,
_opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
- CollectionShardingState::get(_opCtx, _nss)->enterCriticalSectionCommitPhase(_opCtx);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, _nss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+
+ csr->enterCriticalSectionCommitPhase(_opCtx, csrLock);
}
} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index e90048a8630..3c14a46db31 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -34,7 +34,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/metadata_manager.h"
-#include "mongo/db/s/sharding_state_lock.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/variant.h"
#include "mongo/util/decorable.h"
@@ -52,8 +51,6 @@ class CollectionShardingRuntime final : public CollectionShardingState,
MONGO_DISALLOW_COPYING(CollectionShardingRuntime);
public:
- using CSRLock = ShardingStateLock<CollectionShardingRuntime>;
-
CollectionShardingRuntime(ServiceContext* sc,
NamespaceString nss,
executor::TaskExecutor* rangeDeleterExecutor);
@@ -149,15 +146,9 @@ public:
}
private:
- friend CSRLock;
-
friend boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange(
OperationContext*, NamespaceString const&, OID const&, int, CollectionRangeDeleter*);
- // Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects
- // held within.
- Lock::ResourceMutex _stateChangeMutex;
-
// Namespace this state belongs to.
const NamespaceString _nss;
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index fd5b0f943bd..980c7116469 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -134,7 +134,8 @@ ChunkVersion getOperationReceivedVersion(OperationContext* opCtx, const Namespac
} // namespace
-CollectionShardingState::CollectionShardingState(NamespaceString nss) : _nss(std::move(nss)) {}
+CollectionShardingState::CollectionShardingState(NamespaceString nss)
+ : _stateChangeMutex(nss.toString()), _nss(std::move(nss)) {}
CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx,
const NamespaceString& nss) {
@@ -206,9 +207,13 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
const auto wantedShardVersion =
metadata->isSharded() ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
- auto criticalSectionSignal = _critSec.getSignal(opCtx->lockState()->isWriteLocked()
- ? ShardingMigrationCriticalSection::kWrite
- : ShardingMigrationCriticalSection::kRead);
+ auto criticalSectionSignal = [&] {
+ auto csrLock = CSRLock::lock(opCtx, this);
+ return _critSec.getSignal(opCtx->lockState()->isWriteLocked()
+ ? ShardingMigrationCriticalSection::kWrite
+ : ShardingMigrationCriticalSection::kRead);
+ }();
+
if (criticalSectionSignal) {
// Set migration critical section on operation sharding state: operation will wait for the
// migration to finish before returning failure and retrying.
@@ -257,18 +262,18 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
MONGO_UNREACHABLE;
}
-void CollectionShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx) {
+void CollectionShardingState::enterCriticalSectionCatchUpPhase(OperationContext* opCtx, CSRLock&) {
invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
_critSec.enterCriticalSectionCatchUpPhase();
}
-void CollectionShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx) {
+void CollectionShardingState::enterCriticalSectionCommitPhase(OperationContext* opCtx, CSRLock&) {
invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
_critSec.enterCriticalSectionCommitPhase();
}
-void CollectionShardingState::exitCriticalSection(OperationContext* opCtx) {
- invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
+void CollectionShardingState::exitCriticalSection(OperationContext* opCtx, CSRLock&) {
+ invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
_critSec.exitCriticalSection();
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5c0f714b6d5..1854fe51310 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -34,6 +34,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/scoped_collection_metadata.h"
#include "mongo/db/s/sharding_migration_critical_section.h"
+#include "mongo/db/s/sharding_state_lock.h"
namespace mongo {
@@ -55,6 +56,8 @@ class CollectionShardingState {
MONGO_DISALLOW_COPYING(CollectionShardingState);
public:
+ using CSRLock = ShardingStateLock<CollectionShardingState>;
+
virtual ~CollectionShardingState() = default;
/**
@@ -103,12 +106,24 @@ public:
void checkShardVersionOrThrow(OperationContext* opCtx);
/**
- * Methods to control the collection's critical section. Must be called with the collection X
- * lock held.
+ * Methods to control the collection's critical section. Methods listed below must be called
+ * with both the collection lock and CollectionShardingRuntimeLock held in exclusive mode.
+ *
+ * In these methods, the CollectionShardingRuntimeLock ensures concurrent access to the
+ * critical section.
*/
- void enterCriticalSectionCatchUpPhase(OperationContext* opCtx);
- void enterCriticalSectionCommitPhase(OperationContext* opCtx);
- void exitCriticalSection(OperationContext* opCtx);
+ void enterCriticalSectionCatchUpPhase(OperationContext* opCtx, CSRLock&);
+ void enterCriticalSectionCommitPhase(OperationContext* opCtx, CSRLock&);
+
+
+ /**
+ * Method to control the collection's critical secion. Method listed below must be called with
+ * the collection lock in IX mode and the CollectionShardingRuntimeLock in exclusive mode.
+ *
+ * In this method, the CollectionShardingRuntimeLock ensures concurrent access to the
+ * critical section.
+ */
+ void exitCriticalSection(OperationContext* opCtx, CSRLock&);
/**
* If the collection is currently in a critical section, returns the critical section signal to
@@ -122,6 +137,12 @@ protected:
CollectionShardingState(NamespaceString nss);
private:
+ friend CSRLock;
+
+ // Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects
+ // held within. Use only the CollectionShardingRuntimeLock to lock this mutex.
+ Lock::ResourceMutex _stateChangeMutex;
+
// Namespace this state belongs to.
const NamespaceString _nss;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index e8bf019d4e5..9df65518993 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -151,9 +151,13 @@ public:
sessionSource->notifyNewWriteOpTime(_opTime);
}
}
+
+ _cloner->_decrementOutstandingCommitHandlers();
}
- void rollback() override {}
+ void rollback() override {
+ _cloner->_decrementOutstandingCommitHandlers();
+ }
private:
MigrationChunkClonerSourceLegacy* const _cloner;
@@ -376,6 +380,10 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
+ if (!_successfullyAddedOperationToOutstandingCommitHandlers()) {
+ return;
+ }
+
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'i', opTime, {}));
@@ -402,6 +410,10 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
+ if (!_successfullyAddedOperationToOutstandingCommitHandlers()) {
+ return;
+ }
+
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'u', opTime, prePostImageOpTime));
@@ -424,6 +436,10 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
+ if (!_successfullyAddedOperationToOutstandingCommitHandlers()) {
+ return;
+ }
+
if (opCtx->getTxnNumber()) {
opCtx->recoveryUnit()->registerChange(
new LogOpForShardingHandler(this, idElement.wrap(), 'd', opTime, preImageOpTime));
@@ -433,6 +449,42 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
}
}
+bool MigrationChunkClonerSourceLegacy::_successfullyAddedOperationToOutstandingCommitHandlers() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (!_acceptingIncomingCommitHandlers) {
+ return false;
+ }
+
+ _incrementOutstandingCommitHandlers(lk);
+ return true;
+}
+
+void MigrationChunkClonerSourceLegacy::_drainAllOutstandingCommitHandlers(
+ stdx::unique_lock<stdx::mutex>& lk) {
+ invariant(_state == kDone);
+ _acceptingIncomingCommitHandlers = false;
+
+ if (_outstandingCommitHandlers == 0) {
+ return;
+ }
+
+ _allCommitHandlersDrained.wait(lk);
+}
+
+
+void MigrationChunkClonerSourceLegacy::_incrementOutstandingCommitHandlers(WithLock) {
+ invariant(_acceptingIncomingCommitHandlers);
+ ++_outstandingCommitHandlers;
+}
+
+void MigrationChunkClonerSourceLegacy::_decrementOutstandingCommitHandlers() {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ --_outstandingCommitHandlers;
+ if (_outstandingCommitHandlers == 0) {
+ _allCommitHandlersDrained.notify_all();
+ }
+}
+
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -500,8 +552,11 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
}
void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
_state = kDone;
+
+ _drainAllOutstandingCommitHandlers(lk);
+
_reload.clear();
_deleted.clear();
}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index dcd0dcfbc05..722d17fe734 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -41,6 +41,7 @@
#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/s/request_types/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
@@ -205,6 +206,42 @@ private:
long long* sizeAccumulator,
bool explode);
+ /**
+ * Adds an operation to the outstanding commit handlers. Returns false if the cloner is no
+ * longer accepting new commit handlers.
+ */
+ bool _successfullyAddedOperationToOutstandingCommitHandlers();
+
+ /**
+ * Called in the constructor of each RecoveryUnit::Change handler created to track a change to
+ * the collection being cloned. Increments a counter residing inside the
+ * MigrationChunkClonerSourceLegacy class.
+ *
+ * There should always be a one to one match from the number of calls to this function to the
+ * number of calls to the corresponding decrement* function.
+ *
+ * NOTE: This funtion invariants that we are currently accepting outstanding commit handlers.
+ * It is up to callers of this function to make sure that will always be the case.
+ */
+ void _incrementOutstandingCommitHandlers(WithLock);
+
+ /**
+ * Called at the end of the commit function of each RecoveryUnit::Change handler that tracks a
+ * change to the collection being cloned. Decrements a counter residing inside the
+ * MigrationChunkClonerSourceLegacy class.
+ *
+ * There should always be a one to one match from the number of calls to this function to the
+ * number of calls to the corresponding increment* function.
+ */
+ void _decrementOutstandingCommitHandlers();
+
+ /**
+ * Wait for all outstanding commit handlers to finish before returning from this function.
+ * Should only be used in the cleanup for this class. Should use a lock wrapped around this
+ * class's mutex.
+ */
+ void _drainAllOutstandingCommitHandlers(stdx::unique_lock<stdx::mutex>& lk);
+
// The original move chunk request
const MoveChunkRequest _args;
@@ -235,6 +272,16 @@ private:
// pre-allocation (initial clone).
uint64_t _averageObjectSizeForCloneLocs{0};
+ // Represents all of the created and unresolved RecoveryUnit::Change handlers used to track
+ // changes to the collection being cloned.
+ uint64_t _outstandingCommitHandlers{0};
+
+ // Signals to any waiters once all unresolved RecoveryUnit::Change handlers have completed.
+ stdx::condition_variable _allCommitHandlersDrained;
+
+ // Indicates whether new RecoveryUnit::Change handlers are accepted.
+ bool _acceptingIncomingCommitHandlers{true};
+
// List of _id of documents that were modified that must be re-cloned (xfer mods)
std::list<BSONObj> _reload;
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index fac50619bc8..a749eb45d9d 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1201,7 +1201,7 @@ void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkR
}
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_IX);
auto* const css = CollectionShardingRuntime::get(opCtx, _nss);
const auto optMetadata = css->getCurrentMetadataIfKnown();
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 8ab95fe0276..cfce57bc0c2 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -448,7 +448,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// this node can accept writes for this collection as a proxy for it being primary.
if (!status.isOK()) {
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_IX);
if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) {
CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata();
uassertStatusOK(status.withContext(
@@ -484,7 +484,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
if (!refreshStatus.isOK()) {
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_IX);
CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata();
@@ -683,18 +683,19 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
auto cloneDriver = [&]() {
// Unregister from the collection's sharding state and exit the migration critical section.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
- auto* const css = CollectionShardingRuntime::get(opCtx, getNss());
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(opCtx, getNss());
+ auto csrLock = CollectionShardingState::CSRLock::lockExclusive(opCtx, csr);
// In the kCreated state there should be no state to clean up, but we can verify this
// just to be safe.
if (_state == kCreated) {
// Verify that we did not set the MSM on the CSR.
- invariant(!msmForCsr(css));
+ invariant(!msmForCsr(csr));
// Verify that the clone driver was not initialized.
invariant(!_cloneDriver);
} else {
- auto oldMsmOnCsr = std::exchange(msmForCsr(css), nullptr);
+ auto oldMsmOnCsr = std::exchange(msmForCsr(csr), nullptr);
invariant(this == oldMsmOnCsr);
}
_critSec.reset();