summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2021-05-31 12:58:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-01 13:22:26 +0000
commit5339712ac3d71198ca06fa6894c787010d2d5d96 (patch)
treebe376e5ccf80ffe15e6814553f644c8b5e668d57
parent8b565a174c9edf5451b6325085cebe0f1afe1e3a (diff)
downloadmongo-5339712ac3d71198ca06fa6894c787010d2d5d96.tar.gz
Revert "SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster"
This reverts commit 6c37e83e56bf2ceea19a4de59a5aba38e28de65a.
-rw-r--r--src/mongo/db/s/active_migrations_registry.cpp91
-rw-r--r--src/mongo/db/s/active_migrations_registry.h74
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp19
3 files changed, 35 insertions, 149 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 6a05fb1f460..78503d62090 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -74,11 +74,6 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
return _activeMoveChunkState->constructErrorStatus();
}
- auto itSplitMerge = _activeSplitMergeChunkStates.find(args.getNss());
- if (itSplitMerge != _activeSplitMergeChunkStates.end()) {
- return itSplitMerge->second.constructErrorStatus();
- }
-
_activeMoveChunkState.emplace(args);
return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)};
@@ -100,26 +95,6 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
return {ScopedReceiveChunk(this)};
}
-StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk(
- const NamespaceString& nss, const ChunkRange& chunkRange) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_activeReceiveChunkState) {
- return _activeReceiveChunkState->constructErrorStatus();
- }
-
- if (_activeMoveChunkState) {
- return _activeMoveChunkState->constructErrorStatus();
- }
-
- auto emplaceResult =
- _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange));
- if (!emplaceResult.second) {
- return emplaceResult.first->second.constructErrorStatus();
- }
-
- return {ScopedSplitMergeChunk(this, nss)};
-}
-
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeMoveChunkState) {
@@ -168,42 +143,26 @@ void ActiveMigrationsRegistry::_clearReceiveChunk() {
_activeReceiveChunkState.reset();
}
-void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_activeSplitMergeChunkStates.erase(nss));
-}
-
Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const {
- return {
- ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new balancer operation because this shard is currently "
- "donating chunk "
- << ChunkRange(args.getMinKey(), args.getMaxKey()).toString()
- << " for namespace "
- << args.getNss().ns()
- << " to "
- << args.getToShardId()};
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unable to start new migration because this shard is currently "
+ "donating chunk "
+ << ChunkRange(args.getMinKey(), args.getMaxKey()).toString()
+ << " for namespace "
+ << args.getNss().ns()
+ << " to "
+ << args.getToShardId()};
}
Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const {
- return {
- ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new balancer operation because this shard is currently "
- "receiving chunk "
- << range.toString()
- << " for namespace "
- << nss.ns()
- << " from "
- << fromShardId};
-}
-
-Status ActiveMigrationsRegistry::ActiveSplitMergeChunkState::constructErrorStatus() const {
return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new balancer operation because this shard is "
- "currently splitting or merging chunk "
+ str::stream() << "Unable to start new migration because this shard is currently "
+ "receiving chunk "
<< range.toString()
<< " for namespace "
- << nss.ns()};
+ << nss.ns()
+ << " from "
+ << fromShardId};
}
ScopedDonateChunk::ScopedDonateChunk(ActiveMigrationsRegistry* registry,
@@ -267,28 +226,4 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) {
return *this;
}
-ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry,
- const NamespaceString& nss)
- : _registry(registry), _nss(nss) {}
-
-ScopedSplitMergeChunk::~ScopedSplitMergeChunk() {
- if (_registry) {
- _registry->_clearSplitMergeChunk(_nss);
- }
-}
-
-ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) {
- *this = std::move(other);
-}
-
-ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) {
- if (&other != this) {
- _registry = other._registry;
- other._registry = nullptr;
- _nss = std::move(other._nss);
- }
-
- return *this;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 48c55b01883..29344432d2d 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -35,8 +35,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/request_types/move_chunk_request.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/unordered_map.h"
#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -44,7 +44,8 @@ namespace mongo {
class OperationContext;
class ScopedDonateChunk;
class ScopedReceiveChunk;
-class ScopedSplitMergeChunk;
+template <typename T>
+class StatusWith;
/**
* Thread-safe object that keeps track of the active migrations running on a node and limits them
@@ -61,9 +62,9 @@ public:
static ActiveMigrationsRegistry& get(OperationContext* opCtx);
/**
- * If there are no migrations or split/merges running on this shard, registers an active
- * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled
- * by the caller before it goes out of scope.
+ * If there are no migrations running on this shard, registers an active migration with the
+ * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the
+ * caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
* arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the
@@ -74,10 +75,9 @@ public:
StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
- * If there are no migrations or split/merges running on this shard, registers an active receive
- * operation with the specified session id and returns a ScopedReceiveChunk. The
- * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of
- * scope.
+ * If there are no migrations running on this shard, registers an active receive operation with
+ * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will
+ * unregister the migration when the ScopedReceiveChunk goes out of scope.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
@@ -86,14 +86,6 @@ public:
const ShardId& fromShardId);
/**
- * If there are no migrations running on this shard, registers an active split or merge
- * operation for the specified namespace and returns a scoped object which will in turn disallow
- * other migrations or splits/merges for the same namespace (but not for other namespaces).
- */
- StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(const NamespaceString& nss,
- const ChunkRange& chunkRange);
-
- /**
* If a migration has been previously registered through a call to registerDonateChunk, returns
* that namespace. Otherwise returns boost::none.
*/
@@ -110,7 +102,6 @@ public:
private:
friend class ScopedDonateChunk;
friend class ScopedReceiveChunk;
- friend class ScopedSplitMergeChunk;
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
@@ -149,24 +140,6 @@ private:
ShardId fromShardId;
};
- // Describes the state of a currently active split or merge operation
- struct ActiveSplitMergeChunkState {
- ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange)
- : nss(std::move(inNss)), range(std::move(inRange)) {}
-
- /**
- * Constructs an error status to return in the case of conflicting operations.
- */
- Status constructErrorStatus() const;
-
- // Namespace for which a chunk is being split or merged
- NamespaceString nss;
-
- // If split, bounds of the chunk being split; if merge, the end bounds of the range being
- // merged
- ChunkRange range;
- };
-
/**
* Unregisters a previously registered namespace with an ongoing migration. Must only be called
* if a previous call to registerDonateChunk has succeeded.
@@ -179,12 +152,6 @@ private:
*/
void _clearReceiveChunk();
- /**
- * Unregisters a previously registered split/merge chunk operation. Must only be called if a
- * previous call to registerSplitOrMergeChunk has succeeded.
- */
- void _clearSplitMergeChunk(const NamespaceString& nss);
-
// Protects the state below
stdx::mutex _mutex;
@@ -193,10 +160,6 @@ private:
// If there is an active chunk receive operation, this field contains the original session id
boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState;
-
- // If there is an active split or merge chunk operation for a particular namespace, this map
- // will contain an entry
- stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates;
};
/**
@@ -272,23 +235,4 @@ private:
ActiveMigrationsRegistry* _registry;
};
-/**
- * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations
- * registry.
- */
-class ScopedSplitMergeChunk {
-public:
- ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss);
- ~ScopedSplitMergeChunk();
-
- ScopedSplitMergeChunk(ScopedSplitMergeChunk&&);
- ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&);
-
-private:
- // Registry from which to unregister the split/merge. Not owned.
- ActiveMigrationsRegistry* _registry;
-
- NamespaceString _nss;
-};
-
} // namespace mongo
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index a3e611ebcca..83137ad4950 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -40,7 +40,6 @@
#include "mongo/db/field_parser.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_state.h"
@@ -81,11 +80,19 @@ void mergeChunks(OperationContext* opCtx,
const BSONObj& minKey,
const BSONObj& maxKey,
const OID& epoch) {
- auto const shardingState = ShardingState::get(opCtx);
+ const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from "
+ << redact(minKey) << " to " << redact(maxKey);
+ auto scopedDistLock = uassertStatusOKWithContext(
+ Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
+ opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout),
+ str::stream() << "could not acquire collection lock for " << nss.ns()
+ << " to merge chunks in ["
+ << redact(minKey)
+ << ", "
+ << redact(maxKey)
+ << ")");
- auto scopedSplitOrMergeChunk(
- uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(
- nss, ChunkRange(minKey, maxKey))));
+ auto const shardingState = ShardingState::get(opCtx);
// We now have the collection distributed lock, refresh metadata to latest version and sanity
// check
@@ -148,7 +155,7 @@ void mergeChunks(OperationContext* opCtx,
!chunksToMerge.empty());
//
- // Validate the range starts and ends at chunk boundaries and has no holes, error if not valid
+ // Validate the range starts and ends at chunks and has no holes, error if not valid
//
BSONObj firstDocMin = chunksToMerge.front().getMin();