summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-06-29 17:20:59 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-07-06 20:31:46 -0400
commit56ebd8ce50af94b5beaa8af8f6b2da1d1accd44a (patch)
tree7b79ef6258aca638537bdd18ee481100651c7ad7 /src/mongo
parentb3589be5ce8fe43b6f36a02fb80ed4aa358eee41 (diff)
downloadmongo-56ebd8ce50af94b5beaa8af8f6b2da1d1accd44a.tar.gz
SERVER-24781 CSRS primary should cleanup dist locks on promotion
This also reverts commit cc904854866b1cd1a25508573eeab322c0dacbff.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/db.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp6
-rw-r--r--src/mongo/db/s/collection_sharding_state_test.cpp9
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.cpp7
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.h8
-rw-r--r--src/mongo/db/s/sharding_state.cpp9
-rw-r--r--src/mongo/db/s/sharding_state.h7
-rw-r--r--src/mongo/db/s/sharding_state_test.cpp91
-rw-r--r--src/mongo/s/balancer/balancer.cpp76
-rw-r--r--src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp6
-rw-r--r--src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp43
-rw-r--r--src/mongo/s/catalog/replset/replset_dist_lock_manager.h10
-rw-r--r--src/mongo/s/server.cpp2
-rw-r--r--src/mongo/s/sharding_initialization.cpp29
-rw-r--r--src/mongo/s/sharding_initialization.h16
16 files changed, 217 insertions, 115 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 74c3298f5a2..a10b0be5a39 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -779,7 +779,10 @@ static void _initAndListen(int listenPort) {
uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get()));
}
} else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- uassertStatusOK(initializeGlobalShardingStateForMongod(ConnectionString::forLocal()));
+ uassertStatusOK(
+ initializeGlobalShardingStateForMongod(startupOpCtx.get(),
+ ConnectionString::forLocal(),
+ kDistLockProcessIdForConfigServer));
Balancer::create(startupOpCtx->getServiceContext());
}
@@ -788,7 +791,8 @@ static void _initAndListen(int listenPort) {
auto parseStatus = ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
uassertStatusOK(parseStatus);
uassertStatusOK(ShardingState::get(startupOpCtx.get())
- ->initializeFromShardIdentity(parseStatus.getValue(), Date_t::max()));
+ ->initializeFromShardIdentity(
+ startupOpCtx.get(), parseStatus.getValue(), Date_t::max()));
uassertStatusOK(reloadShardRegistryUntilSuccess(startupOpCtx.get()));
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f9854dc31b9..20c2d200906 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -485,6 +485,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
// Don't fassert if we're mid-shutdown, let the shutdown happen gracefully.
return;
}
+
fassertFailedWithStatus(40184,
Status(status.code(),
str::stream()
@@ -493,6 +494,10 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
<< causedBy(status)));
}
+ // Free any leftover locks from previous instantiations
+ auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
+ distLockManager->unlockAll(txn, distLockManager->getProcessID());
+
// If this is a config server node becoming a primary, start the balancer
auto balancer = Balancer::get(txn);
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 20cc7e2d4d6..29606ca8640 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -60,9 +60,9 @@ public:
: _txn(txn), _shardIdentity(std::move(shardIdentity)) {}
void commit() override {
- fassertNoTrace(
- 40071,
- ShardingState::get(_txn)->initializeFromShardIdentity(_shardIdentity, Date_t::max()));
+ fassertNoTrace(40071,
+ ShardingState::get(_txn)->initializeFromShardIdentity(
+ _txn, _shardIdentity, Date_t::max()));
}
void rollback() override {}
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 3e618df3f04..eaacf226826 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -61,10 +61,11 @@ public:
// Note: this assumes that globalInit will always be called on the same thread as the main
// test thread.
- ShardingState::get(txn())->setGlobalInitMethodForTest([this](const ConnectionString&) {
- _initCallCount++;
- return Status::OK();
- });
+ ShardingState::get(txn())->setGlobalInitMethodForTest(
+ [this](OperationContext*, const ConnectionString&, StringData) {
+ _initCallCount++;
+ return Status::OK();
+ });
}
void tearDown() override {}
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index a65c83fc1ae..8688f7ccff8 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -32,6 +32,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/base/status.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
@@ -47,7 +48,9 @@
namespace mongo {
-Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS) {
+Status initializeGlobalShardingStateForMongod(OperationContext* txn,
+ const ConnectionString& configCS,
+ StringData distLockProcessId) {
auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryImpl>();
auto targeterFactoryPtr = targeterFactory.get();
@@ -78,7 +81,9 @@ Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS)
stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
return initializeGlobalShardingState(
+ txn,
configCS,
+ distLockProcessId,
std::move(shardFactory),
[]() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(); },
[](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor)
diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h
index 34c515ffcbd..faf24aededd 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.h
+++ b/src/mongo/db/s/sharding_initialization_mongod.h
@@ -28,11 +28,13 @@
#pragma once
-#include "mongo/base/status.h"
+#include "mongo/base/string_data.h"
namespace mongo {
class ConnectionString;
+class OperationContext;
+class Status;
/**
* Initialize the sharding components of this server. This can be used on both shard and config
@@ -40,6 +42,8 @@ class ConnectionString;
*
* NOTE: This does not initialize ShardingState, which should only be done for shard servers.
*/
-Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS);
+Status initializeGlobalShardingStateForMongod(OperationContext* txn,
+ const ConnectionString& configCS,
+ StringData distLockProcessId);
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index c33096ab054..b810a710d0b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -418,7 +418,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn) {
return parseStatus.getStatus();
}
- auto status = initializeFromShardIdentity(parseStatus.getValue(), txn->getDeadline());
+ auto status = initializeFromShardIdentity(txn, parseStatus.getValue(), txn->getDeadline());
if (!status.isOK()) {
return status;
}
@@ -428,7 +428,8 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn) {
// NOTE: This method can be called inside a database lock so it should never take any database
// locks, perform I/O, or any long running operations.
-Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shardIdentity,
+Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
+ const ShardIdentityType& shardIdentity,
Date_t deadline) {
if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
return Status::OK();
@@ -500,7 +501,7 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard
ShardedConnectionInfo::addHook();
try {
- Status status = _globalInit(configSvrConnStr);
+ Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn));
// For backwards compatibility with old style inits from metadata commands.
if (status.isOK()) {
@@ -538,7 +539,7 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) {
ShardedConnectionInfo::addHook();
try {
- Status status = _globalInit(configSvr);
+ Status status = _globalInit(txn.get(), configSvr, generateDistLockProcessId(txn.get()));
if (status.isOK()) {
ReplicaSetMonitor::setSynchronousConfigChangeHook(
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 9ab74ea5b4a..5b12c44d19d 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -68,7 +68,8 @@ class ShardingState {
MONGO_DISALLOW_COPYING(ShardingState);
public:
- using GlobalInitFunc = stdx::function<Status(const ConnectionString&)>;
+ using GlobalInitFunc =
+ stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
ShardingState();
~ShardingState();
@@ -125,7 +126,9 @@ public:
*
* Returns ErrorCodes::ExceededTimeLimit if deadline has passed.
*/
- Status initializeFromShardIdentity(const ShardIdentityType& shardIdentity, Date_t deadline);
+ Status initializeFromShardIdentity(OperationContext* txn,
+ const ShardIdentityType& shardIdentity,
+ Date_t deadline);
/**
* Shuts down sharding machinery on the shard.
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 76c6772994c..ebd26413091 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -132,7 +132,8 @@ public:
_client = _service.makeClient("ShardingStateTest");
_opCtx = _client->makeOperationContext();
- _shardingState.setGlobalInitMethodForTest([this](const ConnectionString& connStr) {
+ _shardingState.setGlobalInitMethodForTest([&](
+ OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
initGrid(_opCtx.get(), connStr);
return Status::OK();
});
@@ -172,7 +173,7 @@ TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString());
@@ -185,22 +186,27 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
+ });
{
- auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
+ auto status =
+ shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max());
ASSERT_EQ(ErrorCodes::ShutdownInProgress, status);
}
// ShardingState is now in error state, attempting to call it again will still result in error.
shardingState()->setGlobalInitMethodForTest(
- [](const ConnectionString& connStr) { return Status::OK(); });
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status::OK();
+ });
{
- auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
+ auto status =
+ shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max());
ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status);
}
@@ -215,7 +221,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -223,11 +229,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -242,7 +249,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -250,11 +257,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -269,7 +277,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -277,11 +285,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+ auto status =
+ shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
@@ -297,7 +307,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -305,11 +315,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
shardIdentity2.setShardName("b");
shardIdentity2.setClusterId(clusterID);
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+ auto status =
+ shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
@@ -324,7 +336,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -332,11 +344,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));
ASSERT_TRUE(shardingState()->enabled());
ASSERT_EQ("a", shardingState()->getShardName());
@@ -350,7 +363,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
- ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
@@ -358,11 +371,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(OID::gen());
- shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
+ shardingState()->setGlobalInitMethodForTest(
+ [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+ return Status{ErrorCodes::InternalError, "should not reach here"};
+ });
- auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+ auto status =
+ shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index 7082eac3f44..ab881db8022 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -195,7 +195,7 @@ Status executeSingleMigration(OperationContext* txn,
maxChunkSizeBytes,
secondaryThrottle,
waitForDelete,
- true); // takeDistLock flag.
+ false); // takeDistLock flag.
appendOperationDeadlineIfSet(txn, &builder);
@@ -208,12 +208,61 @@ Status executeSingleMigration(OperationContext* txn,
status = {ErrorCodes::ShardNotFound,
str::stream() << "shard " << migrateInfo.from << " not found"};
} else {
- StatusWith<Shard::CommandResponse> cmdStatus =
- shard->runCommand(txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- cmdObj,
- Shard::RetryPolicy::kNotIdempotent);
+ const std::string whyMessage(
+ str::stream() << "migrating chunk " << ChunkRange(c->getMin(), c->getMax()).toString()
+ << " in "
+ << nss.ns());
+ StatusWith<Shard::CommandResponse> cmdStatus{ErrorCodes::InternalError, "Uninitialized"};
+
+ // Send the first moveChunk command with the balancer holding the distlock.
+ {
+ StatusWith<DistLockManager::ScopedDistLock> distLockStatus =
+ Grid::get(txn)->catalogClient(txn)->distLock(txn, nss.ns(), whyMessage);
+ if (!distLockStatus.isOK()) {
+ const std::string msg = str::stream()
+ << "Could not acquire collection lock for " << nss.ns() << " to migrate chunk ["
+ << c->getMin() << "," << c->getMax() << ") due to "
+ << distLockStatus.getStatus().toString();
+ warning() << msg;
+ return {distLockStatus.getStatus().code(), msg};
+ }
+
+ cmdStatus = shard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ cmdObj,
+ Shard::RetryPolicy::kNotIdempotent);
+ }
+
+ if (cmdStatus == ErrorCodes::LockBusy) {
+ // The moveChunk source shard attempted to take the distlock despite being told not to
+ // do so. The shard is likely v3.2 or earlier, which always expects to take the
+ // distlock. Reattempt the moveChunk without the balancer holding the distlock so that
+ // the shard can successfully acquire it.
+ BSONObjBuilder builder;
+ MoveChunkRequest::appendAsCommand(
+ &builder,
+ nss,
+ cm->getVersion(),
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+ migrateInfo.from,
+ migrateInfo.to,
+ ChunkRange(c->getMin(), c->getMax()),
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete,
+ true); // takeDistLock flag.
+
+ appendOperationDeadlineIfSet(txn, &builder);
+
+ cmdObj = builder.obj();
+
+ cmdStatus = shard->runCommand(txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ cmdObj,
+ Shard::RetryPolicy::kIdempotent);
+ }
if (!cmdStatus.isOK()) {
status = std::move(cmdStatus.getStatus());
@@ -385,8 +434,6 @@ void Balancer::_mainThread() {
log() << "CSRS balancer is starting";
- // TODO (SERVER-23096): Use the actual cluster id
- const OID csrsBalancerLockSessionID{OID()};
const Seconds kInitBackoffInterval(60);
// The balancer thread is holding the balancer during its entire lifetime
@@ -395,13 +442,8 @@ void Balancer::_mainThread() {
// Take the balancer distributed lock
while (!_stopRequested() && !scopedBalancerLock) {
auto shardingContext = Grid::get(txn.get());
- auto scopedDistLock =
- shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID(
- txn.get(),
- "balancer",
- "CSRS balancer starting",
- csrsBalancerLockSessionID,
- DistLockManager::kSingleLockAttemptTimeout);
+ auto scopedDistLock = shardingContext->catalogClient(txn.get())->distLock(
+ txn.get(), "balancer", "CSRS Balancer");
if (!scopedDistLock.isOK()) {
warning() << "Balancer distributed lock could not be acquired and will be retried in "
"one minute"
@@ -414,7 +456,7 @@ void Balancer::_mainThread() {
scopedBalancerLock = std::move(scopedDistLock.getValue());
}
- log() << "CSRS balancer started with instance id " << csrsBalancerLockSessionID;
+ log() << "CSRS balancer thread is now running";
// Main balancer loop
while (!_stopRequested()) {
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
index bd8b1b6a0ba..7105ae33de5 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
@@ -69,6 +69,10 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Seconds(15));
+const WriteConcernOptions kLocalWriteConcern(1,
+ WriteConcernOptions::SyncMode::UNSET,
+ Milliseconds(0));
+
/**
* Returns the resulting new object from the findAndModify response object.
* Returns LockStateChangeFailed if value field was null, which indicates that
@@ -325,7 +329,7 @@ Status DistLockCatalogImpl::unlockAll(OperationContext* txn, const std::string&
BatchedCommandRequest request(updateRequest.release());
request.setNS(_locksNS);
- request.setWriteConcern(kMajorityWriteConcern.toBSON());
+ request.setWriteConcern(kLocalWriteConcern.toBSON());
BSONObj cmdObj = request.toBSON();
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
index 879ef0cf0a2..1f6d3c7847e 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
@@ -1093,31 +1093,26 @@ TEST_F(DistLockCatalogFixture, BasicUnlockAll) {
ASSERT_OK(status);
});
- onCommand(
- [](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
- ASSERT_EQUALS(dummyHost, request.target);
- ASSERT_EQUALS("config", request.dbname);
+ onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ ASSERT_EQUALS(dummyHost, request.target);
+ ASSERT_EQUALS("config", request.dbname);
- std::string errmsg;
- BatchedUpdateRequest batchRequest;
- ASSERT(batchRequest.parseBSON("config", request.cmdObj, &errmsg));
- ASSERT_EQUALS(LocksType::ConfigNS, batchRequest.getNS().toString());
- ASSERT_EQUALS(BSON("w"
- << "majority"
- << "wtimeout"
- << 15000),
- batchRequest.getWriteConcern());
- auto updates = batchRequest.getUpdates();
- ASSERT_EQUALS(1U, updates.size());
- auto update = updates.front();
- ASSERT_FALSE(update->getUpsert());
- ASSERT_TRUE(update->getMulti());
- ASSERT_EQUALS(BSON(LocksType::process("processID")), update->getQuery());
- ASSERT_EQUALS(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))),
- update->getUpdateExpr());
-
- return BSON("ok" << 1);
- });
+ std::string errmsg;
+ BatchedUpdateRequest batchRequest;
+ ASSERT(batchRequest.parseBSON("config", request.cmdObj, &errmsg));
+ ASSERT_EQUALS(LocksType::ConfigNS, batchRequest.getNS().toString());
+ ASSERT_EQUALS(BSON("w" << 1 << "wtimeout" << 0), batchRequest.getWriteConcern());
+ auto updates = batchRequest.getUpdates();
+ ASSERT_EQUALS(1U, updates.size());
+ auto update = updates.front();
+ ASSERT_FALSE(update->getUpsert());
+ ASSERT_TRUE(update->getMulti());
+ ASSERT_EQUALS(BSON(LocksType::process("processID")), update->getQuery());
+ ASSERT_EQUALS(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))),
+ update->getUpdateExpr());
+
+ return BSON("ok" << 1);
+ });
future.timed_get(kFutureTimeout);
}
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
index 635b2ab343d..c29482fbe54 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
@@ -68,11 +68,11 @@ public:
virtual std::string getProcessID() override;
- virtual StatusWith<DistLockManager::ScopedDistLock> lock(OperationContext* txn,
- StringData name,
- StringData whyMessage,
- Milliseconds waitFor,
- Milliseconds lockTryInterval) override;
+ virtual StatusWith<ScopedDistLock> lock(OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ Milliseconds waitFor,
+ Milliseconds lockTryInterval) override;
virtual StatusWith<ScopedDistLock> lockWithSessionID(OperationContext* txn,
StringData name,
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index af01e1d2c0e..d4912eead77 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -241,7 +241,9 @@ static Status initializeSharding(OperationContext* txn) {
stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
Status status = initializeGlobalShardingState(
+ txn,
mongosGlobalParams.configdbs,
+ generateDistLockProcessId(txn),
std::move(shardFactory),
[]() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); },
[](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 9c3b2b60876..cdca5486b55 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -80,13 +80,7 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<Network
std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service,
ShardRegistry* shardRegistry,
- const HostAndPort& thisHost) {
- std::unique_ptr<SecureRandom> rng(SecureRandom::create());
- std::string distLockProcessId = str::stream()
- << thisHost.toString() << ':'
- << durationCount<Seconds>(service->getPreciseClockSource()->now().toDurationSinceEpoch())
- << ':' << static_cast<int32_t>(rng->nextInt64());
-
+ StringData distLockProcessId) {
auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
auto distLockManager =
stdx::make_unique<ReplSetDistLockManager>(service,
@@ -126,7 +120,21 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
} // namespace
-Status initializeGlobalShardingState(const ConnectionString& configCS,
+const StringData kDistLockProcessIdForConfigServer("ConfigServer");
+
+std::string generateDistLockProcessId(OperationContext* txn) {
+ std::unique_ptr<SecureRandom> rng(SecureRandom::create());
+
+ return str::stream()
+ << HostAndPort(getHostName(), serverGlobalParams.port).toString() << ':'
+ << durationCount<Seconds>(
+ txn->getServiceContext()->getPreciseClockSource()->now().toDurationSinceEpoch())
+ << ':' << rng->nextInt64();
+}
+
+Status initializeGlobalShardingState(OperationContext* txn,
+ const ConnectionString& configCS,
+ StringData distLockProcessId,
std::unique_ptr<ShardFactory> shardFactory,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
ShardingCatalogManagerBuilder catalogManagerBuilder) {
@@ -144,9 +152,8 @@ Status initializeGlobalShardingState(const ConnectionString& configCS,
auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));
- auto catalogClient = makeCatalogClient(getGlobalServiceContext(),
- shardRegistry.get(),
- HostAndPort(getHostName(), serverGlobalParams.port));
+ auto catalogClient =
+ makeCatalogClient(txn->getServiceContext(), shardRegistry.get(), distLockProcessId);
auto rawCatalogClient = catalogClient.get();
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index a782dc6474a..0ecdd2a3508 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -31,6 +31,8 @@
#include <cstdint>
#include <memory>
+#include "mongo/base/string_data.h"
+#include "mongo/bson/oid.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -55,10 +57,22 @@ using ShardingEgressMetadataHookBuilder =
} // namespace rpc
/**
+ * Fixed process identifier for the dist lock manager running on a config server.
+ */
+extern const StringData kDistLockProcessIdForConfigServer;
+
+/**
+ * Generates a uniform string to be used as a process id for the distributed lock manager.
+ */
+std::string generateDistLockProcessId(OperationContext* txn);
+
+/**
* Takes in the connection string for reaching the config servers and initializes the global
* ShardingCatalogClient, ShardingCatalogManager, ShardRegistry, and Grid objects.
*/
-Status initializeGlobalShardingState(const ConnectionString& configCS,
+Status initializeGlobalShardingState(OperationContext* txn,
+ const ConnectionString& configCS,
+ StringData distLockProcessId,
std::unique_ptr<ShardFactory> shardFactory,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
ShardingCatalogManagerBuilder catalogManagerBuilder);