author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2021-06-03 06:34:00 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-06-14 13:05:32 +0000
commit     3a42f34b634eb9e5b0ccde551f6d8439ed1700c2 (patch)
tree       35eea4d7c25a4ddb3867196895cde2aebdbb3f45
parent     6cd2d27dbcd1522da59b61b5aea0378f72371c7d (diff)
download   mongo-3a42f34b634eb9e5b0ccde551f6d8439ed1700c2.tar.gz
SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster
(cherry picked from commit d0c6ab09ee3c2726d92b1044577f2c5ebd22b52c)
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp  |  99
-rw-r--r--  src/mongo/db/s/active_migrations_registry.h    |  88
-rw-r--r--  src/mongo/db/s/merge_chunks_command.cpp        |  12
3 files changed, 154 insertions(+), 45 deletions(-)
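
Overview: the patch drops the config-server distributed lock that mergeChunks used to take and instead registers the operation in the shard-local ActiveMigrationsRegistry. The caller-side pattern, as it appears in the merge_chunks_command.cpp hunk at the end of this diff (opCtx, nss, minKey and maxKey are parameters of mergeChunks):

    // Registration waits until no conflicting migration or split/merge is
    // active for this namespace, then holds the slot until the scoped
    // object is destroyed.
    auto scopedSplitOrMergeChunk(
        uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(
            opCtx, nss, ChunkRange(minKey, maxKey))));

When scopedSplitOrMergeChunk goes out of scope, its destructor unregisters the namespace and wakes any waiters on the registry's condition variable.
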
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index d58bcfe9805..70cab089b64 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -64,36 +64,36 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
stdx::unique_lock<Latch> lock(_mutex);
- // This wait is to hold back additional lock requests while there is already one in
- // progress.
- opCtx->waitForConditionOrInterrupt(_lockCond, lock, [this] { return !_migrationsBlocked; });
+ // This wait is to hold back additional lock requests while there is already one in progress
+ opCtx->waitForConditionOrInterrupt(
+ _chunkOperationsStateChangedCV, lock, [this] { return !_migrationsBlocked; });
// Setting flag before condvar returns to block new migrations from starting. (Favoring writers)
LOGV2(4675601, "Going to start blocking migrations", "reason"_attr = reason);
_migrationsBlocked = true;
- // Wait for any ongoing migrations to complete.
- opCtx->waitForConditionOrInterrupt(
- _lockCond, lock, [this] { return !(_activeMoveChunkState || _activeReceiveChunkState); });
+ // Wait for any ongoing chunk modifications to complete
+ opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, lock, [this] {
+ return !(_activeMoveChunkState || _activeReceiveChunkState);
+ });
}
void ActiveMigrationsRegistry::unlock(StringData reason) {
stdx::lock_guard<Latch> lock(_mutex);
LOGV2(4675602, "Going to stop blocking migrations", "reason"_attr = reason);
_migrationsBlocked = false;
- _lockCond.notify_all();
+ _chunkOperationsStateChangedCV.notify_all();
}
StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
OperationContext* opCtx, const MoveChunkRequest& args) {
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock<Latch> ul(_mutex);
- if (_migrationsBlocked) {
- LOGV2(4675603, "Register donate chunk waiting for migrations to be unblocked");
- opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
- }
+ opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
+ return !_migrationsBlocked && !_activeSplitMergeChunkStates.count(args.getNss());
+ });
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
@@ -117,12 +117,10 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
const NamespaceString& nss,
const ChunkRange& chunkRange,
const ShardId& fromShardId) {
- stdx::unique_lock<Latch> lk(_mutex);
+ stdx::unique_lock<Latch> ul(_mutex);
- if (_migrationsBlocked) {
- LOGV2(4675604, "Register receive chunk waiting for migrations to be unblocked");
- opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
- }
+ opCtx->waitForConditionOrInterrupt(
+ _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; });
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
@@ -137,6 +135,22 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
return {ScopedReceiveChunk(this)};
}
+StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk(
+ OperationContext* opCtx, const NamespaceString& nss, const ChunkRange& chunkRange) {
+ stdx::unique_lock<Latch> ul(_mutex);
+
+ opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
+ return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) &&
+ !_activeSplitMergeChunkStates.count(nss);
+ });
+
+ auto [it, inserted] =
+ _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange));
+ invariant(inserted);
+
+ return {ScopedSplitMergeChunk(this, nss)};
+}
+
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
stdx::lock_guard<Latch> lk(_mutex);
if (_activeMoveChunkState) {
@@ -178,20 +192,31 @@ void ActiveMigrationsRegistry::_clearDonateChunk() {
stdx::lock_guard<Latch> lk(_mutex);
invariant(_activeMoveChunkState);
_activeMoveChunkState.reset();
- _lockCond.notify_all();
+ _chunkOperationsStateChangedCV.notify_all();
}
void ActiveMigrationsRegistry::_clearReceiveChunk() {
stdx::lock_guard<Latch> lk(_mutex);
invariant(_activeReceiveChunkState);
+ LOGV2(5004703,
+ "clearReceiveChunk ",
+ "currentKeys"_attr = ChunkRange(_activeReceiveChunkState->range.getMin(),
+ _activeReceiveChunkState->range.getMax())
+ .toString());
_activeReceiveChunkState.reset();
- _lockCond.notify_all();
+ _chunkOperationsStateChangedCV.notify_all();
+}
+
+void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(_activeSplitMergeChunkStates.erase(nss));
+ _chunkOperationsStateChangedCV.notify_all();
}
Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const {
return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new migration because this shard is currently "
- "donating chunk "
+ str::stream() << "Unable to start new balancer operation because this shard is "
+ "currently donating chunk "
<< ChunkRange(args.getMinKey(), args.getMaxKey()).toString()
<< " for namespace " << args.getNss().ns() << " to "
<< args.getToShardId()};
@@ -199,8 +224,8 @@ Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() co
Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const {
return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new migration because this shard is currently "
- "receiving chunk "
+ str::stream() << "Unable to start new balancer operation because this shard is "
+ "currently receiving chunk "
<< range.toString() << " for namespace " << nss.ns() << " from "
<< fromShardId};
}
@@ -266,4 +291,28 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) {
return *this;
}
+ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry,
+ const NamespaceString& nss)
+ : _registry(registry), _nss(nss) {}
+
+ScopedSplitMergeChunk::~ScopedSplitMergeChunk() {
+ if (_registry) {
+ _registry->_clearSplitMergeChunk(_nss);
+ }
+}
+
+ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) {
+ *this = std::move(other);
+}
+
+ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) {
+ if (&other != this) {
+ _registry = other._registry;
+ other._registry = nullptr;
+ _nss = std::move(other._nss);
+ }
+
+ return *this;
+}
+
} // namespace mongo
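
Taken together, the .cpp changes above implement a small wait-then-register protocol: one mutex guards all state, one condition variable (_chunkOperationsStateChangedCV) is notified on every state transition, and a per-namespace map blocks conflicting registrations while its entry exists. A minimal self-contained sketch of that protocol follows; the names (SplitMergeRegistry, registerOp, clearOp) are invented for illustration, and plain std:: primitives stand in for MongoDB's Latch and the interruptible OperationContext waits:

    #include <condition_variable>
    #include <mutex>
    #include <string>
    #include <unordered_set>

    // Toy analogue of the registry's split/merge bookkeeping.
    class SplitMergeRegistry {
    public:
        // Blocks until no other split/merge is registered for 'nss', then takes
        // the slot. Mirrors the wait-then-emplace in registerSplitOrMergeChunk
        // (minus the same-namespace move-chunk check).
        void registerOp(const std::string& nss) {
            std::unique_lock<std::mutex> ul(_mutex);
            _stateChangedCV.wait(ul, [&] { return _active.count(nss) == 0; });
            _active.insert(nss);
        }

        // Releases the slot and wakes every waiter, mirroring
        // _clearSplitMergeChunk's erase + notify_all.
        void clearOp(const std::string& nss) {
            std::lock_guard<std::mutex> lg(_mutex);
            _active.erase(nss);
            _stateChangedCV.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _stateChangedCV;
        std::unordered_set<std::string> _active;  // namespaces with an active split/merge
    };

Because the wait predicate inspects only the entry for the requested namespace, operations on different namespaces never wait on each other; that is what allows splits/merges for different collections to proceed concurrently.
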
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 4a8bd2d48aa..f6a99185d4a 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -30,7 +30,6 @@
#pragma once
#include <boost/optional.hpp>
-#include <memory>
#include "mongo/db/s/migration_session_id.h"
#include "mongo/platform/mutex.h"
@@ -42,12 +41,19 @@ namespace mongo {
class OperationContext;
class ScopedDonateChunk;
class ScopedReceiveChunk;
-template <typename T>
-class StatusWith;
+class ScopedSplitMergeChunk;
/**
- * Thread-safe object that keeps track of the active migrations running on a node and limits them
- * to only one per shard. There is only one instance of this object per shard.
+ * This class is used to synchronise all the active routing info operations for chunks owned by this
+ * shard. There is only one instance of it per ServiceContext.
+ *
+ * It implements a non-fair lock manager, which provides the following guarantees:
+ *
+ * - Move || Move (same chunk): The second move will join the first
+ * - Move || Move (different chunks or collections): The second move will result in a
+ * ConflictingOperationInProgress error
+ * - Move || Split/Merge (same collection): The second operation will block behind the first
+ * - Move/Split/Merge || Split/Merge (for different collections): Can proceed concurrently
*/
class ActiveMigrationsRegistry {
ActiveMigrationsRegistry(const ActiveMigrationsRegistry&) = delete;
@@ -70,9 +76,9 @@ public:
void unlock(StringData reason);
/**
- * If there are no migrations running on this shard, registers an active migration with the
- * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the
- * caller before it goes out of scope.
+ * If there are no migrations or split/merges running on this shard, registers an active
+ * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled
+ * by the caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
* arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the
@@ -84,9 +90,10 @@ public:
const MoveChunkRequest& args);
/**
- * If there are no migrations running on this shard, registers an active receive operation with
- * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will
- * unregister the migration when the ScopedReceiveChunk goes out of scope.
+ * If there are no migrations or split/merges running on this shard, registers an active receive
+ * operation with the specified session id and returns a ScopedReceiveChunk. The
+ * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of
+ * scope.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
@@ -96,6 +103,15 @@ public:
const ShardId& fromShardId);
/**
+ * If there are no migrations running on this shard, registers an active split or merge
+ * operation for the specified namespace and returns a scoped object which will in turn disallow
+ * other migrations or splits/merges for the same namespace (but not for other namespaces).
+ */
+ StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkRange& chunkRange);
+
+ /**
* If a migration has been previously registered through a call to registerDonateChunk, returns
* that namespace. Otherwise returns boost::none.
*/
@@ -112,6 +128,7 @@ public:
private:
friend class ScopedDonateChunk;
friend class ScopedReceiveChunk;
+ friend class ScopedSplitMergeChunk;
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
@@ -150,6 +167,19 @@ private:
ShardId fromShardId;
};
+ // Describes the state of a currently active split or merge operation
+ struct ActiveSplitMergeChunkState {
+ ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange)
+ : nss(std::move(inNss)), range(std::move(inRange)) {}
+
+ // Namespace for which a chunk is being split or merged
+ NamespaceString nss;
+
+ // If split, bounds of the chunk being split; if merge, the end bounds of the range being
+ // merged
+ ChunkRange range;
+ };
+
/**
* Unregisters a previously registered namespace with an ongoing migration. Must only be called
* if a previous call to registerDonateChunk has succeeded.
@@ -162,10 +192,21 @@ private:
*/
void _clearReceiveChunk();
+ /**
+ * Unregisters a previously registered split/merge chunk operation. Must only be called if a
+ * previous call to registerSplitOrMergeChunk has succeeded.
+ */
+ void _clearSplitMergeChunk(const NamespaceString& nss);
+
// Protects the state below
Mutex _mutex = MONGO_MAKE_LATCH("ActiveMigrationsRegistry::_mutex");
- stdx::condition_variable _lockCond;
+ // Condition variable which will be signaled whenever any of the states below become false,
+ // boost::none or a specific namespace is removed from the map.
+ stdx::condition_variable _chunkOperationsStateChangedCV;
+
+ // Overarching block, which doesn't allow migrations to occur even when there isn't an active
+ // migration ongoing. Used during recovery and FCV changes.
bool _migrationsBlocked{false};
// If there is an active moveChunk operation, this field contains the original request
@@ -173,6 +214,10 @@ private:
// If there is an active chunk receive operation, this field contains the original session id
boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState;
+
+ // If there is an active split or merge chunk operation for a particular namespace, this map
+ // will contain an entry for that namespace
+ stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates;
};
class MigrationBlockingGuard {
@@ -270,4 +315,23 @@ private:
ActiveMigrationsRegistry* _registry;
};
+/**
+ * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations
+ * registry.
+ */
+class ScopedSplitMergeChunk {
+public:
+ ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss);
+ ~ScopedSplitMergeChunk();
+
+ ScopedSplitMergeChunk(ScopedSplitMergeChunk&&);
+ ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&);
+
+private:
+ // Registry from which to unregister the split/merge. Not owned.
+ ActiveMigrationsRegistry* _registry;
+
+ NamespaceString _nss;
+};
+
} // namespace mongo
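
The ScopedSplitMergeChunk declared above is a move-only RAII guard: the destructor unregisters the namespace unless the guard has been moved from, and move assignment nulls out the source's registry pointer so exactly one owner ever performs the cleanup. A generic sketch of the idiom, with Registry and ScopedRegistration as stand-ins rather than MongoDB types:

    #include <string>
    #include <utility>

    // Stand-in for ActiveMigrationsRegistry; only the cleanup hook matters here.
    struct Registry {
        void unregister(const std::string& /*nss*/) { /* erase entry + notify waiters */ }
    };

    // Move-only RAII guard: exactly one owner unregisters on destruction.
    class ScopedRegistration {
    public:
        ScopedRegistration(Registry* registry, std::string nss)
            : _registry(registry), _nss(std::move(nss)) {}

        ~ScopedRegistration() {
            if (_registry)  // moved-from guards are inert
                _registry->unregister(_nss);
        }

        ScopedRegistration(ScopedRegistration&& other) noexcept {
            *this = std::move(other);
        }

        ScopedRegistration& operator=(ScopedRegistration&& other) noexcept {
            if (&other != this) {
                _registry = other._registry;
                other._registry = nullptr;  // disarm the source so cleanup runs once
                _nss = std::move(other._nss);
            }
            return *this;
        }

    private:
        Registry* _registry{nullptr};
        std::string _nss;
    };

ScopedDonateChunk and ScopedReceiveChunk in this same file follow the identical null-out-on-move pattern.
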
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index a79727716dc..041cfec28bc 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/field_parser.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
@@ -224,14 +225,9 @@ void mergeChunks(OperationContext* opCtx,
const BSONObj& minKey,
const BSONObj& maxKey,
const OID& epoch) {
- const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from "
- << redact(minKey) << " to " << redact(maxKey);
- auto scopedDistLock = uassertStatusOKWithContext(
- Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
- opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout),
- str::stream() << "could not acquire collection lock for " << nss.ns()
- << " to merge chunks in [" << redact(minKey) << ", " << redact(maxKey)
- << ")");
+ auto scopedSplitOrMergeChunk(
+ uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(
+ opCtx, nss, ChunkRange(minKey, maxKey))));
// We now have the collection distributed lock, refresh metadata to latest version and sanity
// check
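
Design note: because the registration now lives in each shard's ActiveMigrationsRegistry rather than in a cluster-wide distributed lock keyed on the collection, merges for the same collection running on different shards no longer serialise against each other. Within a single shard, a merge still waits for any conflicting migration or split/merge on that namespace, per the guarantees spelled out in the header's class comment. That is the behaviour change the commit title describes: multiple concurrent merges for the same collection across the cluster.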