author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2016-06-29 17:20:59 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2016-07-06 20:31:46 -0400
commit     56ebd8ce50af94b5beaa8af8f6b2da1d1accd44a (patch)
tree       7b79ef6258aca638537bdd18ee481100651c7ad7 /src/mongo
parent     b3589be5ce8fe43b6f36a02fb80ed4aa358eee41 (diff)
download   mongo-56ebd8ce50af94b5beaa8af8f6b2da1d1accd44a.tar.gz
SERVER-24781 CSRS primary should cleanup dist locks on promotion
This also reverts commit cc904854866b1cd1a25508573eeab322c0dacbff.
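The core of the change: a CSRS node now runs its dist lock manager under the fixed process id "ConfigServer" rather than a per-instance id, so any primary can release locks left behind by a previous primary. A minimal sketch of the step-up cleanup this commit adds to the draining-state hook (names taken from the diff below; txn is the hook's OperationContext):

    // On transition to primary, release every dist lock held under this node's
    // process id. On a config server that id is the shared "ConfigServer", so
    // leftover locks from the previous primary are freed here.
    auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
    distLockManager->unlockAll(txn, distLockManager->getProcessID());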
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/db.cpp                                                 |  8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |  5
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                       |  6
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp                  |  9
-rw-r--r--  src/mongo/db/s/sharding_initialization_mongod.cpp                  |  7
-rw-r--r--  src/mongo/db/s/sharding_initialization_mongod.h                    |  8
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                                  |  9
-rw-r--r--  src/mongo/db/s/sharding_state.h                                    |  7
-rw-r--r--  src/mongo/db/s/sharding_state_test.cpp                             | 91
-rw-r--r--  src/mongo/s/balancer/balancer.cpp                                  | 76
-rw-r--r--  src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp            |  6
-rw-r--r--  src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp       | 43
-rw-r--r--  src/mongo/s/catalog/replset/replset_dist_lock_manager.h           | 10
-rw-r--r--  src/mongo/s/server.cpp                                             |  2
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                            | 29
-rw-r--r--  src/mongo/s/sharding_initialization.h                              | 16
16 files changed, 217 insertions, 115 deletions
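Before the diff itself, the two process-id schemes it introduces, side by side (illustrative values; generateDistLockProcessId and kDistLockProcessIdForConfigServer are defined in src/mongo/s/sharding_initialization.{h,cpp} below):

    // mongos and shard servers: a unique id per process, "host:port:startSecs:random",
    // e.g. "node1.example.com:27017:1467848519:5402358792902411" (made-up example).
    std::string processId = generateDistLockProcessId(txn);

    // Config servers: one fixed id shared by every CSRS node, which is what lets a
    // newly promoted primary unlock its predecessor's locks.
    StringData configProcessId = kDistLockProcessIdForConfigServer;  // "ConfigServer"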
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 74c3298f5a2..a10b0be5a39 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -779,7 +779,10 @@ static void _initAndListen(int listenPort) {
             uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get()));
         }
     } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
-        uassertStatusOK(initializeGlobalShardingStateForMongod(ConnectionString::forLocal()));
+        uassertStatusOK(
+            initializeGlobalShardingStateForMongod(startupOpCtx.get(),
+                                                   ConnectionString::forLocal(),
+                                                   kDistLockProcessIdForConfigServer));

         Balancer::create(startupOpCtx->getServiceContext());
     }
@@ -788,7 +791,8 @@ static void _initAndListen(int listenPort) {
         auto parseStatus = ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
         uassertStatusOK(parseStatus);
         uassertStatusOK(ShardingState::get(startupOpCtx.get())
-                            ->initializeFromShardIdentity(parseStatus.getValue(), Date_t::max()));
+                            ->initializeFromShardIdentity(
+                                startupOpCtx.get(), parseStatus.getValue(), Date_t::max()));
         uassertStatusOK(reloadShardRegistryUntilSuccess(startupOpCtx.get()));
     }
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f9854dc31b9..20c2d200906 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -485,6 +485,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
             // Don't fassert if we're mid-shutdown, let the shutdown happen gracefully.
             return;
         }
+
        fassertFailedWithStatus(40184,
                                Status(status.code(),
                                       str::stream()
@@ -493,6 +494,10 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
                                           << causedBy(status)));
    }

+    // Free any leftover locks from previous instantiations
+    auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
+    distLockManager->unlockAll(txn, distLockManager->getProcessID());
+
    // If this is a config server node becoming a primary, start the balancer
    auto balancer = Balancer::get(txn);
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 20cc7e2d4d6..29606ca8640 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -60,9 +60,9 @@ public:
         : _txn(txn), _shardIdentity(std::move(shardIdentity)) {}

     void commit() override {
-        fassertNoTrace(
-            40071,
-            ShardingState::get(_txn)->initializeFromShardIdentity(_shardIdentity, Date_t::max()));
+        fassertNoTrace(40071,
+                       ShardingState::get(_txn)->initializeFromShardIdentity(
+                           _txn, _shardIdentity, Date_t::max()));
     }

     void rollback() override {}
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 3e618df3f04..eaacf226826 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -61,10 +61,11 @@ public:
         // Note: this assumes that globalInit will always be called on the same thread as the main
         // test thread.
-        ShardingState::get(txn())->setGlobalInitMethodForTest([this](const ConnectionString&) {
-            _initCallCount++;
-            return Status::OK();
-        });
+        ShardingState::get(txn())->setGlobalInitMethodForTest(
+            [this](OperationContext*, const ConnectionString&, StringData) {
+                _initCallCount++;
+                return Status::OK();
+            });
     }

     void tearDown() override {}
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index a65c83fc1ae..8688f7ccff8 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -32,6 +32,7 @@

 #include "mongo/platform/basic.h"

+#include "mongo/base/status.h"
 #include "mongo/client/connection_string.h"
 #include "mongo/client/remote_command_targeter.h"
 #include "mongo/client/remote_command_targeter_factory_impl.h"
@@ -47,7 +48,9 @@

 namespace mongo {

-Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS) {
+Status initializeGlobalShardingStateForMongod(OperationContext* txn,
+                                              const ConnectionString& configCS,
+                                              StringData distLockProcessId) {
     auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryImpl>();
     auto targeterFactoryPtr = targeterFactory.get();
@@ -78,7 +81,9 @@ Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS)
         stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));

     return initializeGlobalShardingState(
+        txn,
         configCS,
+        distLockProcessId,
         std::move(shardFactory),
         []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(); },
         [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor)
diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h
index 34c515ffcbd..faf24aededd 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.h
+++ b/src/mongo/db/s/sharding_initialization_mongod.h
@@ -28,11 +28,13 @@

 #pragma once

-#include "mongo/base/status.h"
+#include "mongo/base/string_data.h"

 namespace mongo {

 class ConnectionString;
+class OperationContext;
+class Status;

 /**
  * Initialize the sharding components of this server. This can be used on both shard and config
@@ -40,6 +42,8 @@ class ConnectionString;
  *
  * NOTE: This does not initialize ShardingState, which should only be done for shard servers.
  */
-Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS);
+Status initializeGlobalShardingStateForMongod(OperationContext* txn,
+                                              const ConnectionString& configCS,
+                                              StringData distLockProcessId);

 }  // namespace mongo
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index c33096ab054..b810a710d0b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -418,7 +418,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn) {
         return parseStatus.getStatus();
     }

-    auto status = initializeFromShardIdentity(parseStatus.getValue(), txn->getDeadline());
+    auto status = initializeFromShardIdentity(txn, parseStatus.getValue(), txn->getDeadline());
     if (!status.isOK()) {
         return status;
     }
@@ -428,7 +428,8 @@

 // NOTE: This method can be called inside a database lock so it should never take any database
 // locks, perform I/O, or any long running operations.
-Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shardIdentity,
+Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
+                                                  const ShardIdentityType& shardIdentity,
                                                   Date_t deadline) {
     if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
         return Status::OK();
@@ -500,7 +501,7 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard

     ShardedConnectionInfo::addHook();

     try {
-        Status status = _globalInit(configSvrConnStr);
+        Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn));

         // For backwards compatibility with old style inits from metadata commands.
         if (status.isOK()) {
@@ -538,7 +539,7 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) {
     ShardedConnectionInfo::addHook();

     try {
-        Status status = _globalInit(configSvr);
+        Status status = _globalInit(txn.get(), configSvr, generateDistLockProcessId(txn.get()));

         if (status.isOK()) {
             ReplicaSetMonitor::setSynchronousConfigChangeHook(
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 9ab74ea5b4a..5b12c44d19d 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -68,7 +68,8 @@ class ShardingState {
     MONGO_DISALLOW_COPYING(ShardingState);

 public:
-    using GlobalInitFunc = stdx::function<Status(const ConnectionString&)>;
+    using GlobalInitFunc =
+        stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;

     ShardingState();
     ~ShardingState();
@@ -125,7 +126,9 @@ public:
      *
      * Returns ErrorCodes::ExceededTimeLimit if deadline has passed.
      */
-    Status initializeFromShardIdentity(const ShardIdentityType& shardIdentity, Date_t deadline);
+    Status initializeFromShardIdentity(OperationContext* txn,
+                                       const ShardIdentityType& shardIdentity,
+                                       Date_t deadline);

     /**
      * Shuts down sharding machinery on the shard.
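The hunks above widen ShardingState::GlobalInitFunc from Status(const ConnectionString&) to Status(OperationContext*, const ConnectionString&, StringData), threading the operation context and dist lock process id through global init. A test double for the new shape (sketch; mirrors the test updates that follow):

    ShardingState::get(txn)->setGlobalInitMethodForTest(
        [](OperationContext* txn, const ConnectionString& configCS, StringData distLockProcessId) {
            // A real init would forward to initializeGlobalShardingState(txn, configCS,
            // distLockProcessId, ...); a stub can simply report success.
            return Status::OK();
        });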
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 76c6772994c..ebd26413091 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -132,7 +132,8 @@ public:
         _client = _service.makeClient("ShardingStateTest");
         _opCtx = _client->makeOperationContext();

-        _shardingState.setGlobalInitMethodForTest([this](const ConnectionString& connStr) {
+        _shardingState.setGlobalInitMethodForTest([&](
+            OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
             initGrid(_opCtx.get(), connStr);
             return Status::OK();
         });
@@ -172,7 +173,7 @@ TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(OID::gen());

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
     ASSERT_TRUE(shardingState()->enabled());
     ASSERT_EQ("a", shardingState()->getShardName());
     ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString());
@@ -185,22 +186,27 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(OID::gen());

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
+        });

     {
-        auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
+        auto status =
+            shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max());
         ASSERT_EQ(ErrorCodes::ShutdownInProgress, status);
     }

     // ShardingState is now in error state, attempting to call it again will still result in error.
     shardingState()->setGlobalInitMethodForTest(
-        [](const ConnectionString& connStr) { return Status::OK(); });
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status::OK();
+        });

     {
-        auto status = shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max());
+        auto status =
+            shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max());
         ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status);
     }

@@ -215,7 +221,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(clusterID);

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -223,11 +229,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
     shardIdentity2.setShardName("a");
     shardIdentity2.setClusterId(clusterID);

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));

     ASSERT_TRUE(shardingState()->enabled());
     ASSERT_EQ("a", shardingState()->getShardName());
@@ -242,7 +249,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(clusterID);

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -250,11 +257,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
     shardIdentity2.setShardName("a");
     shardIdentity2.setClusterId(clusterID);

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));

     ASSERT_TRUE(shardingState()->enabled());
     ASSERT_EQ("a", shardingState()->getShardName());
@@ -269,7 +277,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(clusterID);

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -277,11 +285,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
     shardIdentity2.setShardName("a");
     shardIdentity2.setClusterId(clusterID);

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+    auto status =
+        shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
     ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);

     ASSERT_TRUE(shardingState()->enabled());
@@ -297,7 +307,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(clusterID);

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -305,11 +315,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
     shardIdentity2.setShardName("b");
     shardIdentity2.setClusterId(clusterID);

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+    auto status =
+        shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
     ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);

     ASSERT_TRUE(shardingState()->enabled());
@@ -324,7 +336,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(OID());

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -332,11 +344,12 @@ TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
     shardIdentity2.setShardName("a");
     shardIdentity2.setClusterId(OID::gen());

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));

     ASSERT_TRUE(shardingState()->enabled());
     ASSERT_EQ("a", shardingState()->getShardName());
@@ -350,7 +363,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
     shardIdentity.setShardName("a");
     shardIdentity.setClusterId(OID::gen());

-    ASSERT_OK(shardingState()->initializeFromShardIdentity(shardIdentity, Date_t::max()));
+    ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));

     ShardIdentityType shardIdentity2;
     shardIdentity2.setConfigsvrConnString(
@@ -358,11 +371,13 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
     shardIdentity2.setShardName("a");
     shardIdentity2.setClusterId(OID::gen());

-    shardingState()->setGlobalInitMethodForTest([](const ConnectionString& connStr) {
-        return Status{ErrorCodes::InternalError, "should not reach here"};
-    });
+    shardingState()->setGlobalInitMethodForTest(
+        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+            return Status{ErrorCodes::InternalError, "should not reach here"};
+        });

-    auto status = shardingState()->initializeFromShardIdentity(shardIdentity2, Date_t::max());
+    auto status =
+        shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max());
     ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);

     ASSERT_TRUE(shardingState()->enabled());
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index 7082eac3f44..ab881db8022 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -195,7 +195,7 @@ Status executeSingleMigration(OperationContext* txn,
         maxChunkSizeBytes,
         secondaryThrottle,
         waitForDelete,
-        true);  // takeDistLock flag.
+        false);  // takeDistLock flag.

     appendOperationDeadlineIfSet(txn, &builder);

@@ -208,12 +208,61 @@ Status executeSingleMigration(OperationContext* txn,
         status = {ErrorCodes::ShardNotFound,
                   str::stream() << "shard " << migrateInfo.from << " not found"};
     } else {
-        StatusWith<Shard::CommandResponse> cmdStatus =
-            shard->runCommand(txn,
-                              ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                              "admin",
-                              cmdObj,
-                              Shard::RetryPolicy::kNotIdempotent);
+        const std::string whyMessage(
+            str::stream() << "migrating chunk " << ChunkRange(c->getMin(), c->getMax()).toString()
+                          << " in "
+                          << nss.ns());
+        StatusWith<Shard::CommandResponse> cmdStatus{ErrorCodes::InternalError, "Uninitialized"};
+
+        // Send the first moveChunk command with the balancer holding the distlock.
+        {
+            StatusWith<DistLockManager::ScopedDistLock> distLockStatus =
+                Grid::get(txn)->catalogClient(txn)->distLock(txn, nss.ns(), whyMessage);
+            if (!distLockStatus.isOK()) {
+                const std::string msg = str::stream()
+                    << "Could not acquire collection lock for " << nss.ns()
+                    << " to migrate chunk [" << c->getMin() << "," << c->getMax() << ") due to "
+                    << distLockStatus.getStatus().toString();
+                warning() << msg;
+                return {distLockStatus.getStatus().code(), msg};
+            }
+
+            cmdStatus = shard->runCommand(txn,
+                                          ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                          "admin",
+                                          cmdObj,
+                                          Shard::RetryPolicy::kNotIdempotent);
+        }
+
+        if (cmdStatus == ErrorCodes::LockBusy) {
+            // The moveChunk source shard attempted to take the distlock despite being told not to
+            // do so. The shard is likely v3.2 or earlier, which always expects to take the
+            // distlock. Reattempt the moveChunk without the balancer holding the distlock so that
+            // the shard can successfully acquire it.
+            BSONObjBuilder builder;
+            MoveChunkRequest::appendAsCommand(
+                &builder,
+                nss,
+                cm->getVersion(),
+                Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(),
+                migrateInfo.from,
+                migrateInfo.to,
+                ChunkRange(c->getMin(), c->getMax()),
+                maxChunkSizeBytes,
+                secondaryThrottle,
+                waitForDelete,
+                true);  // takeDistLock flag.
+
+            appendOperationDeadlineIfSet(txn, &builder);
+
+            cmdObj = builder.obj();
+
+            cmdStatus = shard->runCommand(txn,
+                                          ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                          "admin",
+                                          cmdObj,
+                                          Shard::RetryPolicy::kIdempotent);
+        }

         if (!cmdStatus.isOK()) {
             status = std::move(cmdStatus.getStatus());
@@ -385,8 +434,6 @@ void Balancer::_mainThread() {

     log() << "CSRS balancer is starting";

-    // TODO (SERVER-23096): Use the actual cluster id
-    const OID csrsBalancerLockSessionID{OID()};
     const Seconds kInitBackoffInterval(60);

     // The balancer thread is holding the balancer during its entire lifetime
@@ -395,13 +442,8 @@ void Balancer::_mainThread() {
     // Take the balancer distributed lock
     while (!_stopRequested() && !scopedBalancerLock) {
         auto shardingContext = Grid::get(txn.get());
-        auto scopedDistLock =
-            shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID(
-                txn.get(),
-                "balancer",
-                "CSRS balancer starting",
-                csrsBalancerLockSessionID,
-                DistLockManager::kSingleLockAttemptTimeout);
+        auto scopedDistLock = shardingContext->catalogClient(txn.get())->distLock(
+            txn.get(), "balancer", "CSRS Balancer");

         if (!scopedDistLock.isOK()) {
             warning() << "Balancer distributed lock could not be acquired and will be retried in "
                          "one minute"
@@ -414,7 +456,7 @@ void Balancer::_mainThread() {
         scopedBalancerLock = std::move(scopedDistLock.getValue());
     }

-    log() << "CSRS balancer started with instance id " << csrsBalancerLockSessionID;
+    log() << "CSRS balancer thread is now running";

     // Main balancer loop
     while (!_stopRequested()) {
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
index bd8b1b6a0ba..7105ae33de5 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl.cpp
@@ -69,6 +69,10 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                 WriteConcernOptions::SyncMode::UNSET,
                                                 Seconds(15));

+const WriteConcernOptions kLocalWriteConcern(1,
+                                             WriteConcernOptions::SyncMode::UNSET,
+                                             Milliseconds(0));
+
 /**
  * Returns the resulting new object from the findAndModify response object.
 * Returns LockStateChangeFailed if value field was null, which indicates that
@@ -325,7 +329,7 @@ Status DistLockCatalogImpl::unlockAll(OperationContext* txn, const std::string&
     BatchedCommandRequest request(updateRequest.release());
     request.setNS(_locksNS);
-    request.setWriteConcern(kMajorityWriteConcern.toBSON());
+    request.setWriteConcern(kLocalWriteConcern.toBSON());

     BSONObj cmdObj = request.toBSON();
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
index 879ef0cf0a2..1f6d3c7847e 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
@@ -1093,31 +1093,26 @@ TEST_F(DistLockCatalogFixture, BasicUnlockAll) {
         ASSERT_OK(status);
     });

-    onCommand(
-        [](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
-            ASSERT_EQUALS(dummyHost, request.target);
-            ASSERT_EQUALS("config", request.dbname);
+    onCommand([](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+        ASSERT_EQUALS(dummyHost, request.target);
+        ASSERT_EQUALS("config", request.dbname);

-            std::string errmsg;
-            BatchedUpdateRequest batchRequest;
-            ASSERT(batchRequest.parseBSON("config", request.cmdObj, &errmsg));
-            ASSERT_EQUALS(LocksType::ConfigNS, batchRequest.getNS().toString());
-            ASSERT_EQUALS(BSON("w"
-                               << "majority"
-                               << "wtimeout"
-                               << 15000),
-                          batchRequest.getWriteConcern());
-            auto updates = batchRequest.getUpdates();
-            ASSERT_EQUALS(1U, updates.size());
-            auto update = updates.front();
-            ASSERT_FALSE(update->getUpsert());
-            ASSERT_TRUE(update->getMulti());
-            ASSERT_EQUALS(BSON(LocksType::process("processID")), update->getQuery());
-            ASSERT_EQUALS(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))),
-                          update->getUpdateExpr());
-
-            return BSON("ok" << 1);
-        });
+        std::string errmsg;
+        BatchedUpdateRequest batchRequest;
+        ASSERT(batchRequest.parseBSON("config", request.cmdObj, &errmsg));
+        ASSERT_EQUALS(LocksType::ConfigNS, batchRequest.getNS().toString());
+        ASSERT_EQUALS(BSON("w" << 1 << "wtimeout" << 0), batchRequest.getWriteConcern());
+        auto updates = batchRequest.getUpdates();
+        ASSERT_EQUALS(1U, updates.size());
+        auto update = updates.front();
+        ASSERT_FALSE(update->getUpsert());
+        ASSERT_TRUE(update->getMulti());
+        ASSERT_EQUALS(BSON(LocksType::process("processID")), update->getQuery());
+        ASSERT_EQUALS(BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))),
+                      update->getUpdateExpr());
+
+        return BSON("ok" << 1);
+    });

     future.timed_get(kFutureTimeout);
 }
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
index 635b2ab343d..c29482fbe54 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
@@ -68,11 +68,11 @@ public:

     virtual std::string getProcessID() override;

-    virtual StatusWith<DistLockManager::ScopedDistLock> lock(OperationContext* txn,
-                                                             StringData name,
-                                                             StringData whyMessage,
-                                                             Milliseconds waitFor,
-                                                             Milliseconds lockTryInterval) override;
+    virtual StatusWith<ScopedDistLock> lock(OperationContext* txn,
+                                            StringData name,
+                                            StringData whyMessage,
+                                            Milliseconds waitFor,
+                                            Milliseconds lockTryInterval) override;

     virtual StatusWith<ScopedDistLock> lockWithSessionID(OperationContext* txn,
                                                          StringData name,
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index af01e1d2c0e..d4912eead77 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -241,7 +241,9 @@ static Status initializeSharding(OperationContext* txn) {
         stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));

     Status status = initializeGlobalShardingState(
+        txn,
         mongosGlobalParams.configdbs,
+        generateDistLockProcessId(txn),
         std::move(shardFactory),
         []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); },
         [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 9c3b2b60876..cdca5486b55 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -80,13 +80,7 @@ std::unique_ptr<ThreadPoolTaskExecutor> makeTaskExecutor(std::unique_ptr<Network

 std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service,
                                                          ShardRegistry* shardRegistry,
-                                                         const HostAndPort& thisHost) {
-    std::unique_ptr<SecureRandom> rng(SecureRandom::create());
-    std::string distLockProcessId = str::stream()
-        << thisHost.toString() << ':'
-        << durationCount<Seconds>(service->getPreciseClockSource()->now().toDurationSinceEpoch())
-        << ':' << static_cast<int32_t>(rng->nextInt64());
-
+                                                         StringData distLockProcessId) {
     auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
     auto distLockManager =
         stdx::make_unique<ReplSetDistLockManager>(service,
@@ -126,7 +120,21 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(

 }  // namespace

-Status initializeGlobalShardingState(const ConnectionString& configCS,
+const StringData kDistLockProcessIdForConfigServer("ConfigServer");
+
+std::string generateDistLockProcessId(OperationContext* txn) {
+    std::unique_ptr<SecureRandom> rng(SecureRandom::create());
+
+    return str::stream()
+        << HostAndPort(getHostName(), serverGlobalParams.port).toString() << ':'
+        << durationCount<Seconds>(
+               txn->getServiceContext()->getPreciseClockSource()->now().toDurationSinceEpoch())
+        << ':' << rng->nextInt64();
+}
+
+Status initializeGlobalShardingState(OperationContext* txn,
+                                     const ConnectionString& configCS,
+                                     StringData distLockProcessId,
                                      std::unique_ptr<ShardFactory> shardFactory,
                                      rpc::ShardingEgressMetadataHookBuilder hookBuilder,
                                      ShardingCatalogManagerBuilder catalogManagerBuilder) {
@@ -144,9 +152,8 @@ Status initializeGlobalShardingState(const ConnectionString& configCS,

     auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));

-    auto catalogClient = makeCatalogClient(getGlobalServiceContext(),
-                                           shardRegistry.get(),
-                                           HostAndPort(getHostName(), serverGlobalParams.port));
+    auto catalogClient =
+        makeCatalogClient(txn->getServiceContext(), shardRegistry.get(), distLockProcessId);

     auto rawCatalogClient = catalogClient.get();
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index a782dc6474a..0ecdd2a3508 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -31,6 +31,8 @@
 #include <cstdint>
 #include <memory>

+#include "mongo/base/string_data.h"
+#include "mongo/bson/oid.h"
 #include "mongo/stdx/functional.h"

 namespace mongo {
@@ -55,10 +57,22 @@ using ShardingEgressMetadataHookBuilder =
 }  // namespace rpc

 /**
+ * Fixed process identifier for the dist lock manager running on a config server.
+ */
+extern const StringData kDistLockProcessIdForConfigServer;
+
+/**
+ * Generates a uniform string to be used as a process id for the distributed lock manager.
+ */
+std::string generateDistLockProcessId(OperationContext* txn);
+
+/**
  * Takes in the connection string for reaching the config servers and initializes the global
  * ShardingCatalogClient, ShardingCatalogManager, ShardRegistry, and Grid objects.
  */
-Status initializeGlobalShardingState(const ConnectionString& configCS,
+Status initializeGlobalShardingState(OperationContext* txn,
+                                     const ConnectionString& configCS,
+                                     StringData distLockProcessId,
                                      std::unique_ptr<ShardFactory> shardFactory,
                                      rpc::ShardingEgressMetadataHookBuilder hookBuilder,
                                      ShardingCatalogManagerBuilder catalogManagerBuilder);
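In summary, the balancer's migration flow after this commit (paraphrased sketch; runMoveChunkCommand is a hypothetical helper standing in for the MoveChunkRequest::appendAsCommand plus Shard::runCommand sequence in balancer.cpp above):

    // Preferred protocol: the balancer holds the collection distlock and sends
    // moveChunk with takeDistLock = false.
    auto cmdStatus = runMoveChunkCommand(txn, shard, /*takeDistLock=*/false);

    if (cmdStatus == ErrorCodes::LockBusy) {
        // The source shard tried to take the distlock anyway (likely v3.2 or
        // earlier), so retry the legacy way, outside the balancer's lock scope,
        // with takeDistLock = true.
        cmdStatus = runMoveChunkCommand(txn, shard, /*takeDistLock=*/true);
    }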