 buildscripts/resmokeconfig/suites/multiversion.yml | 2
 jstests/multiVersion/2_test_launching_cluster.js | 19
 src/mongo/client/parallel.cpp | 11
 src/mongo/client/parallel.h | 1
 src/mongo/db/SConscript | 1
 src/mongo/db/cloner.cpp | 29
 src/mongo/db/commands/clone.cpp | 1
 src/mongo/db/commands/mr.cpp | 37
 src/mongo/db/s/SConscript | 2
 src/mongo/db/s/migration_impl.cpp | 5
 src/mongo/db/s/migration_impl.h | 6
 src/mongo/db/s/move_chunk_command.cpp | 28
 src/mongo/db/s/sharding_state_recovery.cpp | 8
 src/mongo/s/SConscript | 6
 src/mongo/s/balance.cpp | 11
 src/mongo/s/balance.h | 4
 src/mongo/s/catalog/SConscript | 5
 src/mongo/s/catalog/catalog_manager.h | 16
 src/mongo/s/catalog/catalog_manager_common.cpp | 8
 src/mongo/s/catalog/catalog_manager_common.h | 5
 src/mongo/s/catalog/catalog_manager_mock.cpp | 8
 src/mongo/s/catalog/catalog_manager_mock.h | 10
 src/mongo/s/catalog/forwarding_catalog_manager.cpp | 617
 src/mongo/s/catalog/forwarding_catalog_manager.h | 298
 src/mongo/s/catalog/legacy/SConscript | 26
 src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp | 1425
 src/mongo/s/catalog/legacy/catalog_manager_legacy.h | 234
 src/mongo/s/catalog/legacy/cluster_client_internal.cpp | 50
 src/mongo/s/catalog/legacy/cluster_client_internal.h | 45
 src/mongo/s/catalog/legacy/config_coordinator.cpp | 430
 src/mongo/s/catalog/legacy/config_coordinator.h | 61
 src/mongo/s/catalog/legacy/distlock.cpp | 783
 src/mongo/s/catalog/legacy/distlock.h | 240
 src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp | 229
 src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h | 92
 src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp | 337
 src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h | 144
 src/mongo/s/catalog/replset/catalog_manager_replica_set.h | 4
 src/mongo/s/client/sharding_connection_hook.cpp | 19
 src/mongo/s/client/sharding_network_connection_hook.cpp | 8
 src/mongo/s/commands/cluster_is_master_cmd.cpp | 1
 src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 2
 src/mongo/s/commands/cluster_move_primary_cmd.cpp | 9
 src/mongo/s/d_merge.cpp | 3
 src/mongo/s/d_sharding_server_status.cpp | 5
 src/mongo/s/d_split.cpp | 3
 src/mongo/s/grid.cpp | 12
 src/mongo/s/grid.h | 19
 src/mongo/s/query/cluster_find.cpp | 77
 src/mongo/s/s_sharding_server_status.cpp | 5
 src/mongo/s/server.cpp | 2
 src/mongo/s/sharding_initialization.cpp | 46
 src/mongo/s/sharding_test_fixture.cpp | 4
 src/mongo/s/strategy.cpp | 20
 54 files changed, 131 insertions(+), 5342 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/multiversion.yml b/buildscripts/resmokeconfig/suites/multiversion.yml
index 20211761071..00d3ed14d51 100644
--- a/buildscripts/resmokeconfig/suites/multiversion.yml
+++ b/buildscripts/resmokeconfig/suites/multiversion.yml
@@ -5,6 +5,8 @@ selector:
exclude_files:
# TODO: SERVER-21578
- jstests/multiVersion/balancer_multiVersion_detect.js
+# TODO: SERVER-22942
+ - jstests/multiVersion/dumprestore_sharded.js
# Multiversion tests start their own mongod's.
executor:
diff --git a/jstests/multiVersion/2_test_launching_cluster.js b/jstests/multiVersion/2_test_launching_cluster.js
index 24b06f22319..b8588ce530a 100644
--- a/jstests/multiVersion/2_test_launching_cluster.js
+++ b/jstests/multiVersion/2_test_launching_cluster.js
@@ -7,11 +7,13 @@ load('./jstests/multiVersion/libs/verify_versions.js');
(function() {
"use strict";
// Check our latest versions
-var versionsToCheck = [ "last-stable",
- "latest" ];
-
-var versionsToCheckMongos = [ "last-stable" ];
+//var versionsToCheck = [ "last-stable", "latest" ];
+//var versionsToCheckMongos = [ "last-stable" ];
+// TODO put this back when SERVER-22761 is resolved
+var versionsToCheck = [ "latest" ];
+var versionsToCheckMongos = [ "latest" ];
+
jsTest.log( "Testing legacy versions..." );
for( var i = 0; i < versionsToCheck.length; i++ ){
@@ -22,7 +24,6 @@ for( var i = 0; i < versionsToCheck.length; i++ ){
var st = new ShardingTest({ shards : 2,
mongos : 2,
- sync: true, // Old clusters can't use replsets for config servers
other : {
mongosOptions : { binVersion : version },
configOptions : { binVersion : version },
@@ -48,10 +49,6 @@ jsTest.log( "Testing mixed versions..." );
st = new ShardingTest({ shards : 2,
mongos : 2,
other : {
-
- // Three config servers
- sync : true,
-
mongosOptions : { binVersion : versionsToCheckMongos },
configOptions : { binVersion : versionsToCheck },
shardOptions : { binVersion : versionsToCheck }
@@ -91,10 +88,6 @@ jsTest.log( "Testing mixed versions with replica sets..." );
st = new ShardingTest({ shards : 2,
mongos : 2,
other : {
-
- // Three config servers
- sync : true,
-
// Replica set shards
rs : true,
diff --git a/src/mongo/client/parallel.cpp b/src/mongo/client/parallel.cpp
index 43865ac7ff0..37d2622538a 100644
--- a/src/mongo/client/parallel.cpp
+++ b/src/mongo/client/parallel.cpp
@@ -115,7 +115,7 @@ static void throwCursorError(DBClientCursor* cursor) {
ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec,
const CommandInfo& cInfo)
- : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0), _cmChangeAttempted(false) {
+ : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) {
_done = false;
_didInit = false;
@@ -688,15 +688,6 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
}
throw;
} catch (DBException& e) {
- if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) {
- fassert(28792, !_cmChangeAttempted);
- _cmChangeAttempted = true;
-
- grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn);
- startInit(txn);
- return;
- }
-
warning() << "db exception when initializing on " << shardId
<< ", current connection state is " << mdata.toBSON() << causedBy(e);
e._shard = shardId;
diff --git a/src/mongo/client/parallel.h b/src/mongo/client/parallel.h
index 65585d352b9..8e91d67b5f1 100644
--- a/src/mongo/client/parallel.h
+++ b/src/mongo/client/parallel.h
@@ -190,7 +190,6 @@ private:
std::map<std::string, int> _staleNSMap;
int _totalTries;
- bool _cmChangeAttempted;
std::map<ShardId, PCMData> _cursorMap;
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index e1c23c300df..995359afd34 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -643,7 +643,6 @@ serveronlyEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
serveronlyLibdeps = [
"$BUILD_DIR/mongo/client/parallel",
"$BUILD_DIR/mongo/executor/network_interface_factory",
- "$BUILD_DIR/mongo/s/catalog/legacy/catalog_manager_legacy",
"$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set",
"$BUILD_DIR/mongo/s/client/sharding_connection_hook",
"$BUILD_DIR/mongo/s/coreshard",
diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp
index 8fda1b0b17f..3bb85bb0bc7 100644
--- a/src/mongo/db/cloner.cpp
+++ b/src/mongo/db/cloner.cpp
@@ -112,31 +112,6 @@ BSONObj fixindex(const string& newDbName, BSONObj o) {
return res;
}
-namespace {
-Status _checkForCatalogManagerChangeIfNeeded(const CloneOptions& opts) {
- if (!opts.checkForCatalogChange) {
- return Status::OK();
- }
- auto catalogManager = grid.forwardingCatalogManager();
- invariant(catalogManager);
-
- Status status = catalogManager->checkForPendingCatalogChange();
- if (!status.isOK()) {
- return status;
- }
-
- auto currentConfigServerMode = catalogManager->getMode();
- if (currentConfigServerMode != opts.initialCatalogMode) {
- invariant(opts.initialCatalogMode == CatalogManager::ConfigServerMode::SCCC &&
- currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS);
- return Status(ErrorCodes::IncompatibleCatalogManager,
- "CatalogManager was swapped from SCCC to CSRS mode during movePrimary."
- "Aborting movePrimary to unblock mongos.");
- }
- return Status::OK();
-}
-} // namespace
-
Cloner::Cloner() {}
struct Cloner::Fun {
@@ -144,7 +119,6 @@ struct Cloner::Fun {
void operator()(DBClientCursorBatchIterator& i) {
invariant(from_collection.coll() != "system.indexes");
- uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(_opts));
// XXX: can probably take dblock instead
unique_ptr<ScopedTransaction> scopedXact(new ScopedTransaction(txn, MODE_X));
@@ -514,7 +488,6 @@ Status Cloner::copyDb(OperationContext* txn,
clonedColls->clear();
}
- uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
{
// getCollectionInfos may make a remote call, which may block indefinitely, so release
@@ -613,7 +586,6 @@ Status Cloner::copyDb(OperationContext* txn,
if (opts.snapshot)
q.snapshot();
- uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
copy(txn, toDBName, from_name, options, to_name, masterSameProcess, opts, q);
// Copy releases the lock, so we need to re-load the database. This should
@@ -671,7 +643,6 @@ Status Cloner::copyDb(OperationContext* txn,
NamespaceString from_name(opts.fromDB, collectionName);
NamespaceString to_name(toDBName, collectionName);
- uassertStatusOK(_checkForCatalogManagerChangeIfNeeded(opts));
copyIndexes(txn,
toDBName,
diff --git a/src/mongo/db/commands/clone.cpp b/src/mongo/db/commands/clone.cpp
index a438dfa95bc..b5632dd44d7 100644
--- a/src/mongo/db/commands/clone.cpp
+++ b/src/mongo/db/commands/clone.cpp
@@ -114,7 +114,6 @@ public:
"Cannot run clone command for use by sharding movePrimary command on a "
"node that isn't yet sharding aware"));
}
- opts.initialCatalogMode = catalogManager->getMode();
}
// See if there's any collections we should ignore
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 22dd23b462c..75fefbcb51f 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1575,26 +1575,6 @@ public:
} mapReduceCommand;
-namespace {
-Status _checkForCatalogManagerChange(ForwardingCatalogManager* catalogManager,
- CatalogManager::ConfigServerMode initialConfigServerMode) {
- Status status = catalogManager->checkForPendingCatalogChange();
- if (!status.isOK()) {
- return status;
- }
-
- auto currentConfigServerMode = catalogManager->getMode();
- if (currentConfigServerMode != initialConfigServerMode) {
- invariant(initialConfigServerMode == CatalogManager::ConfigServerMode::SCCC &&
- currentConfigServerMode == CatalogManager::ConfigServerMode::CSRS);
- return Status(ErrorCodes::IncompatibleCatalogManager,
- "CatalogManager was swapped from SCCC to CSRS mode during mapreduce."
- "Aborting mapreduce to unblock mongos.");
- }
- return Status::OK();
-}
-} // namespace
-
/**
* This class represents a map/reduce command executed on the output server of a sharded env
*/
@@ -1635,11 +1615,6 @@ public:
<< dbname));
}
- // Store the initial catalog manager mode so we can check if it changes at any point.
- CatalogManager::ConfigServerMode initialConfigServerMode =
- grid.catalogManager(txn)->getMode();
-
-
boost::optional<DisableDocumentValidation> maybeDisableValidation;
if (shouldBypassDocumentValidationForCommand(cmdObj))
maybeDisableValidation.emplace(txn);
@@ -1737,12 +1712,6 @@ public:
BSONObj query;
BSONArrayBuilder chunkSizes;
while (true) {
- Status status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(),
- initialConfigServerMode);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
ChunkPtr chunk;
if (chunks.size() > 0) {
chunk = chunks[index];
@@ -1761,12 +1730,6 @@ public:
int chunkSize = 0;
while (cursor.more() || !values.empty()) {
- status = _checkForCatalogManagerChange(grid.forwardingCatalogManager(),
- initialConfigServerMode);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
BSONObj t;
if (cursor.more()) {
t = cursor.next().getOwned();
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 14678aefb94..665ff4ed032 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -103,5 +103,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/mongoscore',
+ '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
+ '$BUILD_DIR/mongo/s/sharding_test_fixture',
]
)
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index a9d77fbc1fc..248bad2d8e9 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -144,12 +144,11 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) {
return Status::OK();
}
-StatusWith<ForwardingCatalogManager::ScopedDistLock*>
-ChunkMoveOperationState::acquireMoveMetadata() {
+StatusWith<DistLockManager::ScopedDistLock*> ChunkMoveOperationState::acquireMoveMetadata() {
// Get the distributed lock
const string whyMessage(stream() << "migrating chunk [" << _minKey << ", " << _maxKey << ") in "
<< _nss.ns());
- _distLockStatus = grid.forwardingCatalogManager()->distLock(_txn, _nss.ns(), whyMessage);
+ _distLockStatus = grid.catalogManager(_txn)->distLock(_txn, _nss.ns(), whyMessage);
if (!_distLockStatus->isOK()) {
const string msg = stream() << "could not acquire collection lock for " << _nss.ns()
diff --git a/src/mongo/db/s/migration_impl.h b/src/mongo/db/s/migration_impl.h
index 51e6a8ba295..8f2061ed5ac 100644
--- a/src/mongo/db/s/migration_impl.h
+++ b/src/mongo/db/s/migration_impl.h
@@ -36,7 +36,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -77,7 +77,7 @@ public:
* TODO: Once the entire chunk move process is moved to be inside this state machine, there
* will not be any need to expose the distributed lock.
*/
- StatusWith<ForwardingCatalogManager::ScopedDistLock*> acquireMoveMetadata();
+ StatusWith<DistLockManager::ScopedDistLock*> acquireMoveMetadata();
/**
* Starts the move chunk operation.
@@ -158,7 +158,7 @@ private:
BSONObj _maxKey;
// The distributed lock, which protects other migrations from happening on the same collection
- boost::optional<StatusWith<ForwardingCatalogManager::ScopedDistLock>> _distLockStatus;
+ boost::optional<StatusWith<DistLockManager::ScopedDistLock>> _distLockStatus;
// The cached collection metadata and the shard version from the time the migration process
// started. This metadata is guaranteed to not change until either failure or successful
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 9fdc8114f45..0b9b72e9e33 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -239,13 +239,6 @@ public:
timing.done(2);
- Status distLockStatus = distLock->checkForPendingCatalogChange();
- if (!distLockStatus.isOK()) {
- warning() << "Aborting migration due to need to swap current catalog manager"
- << causedBy(distLockStatus);
- return appendCommandStatus(result, distLockStatus);
- }
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep2);
// 3.
@@ -311,13 +304,6 @@ public:
timing.done(3);
- distLockStatus = distLock->checkForPendingCatalogChange();
- if (!distLockStatus.isOK()) {
- warning() << "Aborting migration due to need to swap current catalog manager"
- << causedBy(distLockStatus);
- return appendCommandStatus(result, distLockStatus);
- }
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep3);
// 4.
@@ -414,24 +400,10 @@ public:
}
txn->checkForInterrupt();
-
- distLockStatus = distLock->checkForPendingCatalogChange();
- if (!distLockStatus.isOK()) {
- warning() << "Aborting migration due to need to swap current catalog manager"
- << causedBy(distLockStatus);
- return appendCommandStatus(result, distLockStatus);
- }
}
timing.done(4);
- distLockStatus = distLock->checkForPendingCatalogChange();
- if (!distLockStatus.isOK()) {
- warning() << "Aborting migration due to need to swap current catalog manager"
- << causedBy(distLockStatus);
- return appendCommandStatus(result, distLockStatus);
- }
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep4);
// 5.
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
index 69c7dfe5596..50c4b3c0555 100644
--- a/src/mongo/db/s/sharding_state_recovery.cpp
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -219,10 +219,6 @@ Status modifyRecoveryDocument(OperationContext* txn,
} // namespace
Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) {
- if (grid.catalogManager(txn)->getMode() != CatalogManager::ConfigServerMode::CSRS) {
- return Status::OK();
- }
-
Status upsertStatus =
modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern);
@@ -237,10 +233,6 @@ Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) {
}
void ShardingStateRecovery::endMetadataOp(OperationContext* txn) {
- if (grid.catalogManager(txn)->getMode() != CatalogManager::ConfigServerMode::CSRS) {
- return;
- }
-
Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
if (!status.isOK()) {
warning() << "Failed to decrement minOpTimeUpdaters due to " << status;
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 8d69566e0ed..bfd6eead67e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -22,7 +22,7 @@ env.Library(
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/network_interface_thread_pool',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
- '$BUILD_DIR/mongo/s/catalog/forwarding_catalog_manager',
+ '$BUILD_DIR/mongo/s/catalog/catalog_manager_impl',
'client/sharding_connection_hook',
'coreshard',
],
@@ -75,7 +75,7 @@ env.Library(
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
- '$BUILD_DIR/mongo/s/catalog/forwarding_catalog_manager',
+ '$BUILD_DIR/mongo/s/catalog/catalog_manager_impl',
'$BUILD_DIR/mongo/s/catalog/replset/catalog_manager_replica_set',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/mongoscore',
@@ -166,7 +166,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/s/query/cluster_cursor_manager',
'$BUILD_DIR/mongo/executor/task_executor_pool',
- 'catalog/forwarding_catalog_manager',
+ 'catalog/catalog_manager_impl',
'catalog/catalog_types',
'client/sharding_client',
'cluster_ops_impl',
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 939f682ef36..48d36bf3130 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -344,7 +344,7 @@ void warnOnMultiVersion(const ShardInfoMap& shardInfo) {
}
void Balancer::_doBalanceRound(OperationContext* txn,
- ForwardingCatalogManager::ScopedDistLock* distLock,
+ DistLockManager::ScopedDistLock* distLock,
vector<shared_ptr<MigrateInfo>>* candidateChunks) {
invariant(candidateChunks);
@@ -383,8 +383,6 @@ void Balancer::_doBalanceRound(OperationContext* txn,
// For each collection, check if the balancing policy recommends moving anything around.
for (const auto& coll : collections) {
- uassertStatusOK(distLock->checkForPendingCatalogChange());
-
// Skip collections for which balancing is disabled
const NamespaceString& nss = coll.getNs();
@@ -603,8 +601,11 @@ void Balancer::run() {
uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
{
- auto scopedDistLock = grid.forwardingCatalogManager()->distLock(
- txn.get(), "balancer", "doing balance round");
+ auto scopedDistLock = grid.catalogManager(txn.get())
+ ->distLock(txn.get(),
+ "balancer",
+ "doing balance round",
+ DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus());
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 6429091ef5c..f9c0eb069aa 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -30,7 +30,7 @@
#pragma once
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/util/background.h"
namespace mongo {
@@ -94,7 +94,7 @@ private:
* possibly be moved
*/
void _doBalanceRound(OperationContext* txn,
- ForwardingCatalogManager::ScopedDistLock* distLock,
+ DistLockManager::ScopedDistLock* distLock,
std::vector<std::shared_ptr<MigrateInfo>>* candidateChunks);
/**
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index 59a62d17af7..674e548a317 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -4,7 +4,6 @@ Import("env")
env.SConscript(
dirs=[
- 'legacy',
'replset',
],
)
@@ -65,13 +64,11 @@ env.Library(
)
env.Library(
- target='forwarding_catalog_manager',
+ target='catalog_manager_impl',
source=[
- 'forwarding_catalog_manager.cpp'
],
LIBDEPS=[
'catalog_manager',
- 'legacy/catalog_manager_legacy',
'replset/catalog_manager_replica_set',
'replset/dist_lock_catalog_impl',
'replset/replset_dist_lock_manager',
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index eadfdc9c231..c58a2a89cc1 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -34,6 +34,7 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/optime_pair.h"
@@ -51,7 +52,6 @@ struct ChunkVersion;
class CollectionType;
class ConnectionString;
class DatabaseType;
-class DistLockManager;
class NamespaceString;
class OperationContext;
class SettingsType;
@@ -82,7 +82,6 @@ enum ShardDrainingStatus {
*/
class CatalogManager {
MONGO_DISALLOW_COPYING(CatalogManager);
- friend class ForwardingCatalogManager;
public:
enum class ConfigServerMode {
@@ -106,12 +105,6 @@ public:
virtual void shutDown(OperationContext* txn, bool allowNetworking = true) = 0;
/**
- * Returns what type of catalog manager this is - CSRS for the CatalogManagerReplicaSet and
- * SCCC for the CatalogManagerLegacy.
- */
- virtual ConfigServerMode getMode() = 0;
-
- /**
* Creates a new database or updates the sharding status for an existing one. Cannot be
* used for the admin/config/local DBs, which should not be created or sharded manually
* anyways.
@@ -445,6 +438,13 @@ public:
virtual Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) = 0;
+
+ virtual StatusWith<DistLockManager::ScopedDistLock> distLock(
+ OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor = DistLockManager::kSingleLockAttemptTimeout) = 0;
+
protected:
CatalogManager() = default;
diff --git a/src/mongo/s/catalog/catalog_manager_common.cpp b/src/mongo/s/catalog/catalog_manager_common.cpp
index d8943f626d3..cc4d8d8106b 100644
--- a/src/mongo/s/catalog/catalog_manager_common.cpp
+++ b/src/mongo/s/catalog/catalog_manager_common.cpp
@@ -606,4 +606,12 @@ Status CatalogManagerCommon::_log(OperationContext* txn,
return result;
}
+StatusWith<DistLockManager::ScopedDistLock> CatalogManagerCommon::distLock(
+ OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor) {
+ return getDistLockManager()->lock(txn, name, whyMessage, waitFor);
+}
+
} // namespace mongo
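With the forwarding layer removed, distributed locks are taken directly through the CatalogManager interface, and the CatalogManagerCommon::distLock added above simply forwards to the owned DistLockManager. As a rough sketch of a call site under this API, patterned on the migration_impl.cpp and balance.cpp hunks earlier in this diff (the helper function below is illustrative only and is not part of the commit):

// Hypothetical helper, not in the commit: acquire the collection distributed
// lock the same way the migration and balancer code paths now do.
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/grid.h"

namespace mongo {

StatusWith<DistLockManager::ScopedDistLock> lockCollectionForMigration(
    OperationContext* txn, const NamespaceString& nss) {
    // Single lock attempt, matching the timeout the balancer passes explicitly above.
    return grid.catalogManager(txn)->distLock(
        txn, nss.ns(), "migrating chunk", DistLockManager::kSingleLockAttemptTimeout);
}

}  // namespace mongo

The returned ScopedDistLock releases the lock when it goes out of scope, so callers such as ChunkMoveOperationState keep it alive for the duration of the protected operation.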
diff --git a/src/mongo/s/catalog/catalog_manager_common.h b/src/mongo/s/catalog/catalog_manager_common.h
index 95353f800f8..f6512c6f4df 100644
--- a/src/mongo/s/catalog/catalog_manager_common.h
+++ b/src/mongo/s/catalog/catalog_manager_common.h
@@ -79,6 +79,11 @@ public:
const std::string& ns,
const BSONObj& detail) final;
+ StatusWith<DistLockManager::ScopedDistLock> distLock(OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor) final;
+
protected:
/**
* Selects an optimal shard on which to place a newly created database from the set of
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index 8ab6fdc3ee9..c4cce181b1e 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -145,6 +145,14 @@ StatusWith<OpTimePair<std::vector<ShardType>>> CatalogManagerMock::getAllShards(
return {ErrorCodes::InternalError, "Method not implemented"};
}
+StatusWith<DistLockManager::ScopedDistLock> CatalogManagerMock::distLock(
+ OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
bool CatalogManagerMock::runUserManagementWriteCommand(OperationContext* txn,
const string& commandName,
const string& dbname,
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 2ff1c71fab4..01b7431e7ad 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -41,10 +41,6 @@ public:
CatalogManagerMock();
~CatalogManagerMock();
- ConfigServerMode getMode() override {
- return ConfigServerMode::NONE;
- }
-
Status startup(OperationContext* txn, bool allowNetworking) override;
void shutDown(OperationContext* txn, bool allowNetworking) override;
@@ -160,6 +156,12 @@ public:
DistLockManager* getDistLockManager() override;
+ StatusWith<DistLockManager::ScopedDistLock> distLock(
+ OperationContext* txn,
+ StringData name,
+ StringData whyMessage,
+ stdx::chrono::milliseconds waitFor) override;
+
Status initConfigVersion(OperationContext* txn) override;
Status appendInfoForConfigServerDatabases(OperationContext* txn,
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
deleted file mode 100644
index 63e75997d1e..00000000000
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ /dev/null
@@ -1,617 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
-
-#include <cstdint>
-#include <vector>
-
-#include "mongo/client/connection_string.h"
-#include "mongo/db/client.h"
-#include "mongo/db/service_context.h"
-#include "mongo/platform/random.h"
-#include "mongo/s/catalog/legacy/catalog_manager_legacy.h"
-#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
-#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
-#include "mongo/s/catalog/replset/replset_dist_lock_manager.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/catalog/type_settings.h"
-#include "mongo/s/catalog/type_tags.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/clock_source.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-
-using executor::TaskExecutor;
-
-namespace {
-std::unique_ptr<CatalogManager> makeCatalogManager(ServiceContext* service,
- const ConnectionString& configCS,
- ShardRegistry* shardRegistry,
- const HostAndPort& thisHost) {
- std::unique_ptr<SecureRandom> rng(SecureRandom::create());
- std::string distLockProcessId = str::stream()
- << thisHost.toString() << ':'
- << durationCount<Seconds>(service->getClockSource()->now().toDurationSinceEpoch()) << ':'
- << static_cast<int32_t>(rng->nextInt64());
-
- switch (configCS.type()) {
- case ConnectionString::SET: {
- auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
- auto distLockManager = stdx::make_unique<ReplSetDistLockManager>(
- service,
- distLockProcessId,
- std::move(distLockCatalog),
- ReplSetDistLockManager::kDistLockPingInterval,
- ReplSetDistLockManager::kDistLockExpirationTime);
-
- return stdx::make_unique<CatalogManagerReplicaSet>(std::move(distLockManager));
- }
- case ConnectionString::SYNC:
- case ConnectionString::MASTER:
- case ConnectionString::CUSTOM: {
- auto catalogManagerLegacy = stdx::make_unique<CatalogManagerLegacy>();
- uassertStatusOK(catalogManagerLegacy->init(configCS, distLockProcessId));
- return std::move(catalogManagerLegacy);
- }
- default:
- MONGO_UNREACHABLE;
- }
-}
-}
-
-ForwardingCatalogManager::ForwardingCatalogManager(ServiceContext* service,
- const ConnectionString& configCS,
- ShardRegistry* shardRegistry,
- const HostAndPort& thisHost)
- : ForwardingCatalogManager(service,
- makeCatalogManager(service, configCS, shardRegistry, thisHost),
- shardRegistry,
- thisHost) {}
-
-ForwardingCatalogManager::ForwardingCatalogManager(ServiceContext* service,
- std::unique_ptr<CatalogManager> actual,
- ShardRegistry* shardRegistry,
- const HostAndPort& thisHost)
- : _service(service),
- _shardRegistry(shardRegistry),
- _thisHost(thisHost),
- _operationLock("CatalogOperationLock"),
- _actual(std::move(actual)) {}
-
-ForwardingCatalogManager::~ForwardingCatalogManager() = default;
-
-
-StatusWith<ForwardingCatalogManager::ScopedDistLock> ForwardingCatalogManager::distLock(
- OperationContext* txn,
- StringData name,
- StringData whyMessage,
- stdx::chrono::milliseconds waitFor) {
- for (int i = 0; i < 2; ++i) {
- try {
- _operationLock.lock_shared();
- auto guard = MakeGuard([this] { _operationLock.unlock_shared(); });
- auto dlmLock = _actual->getDistLockManager()->lock(txn, name, whyMessage, waitFor);
- if (dlmLock.isOK()) {
- guard.Dismiss(); // Don't unlock _operationLock; hold it until the returned
- // ScopedDistLock goes out of scope!
- try {
- return ScopedDistLock(txn, this, std::move(dlmLock.getValue()));
- } catch (...) {
- // Once the guard that unlocks _operationLock is dismissed, any exception before
- // this method returns is fatal.
- std::terminate();
- }
- } else if (dlmLock.getStatus() != ErrorCodes::IncompatibleCatalogManager) {
- return dlmLock.getStatus();
- }
- } catch (const DBException& ex) {
- if (ex.getCode() != ErrorCodes::IncompatibleCatalogManager) {
- throw;
- }
- }
-
- waitForCatalogManagerChange(txn);
- }
- MONGO_UNREACHABLE;
-}
-
-namespace {
-const auto scopedDistLockHeld = OperationContext::declareDecoration<bool>();
-}
-
-CatalogManager* ForwardingCatalogManager::getCatalogManagerToUse(OperationContext* txn) {
- if (scopedDistLockHeld(txn)) {
- return _actual.get();
- } else {
- return this;
- }
-}
-
-ForwardingCatalogManager::ScopedDistLock::ScopedDistLock(OperationContext* txn,
- ForwardingCatalogManager* fcm,
- DistLockManager::ScopedDistLock theLock)
- : _txn(txn), _fcm(fcm), _lock(std::move(theLock)) {
- scopedDistLockHeld(txn) = true;
-}
-
-ForwardingCatalogManager::ScopedDistLock::ScopedDistLock(ScopedDistLock&& other)
- : _txn(other._txn), _fcm(other._fcm), _lock(std::move(other._lock)) {
- other._txn = nullptr;
- other._fcm = nullptr;
-}
-
-ForwardingCatalogManager::ScopedDistLock::~ScopedDistLock() {
- if (_fcm) { // This ScopedDistLock was not moved from
- auto guard = MakeGuard([this] { _fcm->_operationLock.unlock_shared(); });
- DistLockManager::ScopedDistLock dlmLock = std::move(_lock);
- scopedDistLockHeld(_txn) = false;
- }
-}
-
-ForwardingCatalogManager::ScopedDistLock& ForwardingCatalogManager::ScopedDistLock::operator=(
- ScopedDistLock&& other) {
-#if defined(_MSC_VER) && _MSC_VER < 1900 // MSVC 2013 STL can emit self-move-assign.
- if (&other == this)
- return *this;
-#endif
-
- invariant(!_fcm);
- _fcm = other._fcm;
- other._fcm = nullptr;
- _lock = std::move(other._lock);
- return *this;
-}
-
-Status ForwardingCatalogManager::ScopedDistLock::checkForPendingCatalogChange() {
- return _fcm->checkForPendingCatalogChange();
-}
-
-Status ForwardingCatalogManager::ScopedDistLock::checkStatus() {
- Status status = checkForPendingCatalogChange();
- if (!status.isOK()) {
- return status;
- }
-
- return _lock.checkStatus();
-}
-
-Status ForwardingCatalogManager::scheduleReplaceCatalogManagerIfNeeded(
- ConfigServerMode desiredMode, StringData replSetName, const HostAndPort& knownServer) {
- stdx::lock_guard<stdx::mutex> lk(_observerMutex);
- const auto currentMode = _actual->getMode();
- if (currentMode == desiredMode) {
- return Status::OK();
- }
- if (desiredMode == ConfigServerMode::SCCC) {
- // TODO(spencer): Support downgrade.
- return {ErrorCodes::IllegalOperation,
- "Config server reports that it legacy SCCC mode, but we are already using "
- "the replica set config server protocol for config server "
- "communication. Downgrade needed but not yet supported"};
- }
- invariant(desiredMode == ConfigServerMode::CSRS);
- if (_nextConfigChangeComplete.isValid()) {
- if (_nextConfigConnectionString.getSetName() != replSetName) {
- severe() << "Conflicting new config server replica set names: "
- << _nextConfigConnectionString.getSetName() << " vs " << replSetName;
- fassertFailed(28788);
- }
- } else {
- _nextConfigConnectionString = ConnectionString::forReplicaSet(replSetName, {knownServer});
- _nextConfigChangeComplete =
- fassertStatusOK(28789, _shardRegistry->getExecutor()->makeEvent());
- fassertStatusOK(
- 28787,
- _shardRegistry->getExecutor()->scheduleWork(
- [this](const TaskExecutor::CallbackArgs& args) { _replaceCatalogManager(args); }));
- }
- return {ErrorCodes::IncompatibleCatalogManager,
- "Need to swap sharding catalog manager. Config server "
- "reports that it is in replica set mode, but we are still using the "
- "legacy SCCC protocol for config server communication"};
-}
-
-void ForwardingCatalogManager::waitForCatalogManagerChange(OperationContext* txn) {
- fassert(28802, !scopedDistLockHeld(txn));
-
- stdx::unique_lock<stdx::mutex> oblk(_observerMutex);
- invariant(_nextConfigChangeComplete.isValid());
- auto configChangeComplete = _nextConfigChangeComplete;
- oblk.unlock();
- _shardRegistry->getExecutor()->waitForEvent(configChangeComplete);
-}
-
-Status ForwardingCatalogManager::checkForPendingCatalogChange() {
- stdx::lock_guard<stdx::mutex> lk(_observerMutex);
- if (!_nextConfigChangeComplete.isValid() || _configChangeComplete) {
- return Status::OK();
- }
- return Status(ErrorCodes::IncompatibleCatalogManager,
- "Need to swap sharding catalog manager. Config server "
- "reports that it is in replica set mode, but we are still using the "
- "legacy SCCC protocol for config server communication");
-}
-
-namespace {
-
-template <typename T>
-struct CheckForIncompatibleCatalogManager {
- T operator()(T&& v) {
- return std::forward<T>(v);
- }
-};
-
-template <typename T>
-struct CheckForIncompatibleCatalogManager<StatusWith<T>> {
- StatusWith<T> operator()(StatusWith<T>&& v) {
- if (!v.isOK() && v.getStatus().code() == ErrorCodes::IncompatibleCatalogManager) {
- uassertStatusOK(v);
- }
- return std::forward<StatusWith<T>>(v);
- }
-};
-
-template <>
-struct CheckForIncompatibleCatalogManager<Status> {
- Status operator()(Status&& v) {
- if (v.code() == ErrorCodes::IncompatibleCatalogManager) {
- uassertStatusOK(v);
- }
- return std::move(v);
- }
-};
-
-template <typename T>
-T checkForIncompatibleCatalogManager(T&& v) {
- return CheckForIncompatibleCatalogManager<T>()(std::forward<T>(v));
-}
-
-} // namespace
-
-template <typename Callable>
-auto ForwardingCatalogManager::retry(OperationContext* txn, Callable&& c)
- -> decltype(std::forward<Callable>(c)()) {
- for (int i = 0; i < 2; ++i) {
- try {
- rwlock_shared oplk(_operationLock);
- return checkForIncompatibleCatalogManager(std::forward<Callable>(c)());
- } catch (const DBException& ex) {
- if (ex.getCode() != ErrorCodes::IncompatibleCatalogManager) {
- throw;
- }
- }
-
- waitForCatalogManagerChange(txn);
- }
- MONGO_UNREACHABLE;
-}
-
-void ForwardingCatalogManager::_unlockOldDistLocks(std::string processID) {
- Client::initThreadIfNotAlready("DistributedLockUnlocker");
- auto txn = cc().makeOperationContext();
- log() << "Unlocking existing distributed locks after catalog manager swap";
- rwlock_shared oplk(_operationLock);
- _actual->getDistLockManager()->unlockAll(txn.get(), processID);
-}
-
-void ForwardingCatalogManager::_replaceCatalogManager(const TaskExecutor::CallbackArgs& args) {
- if (!args.status.isOK()) {
- return;
- }
- Client::initThreadIfNotAlready("CatalogManagerReplacer");
- auto txn = cc().makeOperationContext();
- log() << "Swapping sharding Catalog Manager from mirrored (SCCC) to replica set (CSRS) mode";
-
- stdx::lock_guard<RWLock> oplk(_operationLock);
- std::string distLockProcessID = _actual->getDistLockManager()->getProcessID();
- // Shut down the old catalog manager before taking _observerMutex to prevent deadlock with
- // the LegacyDistLockPinger thread calling scheduleReplaceCatalogManagerIfNeeded.
- _actual->shutDown(txn.get(), /* allowNetworking */ false);
-
- stdx::lock_guard<stdx::mutex> oblk(_observerMutex);
- _actual = makeCatalogManager(_service, _nextConfigConnectionString, _shardRegistry, _thisHost);
- _shardRegistry->updateConfigServerConnectionString(_nextConfigConnectionString);
- // Note: this assumes that downgrade is not supported, as this will not start the config
- // server consistency checker for the legacy catalog manager.
- fassert(28790, _actual->startup(txn.get(), false /* allowNetworking */));
- args.executor->signalEvent(_nextConfigChangeComplete);
- _configChangeComplete = true;
-
- log() << "Swapping sharding Catalog Manager to replica set (CSRS) mode completed successfully";
-
- // Must be done in a new thread to avoid deadlock resulting from running network operations
- // within a TaskExecutor callback.
- stdx::thread distLockUnlockThread(
- [this, distLockProcessID] { _unlockOldDistLocks(distLockProcessID); });
- distLockUnlockThread.detach();
-}
-
-CatalogManager::ConfigServerMode ForwardingCatalogManager::getMode() {
- stdx::lock_guard<stdx::mutex> lk(_observerMutex);
- return _actual->getMode();
-}
-
-Status ForwardingCatalogManager::startup(OperationContext* txn, bool allowNetworking) {
- return retry(txn,
- [this, txn, allowNetworking] { return _actual->startup(txn, allowNetworking); });
-}
-
-void ForwardingCatalogManager::shutDown(OperationContext* txn, bool allowNetworking) {
- retry(txn,
- [this, txn, allowNetworking] {
- _actual->shutDown(txn, allowNetworking);
- return 1;
- });
-}
-
-Status ForwardingCatalogManager::enableSharding(OperationContext* txn, const std::string& dbName) {
- return retry(txn, [&] { return _actual->enableSharding(txn, dbName); });
-}
-
-Status ForwardingCatalogManager::shardCollection(OperationContext* txn,
- const std::string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- const std::vector<BSONObj>& initPoints,
- const std::set<ShardId>& initShardsIds) {
- return retry(txn,
- [&] {
- return _actual->shardCollection(
- txn, ns, fieldsAndOrder, unique, initPoints, initShardsIds);
- });
-}
-
-StatusWith<std::string> ForwardingCatalogManager::addShard(
- OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) {
- return retry(
- txn,
- [&] { return _actual->addShard(txn, shardProposedName, shardConnectionString, maxSize); });
-}
-
-StatusWith<ShardDrainingStatus> ForwardingCatalogManager::removeShard(OperationContext* txn,
- const std::string& name) {
- return retry(txn, [&] { return _actual->removeShard(txn, name); });
-}
-
-Status ForwardingCatalogManager::updateDatabase(OperationContext* txn,
- const std::string& dbName,
- const DatabaseType& db) {
- return retry(txn, [&] { return _actual->updateDatabase(txn, dbName, db); });
-}
-
-StatusWith<OpTimePair<DatabaseType>> ForwardingCatalogManager::getDatabase(
- OperationContext* txn, const std::string& dbName) {
- return retry(txn, [&] { return _actual->getDatabase(txn, dbName); });
-}
-
-Status ForwardingCatalogManager::updateCollection(OperationContext* txn,
- const std::string& collNs,
- const CollectionType& coll) {
- return retry(txn, [&] { return _actual->updateCollection(txn, collNs, coll); });
-}
-
-StatusWith<OpTimePair<CollectionType>> ForwardingCatalogManager::getCollection(
- OperationContext* txn, const std::string& collNs) {
- return retry(txn, [&] { return _actual->getCollection(txn, collNs); });
-}
-
-Status ForwardingCatalogManager::getCollections(OperationContext* txn,
- const std::string* dbName,
- std::vector<CollectionType>* collections,
- repl::OpTime* opTime) {
- invariant(collections->empty());
- return retry(txn,
- [&] {
- collections->clear();
- return _actual->getCollections(txn, dbName, collections, opTime);
- });
-}
-
-Status ForwardingCatalogManager::dropCollection(OperationContext* txn, const NamespaceString& ns) {
- return retry(txn, [&] { return _actual->dropCollection(txn, ns); });
-}
-
-Status ForwardingCatalogManager::getDatabasesForShard(OperationContext* txn,
- const std::string& shardName,
- std::vector<std::string>* dbs) {
- invariant(dbs->empty());
- return retry(txn,
- [&] {
- dbs->clear();
- return _actual->getDatabasesForShard(txn, shardName, dbs);
- });
-}
-
-Status ForwardingCatalogManager::getChunks(OperationContext* txn,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<int> limit,
- std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) {
- invariant(chunks->empty());
- return retry(txn,
- [&] {
- chunks->clear();
- return _actual->getChunks(txn, query, sort, limit, chunks, opTime);
- });
-}
-
-Status ForwardingCatalogManager::getTagsForCollection(OperationContext* txn,
- const std::string& collectionNs,
- std::vector<TagsType>* tags) {
- invariant(tags->empty());
- return retry(txn,
- [&] {
- tags->clear();
- return _actual->getTagsForCollection(txn, collectionNs, tags);
- });
-}
-
-StatusWith<std::string> ForwardingCatalogManager::getTagForChunk(OperationContext* txn,
- const std::string& collectionNs,
- const ChunkType& chunk) {
- return retry(txn, [&] { return _actual->getTagForChunk(txn, collectionNs, chunk); });
-}
-
-StatusWith<OpTimePair<std::vector<ShardType>>> ForwardingCatalogManager::getAllShards(
- OperationContext* txn) {
- return retry(txn, [&] { return _actual->getAllShards(txn); });
-}
-
-bool ForwardingCatalogManager::runUserManagementWriteCommand(OperationContext* txn,
- const std::string& commandName,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- return retry(txn,
- [&] {
- BSONObjBuilder builder;
- const bool success = _actual->runUserManagementWriteCommand(
- txn, commandName, dbname, cmdObj, &builder);
- result->appendElements(builder.done());
- return success;
- });
-}
-
-bool ForwardingCatalogManager::runUserManagementReadCommand(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- return retry(txn,
- [&] {
- BSONObjBuilder builder;
- const bool success =
- _actual->runUserManagementReadCommand(txn, dbname, cmdObj, &builder);
- result->appendElements(builder.done());
- return success;
- });
-}
-
-Status ForwardingCatalogManager::applyChunkOpsDeprecated(OperationContext* txn,
- const BSONArray& updateOps,
- const BSONArray& preCondition,
- const std::string& nss,
- const ChunkVersion& lastChunkVersion) {
- return retry(txn,
- [&] {
- return _actual->applyChunkOpsDeprecated(
- txn, updateOps, preCondition, nss, lastChunkVersion);
- });
-}
-
-Status ForwardingCatalogManager::logAction(OperationContext* txn,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) {
- return retry(txn, [&] { return _actual->logAction(txn, what, ns, detail); });
-}
-
-Status ForwardingCatalogManager::logChange(OperationContext* txn,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) {
- return retry(txn, [&] { return _actual->logChange(txn, what, ns, detail); });
-}
-
-StatusWith<SettingsType> ForwardingCatalogManager::getGlobalSettings(OperationContext* txn,
- const std::string& key) {
- return retry(txn, [&] { return _actual->getGlobalSettings(txn, key); });
-}
-
-void ForwardingCatalogManager::writeConfigServerDirect(OperationContext* txn,
- const BatchedCommandRequest& request,
- BatchedCommandResponse* response) {
- retry(txn,
- [&] {
- BatchedCommandResponse theResponse;
- _actual->writeConfigServerDirect(txn, request, &theResponse);
- theResponse.cloneTo(response);
- return 1;
- });
-}
-
-Status ForwardingCatalogManager::insertConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& doc) {
- return retry(txn, [&] { return _actual->insertConfigDocument(txn, ns, doc); });
-}
-
-StatusWith<bool> ForwardingCatalogManager::updateConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert) {
- return retry(txn,
- [&] { return _actual->updateConfigDocument(txn, ns, query, update, upsert); });
-}
-
-Status ForwardingCatalogManager::removeConfigDocuments(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query) {
- return retry(txn, [&] { return _actual->removeConfigDocuments(txn, ns, query); });
-}
-
-Status ForwardingCatalogManager::createDatabase(OperationContext* txn, const std::string& dbName) {
- return retry(txn, [&] { return _actual->createDatabase(txn, dbName); });
-}
-
-DistLockManager* ForwardingCatalogManager::getDistLockManager() {
- warning() << "getDistLockManager called on ForwardingCatalogManager which should never happen "
- "outside of unit tests!";
- rwlock_shared oplk(_operationLock);
- return _actual->getDistLockManager();
-}
-
-Status ForwardingCatalogManager::initConfigVersion(OperationContext* txn) {
- return retry(txn, [&] { return _actual->initConfigVersion(txn); });
-}
-
-Status ForwardingCatalogManager::appendInfoForConfigServerDatabases(OperationContext* txn,
- BSONArrayBuilder* builder) {
- return retry(txn, [&] { return _actual->appendInfoForConfigServerDatabases(txn, builder); });
-}
-
-} // namespace mongo
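Most of the deleted file above is the SCCC-to-CSRS swap machinery: every forwarded call runs under a shared _operationLock and, if it observes IncompatibleCatalogManager, waits for the scheduled swap and retries exactly once. A minimal, self-contained model of that two-attempt loop follows; the names here (CatalogSwappedError, waitForSwap, retryAcrossCatalogSwap) are stand-ins for illustration and are not MongoDB APIs.

#include <stdexcept>
#include <utility>

// Stand-in for ErrorCodes::IncompatibleCatalogManager surfacing as an exception.
struct CatalogSwappedError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Stand-in for ForwardingCatalogManager::waitForCatalogManagerChange().
inline void waitForSwap() {}

// Mirrors the shape of the deleted ForwardingCatalogManager::retry(): at most
// two attempts, with a wait for the catalog manager swap in between.
template <typename Callable>
auto retryAcrossCatalogSwap(Callable&& op) -> decltype(std::forward<Callable>(op)()) {
    for (int attempt = 0; attempt < 2; ++attempt) {
        try {
            // The real code also held _operationLock shared here so the swap
            // could not happen mid-call.
            return std::forward<Callable>(op)();
        } catch (const CatalogSwappedError&) {
            // First attempt raced with the SCCC->CSRS swap; wait, then retry once.
        }
        waitForSwap();
    }
    throw CatalogSwappedError("catalog manager changed more than once during one operation");
}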
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
deleted file mode 100644
index 15227b16c29..00000000000
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/client/connection_string.h"
-#include "mongo/db/server_options.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/s/catalog/catalog_manager.h"
-#include "mongo/s/catalog/dist_lock_manager.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/rwlock.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-
-class NamespaceString;
-class ServiceContext;
-class ShardRegistry;
-class VersionType;
-struct ReadPreferenceSetting;
-
-/**
- * The ForwardingCatalogManager is an indirection layer that allows for dynamic switching of
- * catalog manager implementations at runtime, to facilitate upgrade.
- * Inheriting privately from CatalogManager is intentional. All callers of CatalogManager methods
- * on a ForwardingCatalogManager will get access to the FCM pointer by calling
- * FCM::getCatalogManagerToUse, which can return a CatalogManager* because it is a member of FCM
- * and thus knows that FCM inherits from CatalogManager. This makes it obvious if we try to call
- * CatalogManager methods directly on a ForwardingCatalogManager pointer.
- */
-class ForwardingCatalogManager final : private CatalogManager {
-public:
- class ScopedDistLock;
-
- ForwardingCatalogManager(ServiceContext* service,
- const ConnectionString& configCS,
- ShardRegistry* shardRegistry,
- const HostAndPort& thisHost);
-
- /**
- * Constructor for use in tests.
- */
- ForwardingCatalogManager(ServiceContext* service,
- std::unique_ptr<CatalogManager> actual,
- ShardRegistry* shardRegistry,
- const HostAndPort& thisHost);
-
- virtual ~ForwardingCatalogManager();
-
- // Only public because of unit tests
- DistLockManager* getDistLockManager() override;
-
- ConfigServerMode getMode() override;
-
- /**
- * If desiredMode doesn't equal _actual->getMode(), schedules work to swap the actual catalog
- * manager to one of the type specified by desiredMode.
- * Currently only supports going to CSRS mode from SCCC mode.
- */
- Status scheduleReplaceCatalogManagerIfNeeded(ConfigServerMode desiredMode,
- StringData replSetName,
- const HostAndPort& knownServer);
-
- /**
- * Blocking method, which waits for a previously scheduled catalog manager change to
- * complete. It is illegal to call unless scheduleReplaceCatalogManagerIfNeeded has been called.
- */
- void waitForCatalogManagerChange(OperationContext* txn);
-
- /**
- * Checks to see if we are currently waiting to swap the catalog manager.
- */
- Status checkForPendingCatalogChange();
-
- /**
- * Returns a ScopedDistLock which is the RAII type for holding a distributed lock.
- * ScopedDistLock prevents the underlying CatalogManager from being swapped as long as it is
- * in scope.
- */
- StatusWith<ScopedDistLock> distLock(
- OperationContext* txn,
- StringData name,
- StringData whyMessage,
- stdx::chrono::milliseconds waitFor = DistLockManager::kSingleLockAttemptTimeout);
-
- /**
- * Returns a pointer to the CatalogManager that should be used for general CatalogManager
- * operation. Most of the time it will return 'this' - the ForwardingCatalogManager. If there
- * is a distributed lock held as part of this operation, however, it will return the underlying
- * CatalogManager to prevent deadlock from occurring by trying to swap the catalog manager while
- * a distlock is held.
- */
- CatalogManager* getCatalogManagerToUse(OperationContext* txn);
-
- Status appendInfoForConfigServerDatabases(OperationContext* txn,
- BSONArrayBuilder* builder) override;
-
-private:
- Status startup(OperationContext* txn, bool allowNetworking) override;
-
- void shutDown(OperationContext* txn, bool allowNetworking = true) override;
-
- Status enableSharding(OperationContext* txn, const std::string& dbName) override;
-
- Status shardCollection(OperationContext* txn,
- const std::string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- const std::vector<BSONObj>& initPoints,
- const std::set<ShardId>& initShardsIds) override;
-
- StatusWith<std::string> addShard(OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) override;
-
- StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
- const std::string& name) override;
-
- Status updateDatabase(OperationContext* txn,
- const std::string& dbName,
- const DatabaseType& db) override;
-
- StatusWith<OpTimePair<DatabaseType>> getDatabase(OperationContext* txn,
- const std::string& dbName) override;
-
- Status updateCollection(OperationContext* txn,
- const std::string& collNs,
- const CollectionType& coll) override;
-
- StatusWith<OpTimePair<CollectionType>> getCollection(OperationContext* txn,
- const std::string& collNs) override;
-
- Status getCollections(OperationContext* txn,
- const std::string* dbName,
- std::vector<CollectionType>* collections,
- repl::OpTime* opTime) override;
-
- Status dropCollection(OperationContext* txn, const NamespaceString& ns) override;
-
- Status getDatabasesForShard(OperationContext* txn,
- const std::string& shardName,
- std::vector<std::string>* dbs) override;
-
- Status getChunks(OperationContext* txn,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<int> limit,
- std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) override;
-
- Status getTagsForCollection(OperationContext* txn,
- const std::string& collectionNs,
- std::vector<TagsType>* tags) override;
-
- StatusWith<std::string> getTagForChunk(OperationContext* txn,
- const std::string& collectionNs,
- const ChunkType& chunk) override;
-
- StatusWith<OpTimePair<std::vector<ShardType>>> getAllShards(OperationContext* txn) override;
-
- bool runUserManagementWriteCommand(OperationContext* txn,
- const std::string& commandName,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
-
- bool runUserManagementReadCommand(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
-
- Status applyChunkOpsDeprecated(OperationContext* txn,
- const BSONArray& updateOps,
- const BSONArray& preCondition,
- const std::string& nss,
- const ChunkVersion& lastChunkVersion) override;
-
- Status logAction(OperationContext* txn,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
-
- Status logChange(OperationContext* txn,
- const std::string& what,
- const std::string& ns,
- const BSONObj& detail) override;
-
- StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
- const std::string& key) override;
-
- void writeConfigServerDirect(OperationContext* txn,
- const BatchedCommandRequest& request,
- BatchedCommandResponse* response) override;
-
- Status insertConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& doc) override;
-
- StatusWith<bool> updateConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert) override;
-
- Status removeConfigDocuments(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query) override;
-
- Status createDatabase(OperationContext* txn, const std::string& dbName) override;
-
- Status initConfigVersion(OperationContext* txn) override;
-
- template <typename Callable>
- auto retry(OperationContext* txn, Callable&& c) -> decltype(std::forward<Callable>(c)());
-
- void _replaceCatalogManager(const executor::TaskExecutor::CallbackArgs& args);
-
- void _unlockOldDistLocks(std::string processID);
-
- ServiceContext* _service;
- ShardRegistry* _shardRegistry;
- HostAndPort _thisHost;
-
- RWLock _operationLock;
- stdx::mutex _observerMutex; // If going to hold both _operationLock and _observerMutex, get
- // _operationLock first.
-
- // The actual catalog manager implementation.
- //
- // Must hold _operationLock or _observerMutex in any mode to read. Must hold both in exclusive
- // mode to write.
- std::unique_ptr<CatalogManager> _actual;
-
- ConnectionString _nextConfigConnectionString; // Guarded by _observerMutex.
- executor::TaskExecutor::EventHandle _nextConfigChangeComplete; // Guarded by _observerMutex.
- bool _configChangeComplete{false};
-};
-
-class ForwardingCatalogManager::ScopedDistLock {
- MONGO_DISALLOW_COPYING(ScopedDistLock);
-
-public:
- ScopedDistLock(OperationContext* txn,
- ForwardingCatalogManager* fcm,
- DistLockManager::ScopedDistLock theLock);
- ScopedDistLock(ScopedDistLock&& other);
- ~ScopedDistLock();
-
- ScopedDistLock& operator=(ScopedDistLock&& other);
-
- /**
- * Checks to see if we are currently waiting to swap the catalog manager. If so, holding on to
- * this ScopedDistLock will block the swap from happening, so it is important that if this
- * returns a non-OK status the caller must release the lock (most likely by failing the current
- * operation).
- */
- Status checkForPendingCatalogChange();
-
- /**
- * Queries the config server to make sure the lock is still present, as well as checking
- * if we need to swap the catalog manager
- */
- Status checkStatus();
-
-private:
- OperationContext* _txn;
- ForwardingCatalogManager* _fcm;
- DistLockManager::ScopedDistLock _lock;
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/SConscript b/src/mongo/s/catalog/legacy/SConscript
deleted file mode 100644
index dfca5d3b84f..00000000000
--- a/src/mongo/s/catalog/legacy/SConscript
+++ /dev/null
@@ -1,26 +0,0 @@
-# -*- mode: python -*-
-
-Import("env")
-
-# TODO: config upgrade tests are currently in dbtests
-env.Library(
- target='catalog_manager_legacy',
- source=[
- 'catalog_manager_legacy.cpp',
- 'cluster_client_internal.cpp',
- 'config_coordinator.cpp',
- 'config_upgrade.cpp',
- 'distlock.cpp',
- 'legacy_dist_lock_manager.cpp',
- 'legacy_dist_lock_pinger.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/executor/network_interface',
- '$BUILD_DIR/mongo/s/catalog/catalog_manager',
- '$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
- ],
- LIBDEPS_TAGS=[
- # Depends on inShutdown
- 'incomplete',
- ],
-)
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
deleted file mode 100644
index 0b9b8c034d5..00000000000
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ /dev/null
@@ -1,1425 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/catalog_manager_legacy.h"
-
-#include <pcrecpp.h>
-
-#include "mongo/base/status.h"
-#include "mongo/base/status_with.h"
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/bson/util/bson_extract.h"
-#include "mongo/client/connpool.h"
-#include "mongo/db/audit.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/server_options.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/catalog/config_server_version.h"
-#include "mongo/s/catalog/legacy/cluster_client_internal.h"
-#include "mongo/s/catalog/legacy/config_coordinator.h"
-#include "mongo/s/catalog/legacy/config_upgrade.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_config_version.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog/type_settings.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/catalog/type_tags.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/client/dbclient_multi_command.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_connection.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config.h"
-#include "mongo/s/catalog/dist_lock_manager.h"
-#include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h"
-#include "mongo/s/catalog/type_config_version.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/set_shard_version_request.h"
-#include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/log.h"
-#include "mongo/util/stringutils.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-MONGO_FP_DECLARE(setSCCCDropCollDistLockWait);
-
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using str::stream;
-
-namespace {
-
-bool validConfigWC(const BSONObj& writeConcern) {
- BSONElement elem(writeConcern["w"]);
- if (elem.eoo()) {
- return true;
- }
-
- if (elem.isNumber() && elem.numberInt() <= 1) {
- return true;
- }
-
- if (elem.type() == String && elem.str() == "majority") {
- return true;
- }
-
- return false;
-}
-
-void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
-
- dassert(response->isValid(NULL));
-}
-
-} // namespace
-
-
-CatalogManagerLegacy::CatalogManagerLegacy() = default;
-
-CatalogManagerLegacy::~CatalogManagerLegacy() = default;
-
-Status CatalogManagerLegacy::init(const ConnectionString& configDBCS,
- const std::string& distLockProcessId) {
- // Initialization should not happen more than once
- invariant(!_configServerConnectionString.isValid());
- invariant(_configServers.empty());
- invariant(configDBCS.isValid());
-
- // Extract the hosts in HOST:PORT format
- set<HostAndPort> configHostsAndPortsSet;
- set<string> configHostsOnly;
- std::vector<HostAndPort> configHostAndPorts = configDBCS.getServers();
- for (size_t i = 0; i < configHostAndPorts.size(); i++) {
- // Append the default port, if not specified
- HostAndPort configHost = configHostAndPorts[i];
- if (!configHost.hasPort()) {
- configHost = HostAndPort(configHost.host(), ServerGlobalParams::ConfigServerPort);
- }
-
- // Make sure there are no duplicates
- if (!configHostsAndPortsSet.insert(configHost).second) {
- StringBuilder sb;
- sb << "Host " << configHost.toString()
- << " exists twice in the config servers listing.";
-
- return Status(ErrorCodes::InvalidOptions, sb.str());
- }
-
- configHostsOnly.insert(configHost.host());
- }
-
- // Make sure the hosts are reachable
- for (set<string>::const_iterator i = configHostsOnly.begin(); i != configHostsOnly.end(); i++) {
- const string host = *i;
-
- // If this is a CUSTOM connection string (for testing) don't do DNS resolution
- if (uassertStatusOK(ConnectionString::parse(host)).type() == ConnectionString::CUSTOM) {
- continue;
- }
-
- bool ok = false;
-
- for (int x = 10; x > 0; x--) {
- if (!hostbyname(host.c_str()).empty()) {
- ok = true;
- break;
- }
-
- log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x
- << " more times";
- sleepsecs(10);
- }
-
- if (!ok) {
- return Status(ErrorCodes::HostNotFound,
- stream() << "unable to resolve DNS for host " << host);
- }
- }
-
- LOG(1) << " config string : " << configDBCS.toString();
-
- // Now that the config hosts are verified, initialize the catalog manager. The code below
- // should never fail.
-
- _configServerConnectionString = configDBCS;
-
- if (_configServerConnectionString.type() == ConnectionString::MASTER) {
- _configServers.push_back(_configServerConnectionString);
- } else if (_configServerConnectionString.type() == ConnectionString::SYNC) {
- const vector<HostAndPort> configHPs = _configServerConnectionString.getServers();
- for (vector<HostAndPort>::const_iterator it = configHPs.begin(); it != configHPs.end();
- ++it) {
- _configServers.push_back(ConnectionString(*it));
- }
- } else {
- // This is only for tests.
- invariant(_configServerConnectionString.type() == ConnectionString::CUSTOM);
- _configServers.push_back(_configServerConnectionString);
- }
-
- _distLockManager =
- stdx::make_unique<LegacyDistLockManager>(_configServerConnectionString, distLockProcessId);
-
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = false;
- _consistentFromLastCheck = true;
- }
-
- return Status::OK();
-}
-
-Status CatalogManagerLegacy::startup(OperationContext* txn, bool allowNetworking) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_started) {
- return Status::OK();
- }
-
- if (allowNetworking) {
- Status status = _startConfigServerChecker();
- if (!status.isOK()) {
- return status;
- }
- }
-
- _distLockManager->startUp();
-
- _started = true;
- return Status::OK();
-}
-
-Status CatalogManagerLegacy::initConfigVersion(OperationContext* txn) {
- return checkAndInitConfigVersion(txn, this, getDistLockManager());
-}
-
-Status CatalogManagerLegacy::_startConfigServerChecker() {
- const auto status = _checkConfigServersConsistent();
- if (!status.isOK()) {
- return status;
- }
-
- stdx::thread t(stdx::bind(&CatalogManagerLegacy::_consistencyChecker, this));
- _consistencyCheckerThread.swap(t);
-
- return Status::OK();
-}
-
-void CatalogManagerLegacy::shutDown(OperationContext* txn, bool allowNetworking) {
- LOG(1) << "CatalogManagerLegacy::shutDown() called.";
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _consistencyCheckerCV.notify_one();
- }
-
- // Only try to join the thread if we actually started it.
- if (_consistencyCheckerThread.joinable())
- _consistencyCheckerThread.join();
-
- invariant(_distLockManager);
- _distLockManager->shutDown(txn, allowNetworking);
-}
-
-Status CatalogManagerLegacy::shardCollection(OperationContext* txn,
- const string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- const vector<BSONObj>& initPoints,
- const set<ShardId>& initShardIds) {
- // Lock the collection globally so that no other mongos can try to shard or drop the collection
- // at the same time.
- auto scopedDistLock = getDistLockManager()->lock(txn, ns, "shardCollection");
- if (!scopedDistLock.isOK()) {
- return scopedDistLock.getStatus();
- }
-
- auto status = getDatabase(txn, nsToDatabase(ns));
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- ShardId dbPrimaryShardId = status.getValue().value.getPrimary();
- const auto primaryShard = grid.shardRegistry()->getShard(txn, dbPrimaryShardId);
-
- // This is an extra safety check that the collection is not getting sharded concurrently by
- // two different mongos instances. It is not 100%-proof, but it reduces the chance that two
- // invocations of shard collection will step on each other's toes.
- {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- unsigned long long existingChunks =
- conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(ns)));
- if (existingChunks > 0) {
- conn.done();
- return Status(ErrorCodes::AlreadyInitialized,
- str::stream() << "collection " << ns << " already sharded with "
- << existingChunks << " chunks.");
- }
-
- conn.done();
- }
-
- log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder;
-
- // Record start in changelog
- {
- BSONObjBuilder collectionDetail;
- collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
- collectionDetail.append("collection", ns);
- collectionDetail.append("primary", primaryShard->toString());
-
- {
- BSONArrayBuilder initialShards(collectionDetail.subarrayStart("initShards"));
- for (const ShardId& shardId : initShardIds) {
- initialShards.append(shardId);
- }
- }
-
- collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1));
-
- logChange(txn, "shardCollection.start", ns, collectionDetail.obj());
- }
-
- shared_ptr<ChunkManager> manager(new ChunkManager(ns, fieldsAndOrder, unique));
- Status createFirstChunksStatus =
- manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds);
- if (!createFirstChunksStatus.isOK()) {
- return createFirstChunksStatus;
- }
- manager->loadExistingRanges(txn, nullptr);
-
- CollectionInfo collInfo;
- collInfo.useChunkManager(manager);
- collInfo.save(txn, ns);
-
- // Tell the primary mongod to refresh its data
- // TODO: Think the real fix here is for mongos to just
- // assume that all collections are sharded, when we get there
- SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
- grid.shardRegistry()->getConfigServerConnectionString(),
- dbPrimaryShardId,
- primaryShard->getConnString(),
- NamespaceString(ns),
- manager->getVersion(),
- true);
-
- auto ssvStatus = grid.shardRegistry()->runCommandWithNotMasterRetries(
- txn, dbPrimaryShardId, "admin", ssv.toBSON());
- if (!ssvStatus.isOK()) {
- warning() << "could not update initial version of " << ns << " on shard primary "
- << dbPrimaryShardId << ssvStatus.getStatus();
- }
-
- logChange(txn, "shardCollection.end", ns, BSON("version" << manager->getVersion().toString()));
-
- return Status::OK();
-}
-
-StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
- const std::string& name) {
- ScopedDbConnection conn(_configServerConnectionString, 30);
-
- if (conn->count(ShardType::ConfigNS,
- BSON(ShardType::name() << NE << name << ShardType::draining(true)))) {
- conn.done();
- return Status(ErrorCodes::ConflictingOperationInProgress,
- "Can't have more than one draining shard at a time");
- }
-
- if (conn->count(ShardType::ConfigNS, BSON(ShardType::name() << NE << name)) == 0) {
- conn.done();
- return Status(ErrorCodes::IllegalOperation, "Can't remove last shard");
- }
-
- // Case 1: start draining chunks
- BSONObj shardDoc = conn->findOne(ShardType::ConfigNS,
- BSON(ShardType::name() << name << ShardType::draining(true)));
- if (shardDoc.isEmpty()) {
- log() << "going to start draining shard: " << name;
-
- auto updateStatus = updateConfigDocument(txn,
- ShardType::ConfigNS,
- BSON(ShardType::name() << name),
- BSON("$set" << BSON(ShardType::draining(true))),
- false);
- if (!updateStatus.isOK()) {
- log() << "error starting removeShard: " << name << causedBy(updateStatus.getStatus());
- return updateStatus.getStatus();
- }
-
- grid.shardRegistry()->reload(txn);
- conn.done();
-
- // Record start in changelog
- logChange(txn, "removeShard.start", "", BSON("shard" << name));
- return ShardDrainingStatus::STARTED;
- }
-
- // Case 2: all chunks drained
- BSONObj shardIDDoc = BSON(ChunkType::shard(shardDoc[ShardType::name()].str()));
- long long chunkCount = conn->count(ChunkType::ConfigNS, shardIDDoc);
- long long dbCount =
- conn->count(DatabaseType::ConfigNS,
- BSON(DatabaseType::name.ne("local") << DatabaseType::primary(name)));
- if (chunkCount == 0 && dbCount == 0) {
- log() << "going to remove shard: " << name;
- audit::logRemoveShard(txn->getClient(), name);
-
- Status status =
- removeConfigDocuments(txn, ShardType::ConfigNS, BSON(ShardType::name() << name));
- if (!status.isOK()) {
- log() << "Error concluding removeShard operation on: " << name
- << "; err: " << status.reason();
- return status;
- }
-
- grid.shardRegistry()->remove(name);
- grid.shardRegistry()->reload(txn);
- conn.done();
-
- // Record finish in changelog
- logChange(txn, "removeShard", "", BSON("shard" << name));
- return ShardDrainingStatus::COMPLETED;
- }
-
- // case 3: draining ongoing
- return ShardDrainingStatus::ONGOING;
-}
-
-StatusWith<OpTimePair<DatabaseType>> CatalogManagerLegacy::getDatabase(OperationContext* txn,
- const std::string& dbName) {
- if (!NamespaceString::validDBName(dbName)) {
- return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"};
- }
-
- // The two databases that are hosted on the config server are config and admin
- if (dbName == "config" || dbName == "admin") {
- DatabaseType dbt;
- dbt.setName(dbName);
- dbt.setSharded(false);
- dbt.setPrimary("config");
-
- return OpTimePair<DatabaseType>(dbt);
- }
-
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, BSON(DatabaseType::name(dbName)));
- if (dbObj.isEmpty()) {
- conn.done();
- return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"};
- }
-
- conn.done();
-
- auto parseStatus = DatabaseType::fromBSON(dbObj);
-
- if (!parseStatus.isOK()) {
- return parseStatus.getStatus();
- }
-
- return OpTimePair<DatabaseType>(parseStatus.getValue());
-}
-
-StatusWith<OpTimePair<CollectionType>> CatalogManagerLegacy::getCollection(
- OperationContext* txn, const std::string& collNs) {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- BSONObj collObj = conn->findOne(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs)));
- if (collObj.isEmpty()) {
- conn.done();
- return Status(ErrorCodes::NamespaceNotFound,
- stream() << "collection " << collNs << " not found");
- }
-
- conn.done();
-
- auto parseStatus = CollectionType::fromBSON(collObj);
-
- if (!parseStatus.isOK()) {
- return parseStatus.getStatus();
- }
-
- return OpTimePair<CollectionType>(parseStatus.getValue());
-}
-
-Status CatalogManagerLegacy::getCollections(OperationContext* txn,
- const std::string* dbName,
- std::vector<CollectionType>* collections,
- repl::OpTime* optime) {
- BSONObjBuilder b;
- if (dbName) {
- invariant(!dbName->empty());
- b.appendRegex(CollectionType::fullNs(),
- (string) "^" + pcrecpp::RE::QuoteMeta(*dbName) + "\\.");
- }
-
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- std::unique_ptr<DBClientCursor> cursor(
- _safeCursor(conn->query(CollectionType::ConfigNS, b.obj())));
-
- while (cursor->more()) {
- const BSONObj collObj = cursor->next();
-
- const auto collectionResult = CollectionType::fromBSON(collObj);
- if (!collectionResult.isOK()) {
- conn.done();
- collections->clear();
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "error while parsing " << CollectionType::ConfigNS
- << " document: " << collObj << " : "
- << collectionResult.getStatus().toString());
- }
-
- collections->push_back(collectionResult.getValue());
- }
-
- conn.done();
- return Status::OK();
-}
-
-Status CatalogManagerLegacy::dropCollection(OperationContext* txn, const NamespaceString& ns) {
- logChange(txn, "dropCollection.start", ns.ns(), BSONObj());
-
- auto shardsStatus = getAllShards(txn);
- if (!shardsStatus.isOK()) {
- return shardsStatus.getStatus();
- }
- vector<ShardType> allShards = std::move(shardsStatus.getValue().value);
-
- LOG(1) << "dropCollection " << ns << " started";
-
- // Lock the collection globally so that split/migrate cannot run
- stdx::chrono::seconds waitFor(DistLockManager::kDefaultLockTimeout);
- MONGO_FAIL_POINT_BLOCK(setSCCCDropCollDistLockWait, customWait) {
- const BSONObj& data = customWait.getData();
- waitFor = stdx::chrono::seconds(data["waitForSecs"].numberInt());
- }
-
- auto scopedDistLock = getDistLockManager()->lock(txn, ns.ns(), "drop", waitFor);
- if (!scopedDistLock.isOK()) {
- return scopedDistLock.getStatus();
- }
-
- LOG(1) << "dropCollection " << ns << " locked";
-
- std::map<string, BSONObj> errors;
- auto* shardRegistry = grid.shardRegistry();
-
- for (const auto& shardEntry : allShards) {
- auto dropResult = shardRegistry->runCommandWithNotMasterRetries(
- txn, shardEntry.getName(), ns.db().toString(), BSON("drop" << ns.coll()));
-
- if (!dropResult.isOK()) {
- return dropResult.getStatus();
- }
-
- auto dropStatus = getStatusFromCommandResult(dropResult.getValue());
- if (!dropStatus.isOK()) {
- if (dropStatus.code() == ErrorCodes::NamespaceNotFound) {
- continue;
- }
-
- errors.emplace(shardEntry.getHost(), dropResult.getValue());
- }
- }
-
- if (!errors.empty()) {
- StringBuilder sb;
- sb << "Dropping collection failed on the following hosts: ";
-
- for (auto it = errors.cbegin(); it != errors.cend(); ++it) {
- if (it != errors.cbegin()) {
- sb << ", ";
- }
-
- sb << it->first << ": " << it->second;
- }
-
- return {ErrorCodes::OperationFailed, sb.str()};
- }
-
- LOG(1) << "dropCollection " << ns << " shard data deleted";
-
- // Remove chunk data
- Status result = removeConfigDocuments(txn, ChunkType::ConfigNS, BSON(ChunkType::ns(ns.ns())));
- if (!result.isOK()) {
- return result;
- }
-
- LOG(1) << "dropCollection " << ns << " chunk data deleted";
-
- // Mark the collection as dropped
- CollectionType coll;
- coll.setNs(ns);
- coll.setDropped(true);
- coll.setEpoch(ChunkVersion::DROPPED().epoch());
- coll.setUpdatedAt(grid.shardRegistry()->getNetwork()->now());
-
- result = updateCollection(txn, ns.ns(), coll);
- if (!result.isOK()) {
- return result;
- }
-
- LOG(1) << "dropCollection " << ns << " collection marked as dropped";
-
- for (const auto& shardEntry : allShards) {
- SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
- grid.shardRegistry()->getConfigServerConnectionString(),
- shardEntry.getName(),
- fassertStatusOK(28753, ConnectionString::parse(shardEntry.getHost())),
- ns,
- ChunkVersion::DROPPED(),
- true);
-
- auto ssvResult = shardRegistry->runCommandWithNotMasterRetries(
- txn, shardEntry.getName(), "admin", ssv.toBSON());
-
- if (!ssvResult.isOK()) {
- return ssvResult.getStatus();
- }
-
- auto ssvStatus = getStatusFromCommandResult(ssvResult.getValue());
- if (!ssvStatus.isOK()) {
- return ssvStatus;
- }
-
- auto unsetShardingStatus = shardRegistry->runCommandWithNotMasterRetries(
- txn, shardEntry.getName(), "admin", BSON("unsetSharding" << 1));
-
- if (!unsetShardingStatus.isOK()) {
- return unsetShardingStatus.getStatus();
- }
-
- auto unsetShardingResult = getStatusFromCommandResult(unsetShardingStatus.getValue());
- if (!unsetShardingResult.isOK()) {
- return unsetShardingResult;
- }
- }
-
- LOG(1) << "dropCollection " << ns << " completed";
-
- logChange(txn, "dropCollection", ns.ns(), BSONObj());
-
- return Status::OK();
-}
-
-StatusWith<SettingsType> CatalogManagerLegacy::getGlobalSettings(OperationContext* txn,
- const string& key) {
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- BSONObj settingsDoc = conn->findOne(SettingsType::ConfigNS, BSON(SettingsType::key(key)));
- conn.done();
-
- if (settingsDoc.isEmpty()) {
- return Status(ErrorCodes::NoMatchingDocument,
- str::stream() << "can't find settings document with key: " << key);
- }
-
- StatusWith<SettingsType> settingsResult = SettingsType::fromBSON(settingsDoc);
- if (!settingsResult.isOK()) {
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "error while parsing settings document: " << settingsDoc
- << " : " << settingsResult.getStatus().toString());
- }
-
- const SettingsType& settings = settingsResult.getValue();
-
- Status validationStatus = settings.validate();
- if (!validationStatus.isOK()) {
- return validationStatus;
- }
-
- return settingsResult;
- } catch (const DBException& ex) {
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "unable to successfully obtain "
- << "config.settings document: " << causedBy(ex));
- }
-}
-
-Status CatalogManagerLegacy::getDatabasesForShard(OperationContext* txn,
- const string& shardName,
- vector<string>* dbs) {
- dbs->clear();
-
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(
- conn->query(DatabaseType::ConfigNS, Query(BSON(DatabaseType::primary(shardName))))));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable,
- str::stream() << "unable to open cursor for " << DatabaseType::ConfigNS);
- }
-
- while (cursor->more()) {
- BSONObj dbObj = cursor->nextSafe();
-
- string dbName;
- Status status = bsonExtractStringField(dbObj, DatabaseType::name(), &dbName);
- if (!status.isOK()) {
- dbs->clear();
- return status;
- }
-
- dbs->push_back(dbName);
- }
-
- conn.done();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- return Status::OK();
-}
-
-Status CatalogManagerLegacy::getChunks(OperationContext* txn,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<int> limit,
- vector<ChunkType>* chunks,
- repl::OpTime* opTime) {
- chunks->clear();
-
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- const Query queryWithSort(Query(query).sort(sort));
-
- std::unique_ptr<DBClientCursor> cursor(
- _safeCursor(conn->query(ChunkType::ConfigNS, queryWithSort, limit.get_value_or(0))));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable, "unable to open chunk cursor");
- }
-
- while (cursor->more()) {
- BSONObj chunkObj = cursor->nextSafe();
-
- StatusWith<ChunkType> chunkRes = ChunkType::fromBSON(chunkObj);
- if (!chunkRes.isOK()) {
- conn.done();
- chunks->clear();
- return {ErrorCodes::FailedToParse,
- stream() << "Failed to parse chunk with id ("
- << chunkObj[ChunkType::name()].toString()
- << "): " << chunkRes.getStatus().toString()};
- }
-
- chunks->push_back(chunkRes.getValue());
- }
-
- conn.done();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- return Status::OK();
-}
-
-Status CatalogManagerLegacy::getTagsForCollection(OperationContext* txn,
- const std::string& collectionNs,
- std::vector<TagsType>* tags) {
- tags->clear();
-
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- std::unique_ptr<DBClientCursor> cursor(_safeCursor(conn->query(
- TagsType::ConfigNS, Query(BSON(TagsType::ns(collectionNs))).sort(TagsType::min()))));
- if (!cursor.get()) {
- conn.done();
- return Status(ErrorCodes::HostUnreachable, "unable to open tags cursor");
- }
-
- while (cursor->more()) {
- BSONObj tagObj = cursor->nextSafe();
-
- StatusWith<TagsType> tagRes = TagsType::fromBSON(tagObj);
- if (!tagRes.isOK()) {
- tags->clear();
- conn.done();
- return Status(ErrorCodes::FailedToParse,
- str::stream()
- << "Failed to parse tag: " << tagRes.getStatus().toString());
- }
-
- tags->push_back(tagRes.getValue());
- }
-
- conn.done();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- return Status::OK();
-}
-
-StatusWith<string> CatalogManagerLegacy::getTagForChunk(OperationContext* txn,
- const std::string& collectionNs,
- const ChunkType& chunk) {
- BSONObj tagDoc;
-
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
-
- Query query(BSON(TagsType::ns(collectionNs)
- << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
- << BSON("$gte" << chunk.getMax())));
-
- tagDoc = conn->findOne(TagsType::ConfigNS, query);
- conn.done();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- if (tagDoc.isEmpty()) {
- return std::string("");
- }
-
- auto status = TagsType::fromBSON(tagDoc);
- if (status.isOK()) {
- return status.getValue().getTag();
- }
-
- return status.getStatus();
-}
-
-StatusWith<OpTimePair<std::vector<ShardType>>> CatalogManagerLegacy::getAllShards(
- OperationContext* txn) {
- std::vector<ShardType> shards;
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- std::unique_ptr<DBClientCursor> cursor(
- _safeCursor(conn->query(ShardType::ConfigNS, BSONObj())));
- while (cursor->more()) {
- BSONObj shardObj = cursor->nextSafe();
-
- StatusWith<ShardType> shardRes = ShardType::fromBSON(shardObj);
- if (!shardRes.isOK()) {
- shards.clear();
- conn.done();
- return Status(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse shard with id ("
- << shardObj[ShardType::name()].toString()
- << "): " << shardRes.getStatus().toString());
- }
-
- shards.push_back(shardRes.getValue());
- }
- conn.done();
-
- return OpTimePair<std::vector<ShardType>>{std::move(shards)};
-}
-
-bool CatalogManagerLegacy::runUserManagementWriteCommand(OperationContext* txn,
- const string& commandName,
- const string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- DBClientMultiCommand dispatcher(true);
- for (const ConnectionString& configServer : _configServers) {
- dispatcher.addCommand(configServer, dbname, cmdObj);
- }
-
- auto scopedDistLock = getDistLockManager()->lock(txn, "authorizationData", commandName);
- if (!scopedDistLock.isOK()) {
- return Command::appendCommandStatus(*result, scopedDistLock.getStatus());
- }
-
- dispatcher.sendAll();
-
- BSONObj responseObj;
-
- Status prevStatus{Status::OK()};
- Status currStatus{Status::OK()};
-
- BSONObjBuilder responses;
- unsigned failedCount = 0;
- bool sameError = true;
- while (dispatcher.numPending() > 0) {
- ConnectionString host;
- RawBSONSerializable responseCmdSerial;
-
- Status dispatchStatus = dispatcher.recvAny(&host, &responseCmdSerial);
-
- if (!dispatchStatus.isOK()) {
- return Command::appendCommandStatus(*result, dispatchStatus);
- }
-
- responseObj = responseCmdSerial.toBSON();
- responses.append(host.toString(), responseObj);
-
- currStatus = Command::getStatusFromCommandResult(responseObj);
- if (!currStatus.isOK()) {
- // same error <=> adjacent error statuses are the same
- if (failedCount > 0 && prevStatus != currStatus) {
- sameError = false;
- }
- failedCount++;
- prevStatus = currStatus;
- }
- }
-
- if (failedCount == 0) {
- result->appendElements(responseObj);
- return true;
- }
-
- // if the command succeeds on at least one config server and fails on at least one,
- // manual intervention is required
- if (failedCount < _configServers.size()) {
- Status status(ErrorCodes::ManualInterventionRequired,
- str::stream() << "Config write was not consistent - "
- << "user management command failed on at least one "
- << "config server but passed on at least one other. "
- << "Manual intervention may be required. "
- << "Config responses: " << responses.obj().toString());
- return Command::appendCommandStatus(*result, status);
- }
-
- if (sameError) {
- result->appendElements(responseObj);
- return false;
- }
-
- Status status(ErrorCodes::ManualInterventionRequired,
- str::stream() << "Config write was not consistent - "
- << "user management command produced inconsistent results. "
- << "Manual intervention may be required. "
- << "Config responses: " << responses.obj().toString());
- return Command::appendCommandStatus(*result, status);
-}
-
-bool CatalogManagerLegacy::runUserManagementReadCommand(OperationContext* txn,
- const string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- return _runReadCommand(txn, dbname, cmdObj, result);
-}
-
-bool CatalogManagerLegacy::_runReadCommand(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
- try {
- // let SyncClusterConnection handle connecting to the first config server
- // that is reachable and returns data
- ScopedDbConnection conn(_configServerConnectionString, 30);
-
- BSONObj cmdResult;
- const bool ok = conn->runCommand(dbname, cmdObj, cmdResult);
- result->appendElements(cmdResult);
- conn.done();
- return ok;
- } catch (const DBException& ex) {
- return Command::appendCommandStatus(*result, ex.toStatus());
- }
-}
-
-Status CatalogManagerLegacy::applyChunkOpsDeprecated(OperationContext* txn,
- const BSONArray& updateOps,
- const BSONArray& preCondition,
- const std::string& nss,
- const ChunkVersion& lastChunkVersion) {
- BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition);
- BSONObj cmdResult;
- try {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- conn->runCommand("config", cmd, cmdResult);
- conn.done();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- Status status = Command::getStatusFromCommandResult(cmdResult);
-
- if (MONGO_FAIL_POINT(failApplyChunkOps)) {
- status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error");
- }
-
- if (!status.isOK()) {
- string errMsg;
-
- // This could be a blip in the network connectivity. Check if the commit request made it.
- //
- // If all the updates were successfully written to the chunks collection, the last
- // document in the list of updates should be returned from a query to the chunks
- // collection. The last chunk can be identified by namespace and version number.
-
- warning() << "chunk operation commit failed and metadata will be revalidated"
- << causedBy(status);
-
- std::vector<ChunkType> newestChunk;
- BSONObjBuilder query;
- lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod());
- query.append(ChunkType::ns(), nss);
- Status chunkStatus = getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr);
-
- if (!chunkStatus.isOK()) {
- warning() << "getChunks function failed, unable to validate chunk operation metadata"
- << causedBy(chunkStatus);
- errMsg = str::stream() << "getChunks function failed, unable to validate chunk "
- << "operation metadata: " << causedBy(chunkStatus)
- << ". applyChunkOpsDeprecated failed to get confirmation "
- << "of commit. Unable to save chunk ops. Command: " << cmd
- << ". Result: " << cmdResult;
- } else if (!newestChunk.empty()) {
- invariant(newestChunk.size() == 1);
- log() << "chunk operation commit confirmed";
- return Status::OK();
- } else {
- errMsg = str::stream() << "chunk operation commit failed: version "
- << lastChunkVersion.toString() << " doesn't exist in namespace"
- << nss << ". Unable to save chunk ops. Command: " << cmd
- << ". Result: " << cmdResult;
- }
-
- return Status(status.code(), errMsg);
- }
-
- return Status::OK();
-}
-
-void CatalogManagerLegacy::writeConfigServerDirect(OperationContext* txn,
- const BatchedCommandRequest& request,
- BatchedCommandResponse* response) {
- // check if config servers are consistent
- if (!_isConsistentFromLastCheck()) {
- toBatchError(Status(ErrorCodes::ConfigServersInconsistent,
- "Data inconsistency detected amongst config servers"),
- response);
- return;
- }
-
- // We only support batch sizes of one for config writes
- if (request.sizeWriteOps() != 1) {
- toBatchError(Status(ErrorCodes::InvalidOptions,
- str::stream() << "Writes to config servers must have batch size of 1, "
- << "found " << request.sizeWriteOps()),
- response);
-
- return;
- }
-
- // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes
- if (request.isWriteConcernSet() && !validConfigWC(request.getWriteConcern())) {
- toBatchError(Status(ErrorCodes::InvalidOptions,
- str::stream() << "Invalid write concern for write to "
- << "config servers: " << request.getWriteConcern()),
- response);
-
- return;
- }
-
- DBClientMultiCommand dispatcher(true);
- if (_configServers.size() > 1) {
- // We can't support no-_id upserts to multiple config servers - the _ids will differ
- if (BatchedCommandRequest::containsNoIDUpsert(request)) {
- toBatchError(
- Status(ErrorCodes::InvalidOptions,
- str::stream() << "upserts to multiple config servers must include _id"),
- response);
- return;
- }
- }
-
- ConfigCoordinator exec(&dispatcher, _configServerConnectionString);
- exec.executeBatch(request, response);
-}
-
-Status CatalogManagerLegacy::insertConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& doc) {
- const NamespaceString nss(ns);
- invariant(nss.db() == "config");
- invariant(doc.hasField("_id"));
-
- auto insert(stdx::make_unique<BatchedInsertRequest>());
- insert->addToDocuments(doc);
-
- BatchedCommandRequest request(insert.release());
- request.setNS(nss);
-
- BatchedCommandResponse response;
- writeConfigServerDirect(txn, request, &response);
-
- return response.toStatus();
-}
-
-StatusWith<bool> CatalogManagerLegacy::updateConfigDocument(OperationContext* txn,
- const string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert) {
- const NamespaceString nss(ns);
- invariant(nss.db() == "config");
-
- const BSONElement idField = query.getField("_id");
- invariant(!idField.eoo());
-
- unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument());
- updateDoc->setQuery(query);
- updateDoc->setUpdateExpr(update);
- updateDoc->setUpsert(upsert);
- updateDoc->setMulti(false);
-
- unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest());
- updateRequest->addToUpdates(updateDoc.release());
-
- BatchedCommandRequest request(updateRequest.release());
- request.setNS(nss);
-
- BatchedCommandResponse response;
- writeConfigServerDirect(txn, request, &response);
-
- Status status = response.toStatus();
- if (!status.isOK()) {
- return status;
- }
-
- const auto nSelected = response.getN();
- invariant(nSelected == 0 || nSelected == 1);
- return (nSelected == 1);
-}
-
-Status CatalogManagerLegacy::removeConfigDocuments(OperationContext* txn,
- const string& ns,
- const BSONObj& query) {
- const NamespaceString nss(ns);
- invariant(nss.db() == "config");
-
- auto deleteDoc(stdx::make_unique<BatchedDeleteDocument>());
- deleteDoc->setQuery(query);
- deleteDoc->setLimit(0);
-
- auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>());
- deleteRequest->addToDeletes(deleteDoc.release());
-
- BatchedCommandRequest request(deleteRequest.release());
- request.setNS(nss);
-
- BatchedCommandResponse response;
- writeConfigServerDirect(txn, request, &response);
-
- return response.toStatus();
-}
-
-Status CatalogManagerLegacy::_checkDbDoesNotExist(OperationContext* txn,
- const std::string& dbName,
- DatabaseType* db) {
- ScopedDbConnection conn(_configServerConnectionString, 30);
-
- BSONObjBuilder b;
- b.appendRegex(DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i");
-
- BSONObj dbObj = conn->findOne(DatabaseType::ConfigNS, b.obj());
- conn.done();
-
- // If our name is exactly the same as the name we want, try loading
- // the database again.
- if (!dbObj.isEmpty() && dbObj[DatabaseType::name()].String() == dbName) {
- if (db) {
- auto parseDBStatus = DatabaseType::fromBSON(dbObj);
- if (!parseDBStatus.isOK()) {
- return parseDBStatus.getStatus();
- }
-
- *db = parseDBStatus.getValue();
- }
-
- return Status(ErrorCodes::NamespaceExists,
- str::stream() << "database " << dbName << " already exists");
- }
-
- if (!dbObj.isEmpty()) {
- return Status(ErrorCodes::DatabaseDifferCase,
- str::stream() << "can't have 2 databases that just differ on case "
- << " have: " << dbObj[DatabaseType::name()].String()
- << " want to add: " << dbName);
- }
-
- return Status::OK();
-}
-
-StatusWith<string> CatalogManagerLegacy::_generateNewShardName(OperationContext* txn) {
- BSONObj o;
- {
- ScopedDbConnection conn(_configServerConnectionString, 30);
- o = conn->findOne(ShardType::ConfigNS,
- Query(fromjson("{" + ShardType::name() + ": /^shard/}"))
- .sort(BSON(ShardType::name() << -1)));
- conn.done();
- }
-
- int count = 0;
- if (!o.isEmpty()) {
- string last = o[ShardType::name()].String();
- std::istringstream is(last.substr(5));
- is >> count;
- count++;
- }
-
- // TODO fix so that we can have more than 10000 automatically generated shard names
- if (count < 9999) {
- std::stringstream ss;
- ss << "shard" << std::setfill('0') << std::setw(4) << count;
- return ss.str();
- }
-
- return Status(ErrorCodes::OperationFailed, "unable to generate new shard name");
-}
-
-Status CatalogManagerLegacy::_createCappedConfigCollection(OperationContext* txn,
- StringData collName,
- int cappedSize) {
- try {
- const NamespaceString nss("config", collName);
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
-
- BSONObj result;
- const int maxNumDocuments = 0;
- conn->createCollection(nss.ns(), cappedSize, true, maxNumDocuments, &result);
- conn.done();
-
- return getStatusFromCommandResult(result);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-}
-
-size_t CatalogManagerLegacy::_getShardCount(const BSONObj& query) const {
- ScopedDbConnection conn(_configServerConnectionString, 30.0);
- long long shardCount = conn->count(ShardType::ConfigNS, query);
- conn.done();
-
- return shardCount;
-}
-
-DistLockManager* CatalogManagerLegacy::getDistLockManager() {
- invariant(_distLockManager);
- return _distLockManager.get();
-}
-
-Status CatalogManagerLegacy::_checkConfigServersConsistent(const unsigned tries) const {
- if (tries <= 0) {
- return {ErrorCodes::ConfigServersInconsistent,
- "too many retries after unsuccessful checks"};
- }
-
- unsigned firstGood = 0;
- int up = 0;
- // becomes false if we are able to get any response (even a failed one) from any of the config
- // servers
- bool networkError = true;
- vector<BSONObj> res;
-
- // The last error we saw on a config server
- string errMsg;
-
- for (unsigned i = 0; i < _configServers.size(); i++) {
- BSONObj result;
- std::unique_ptr<ScopedDbConnection> conn;
-
- try {
- conn.reset(new ScopedDbConnection(_configServers[i], 30.0));
-
- if (!conn->get()->runCommand(
- "config",
- BSON("dbhash" << 1 << "collections" << BSON_ARRAY("chunks"
- << "databases"
- << "collections"
- << "shards"
- << "version")),
- result)) {
- errMsg = result["errmsg"].eoo() ? "" : result["errmsg"].String();
- if (!result["assertion"].eoo())
- errMsg = result["assertion"].String();
-
- warning() << "couldn't check dbhash on config server " << _configServers[i]
- << causedBy(result.toString());
-
- result = BSONObj();
- } else {
- result = result.getOwned();
- if (up == 0)
- firstGood = i;
- up++;
- }
- // Network errors throw, so if we got this far there wasn't a network error
- networkError = false;
- conn->done();
- } catch (const DBException& excep) {
- if (conn) {
- conn->kill();
- }
-
- if (excep.getCode() == ErrorCodes::IncompatibleCatalogManager) {
- return excep.toStatus();
- }
-
- // We need to catch DBExceptions b/c sometimes we throw them
- // instead of socket exceptions when findN fails
-
- errMsg = excep.toString();
- warning() << " couldn't check dbhash on config server " << _configServers[i]
- << causedBy(excep);
- }
- res.push_back(result);
- }
-
- if (_configServers.size() == 1) {
- return Status::OK();
- }
-
- if (up == 0) {
- return {networkError ? ErrorCodes::HostUnreachable : ErrorCodes::UnknownError,
- str::stream() << "no config servers successfully contacted" << causedBy(errMsg)};
- } else if (up == 1) {
- warning() << "only 1 config server reachable, continuing";
- return Status::OK();
- }
-
- BSONObj base = res[firstGood];
- for (unsigned i = firstGood + 1; i < res.size(); i++) {
- if (res[i].isEmpty())
- continue;
-
- string chunksHash1 = base.getFieldDotted("collections.chunks");
- string chunksHash2 = res[i].getFieldDotted("collections.chunks");
-
- string databaseHash1 = base.getFieldDotted("collections.databases");
- string databaseHash2 = res[i].getFieldDotted("collections.databases");
-
- string collectionsHash1 = base.getFieldDotted("collections.collections");
- string collectionsHash2 = res[i].getFieldDotted("collections.collections");
-
- string shardHash1 = base.getFieldDotted("collections.shards");
- string shardHash2 = res[i].getFieldDotted("collections.shards");
-
- string versionHash1 = base.getFieldDotted("collections.version");
- string versionHash2 = res[i].getFieldDotted("collections.version");
-
- if (chunksHash1 == chunksHash2 && databaseHash1 == databaseHash2 &&
- collectionsHash1 == collectionsHash2 && shardHash1 == shardHash2 &&
- versionHash1 == versionHash2) {
- continue;
- }
-
- warning() << "config servers " << _configServers[firstGood].toString() << " and "
- << _configServers[i].toString() << " differ";
-
- if (tries <= 1) {
- return {ErrorCodes::ConfigServersInconsistent,
- str::stream() << "hash from " << _configServers[firstGood].toString() << ": "
- << base["collections"].Obj() << " vs hash from "
- << _configServers[i].toString() << ": "
- << res[i]["collections"].Obj()};
- }
-
- return _checkConfigServersConsistent(tries - 1);
- }
-
- return Status::OK();
-}
-
-void CatalogManagerLegacy::_consistencyChecker() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_inShutdown) {
- lk.unlock();
- const auto status = _checkConfigServersConsistent();
-
- lk.lock();
- _consistentFromLastCheck = status.isOK();
- if (_inShutdown)
- break;
- _consistencyCheckerCV.wait_for(lk, Seconds(60));
- }
- LOG(1) << "Consistency checker thread shutting down";
-}
-
-bool CatalogManagerLegacy::_isConsistentFromLastCheck() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _consistentFromLastCheck;
-}
-
-Status CatalogManagerLegacy::appendInfoForConfigServerDatabases(OperationContext* txn,
- BSONArrayBuilder* builder) {
- BSONObjBuilder resultBuilder;
- if (!_runReadCommand(txn, "admin", BSON("listDatabases" << 1), &resultBuilder)) {
- return getStatusFromCommandResult(resultBuilder.obj());
- }
-
- auto listDBResponse = resultBuilder.done();
- BSONElement dbListArray;
- auto dbListStatus = bsonExtractTypedField(listDBResponse, "databases", Array, &dbListArray);
- if (!dbListStatus.isOK()) {
- return dbListStatus;
- }
-
- BSONObjIterator iter(dbListArray.Obj());
-
- while (iter.more()) {
- auto dbEntry = iter.next().Obj();
- string name;
- auto parseStatus = bsonExtractStringField(dbEntry, "name", &name);
-
- if (!parseStatus.isOK()) {
- return parseStatus;
- }
-
- if (name == "config" || name == "admin") {
- builder->append(dbEntry);
- }
- }
-
- return Status::OK();
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
deleted file mode 100644
index 3c99ec4c978..00000000000
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/client/connection_string.h"
-#include "mongo/s/catalog/catalog_manager_common.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
-
-namespace mongo {
-
-/**
- * Implements the catalog manager using the legacy 3-config server protocol.
- */
-class CatalogManagerLegacy final : public CatalogManagerCommon {
-public:
- CatalogManagerLegacy();
- ~CatalogManagerLegacy();
-
- ConfigServerMode getMode() override {
- return ConfigServerMode::SCCC;
- }
-
- /**
- * Initializes the catalog manager with the hosts, which will be used as a configuration
- * server. Can only be called once for the lifetime.
- */
- Status init(const ConnectionString& configCS, const std::string& distLockProcessId);
-
- /**
- * Can terminate the server if called more than once.
- */
- Status startup(OperationContext* txn, bool allowNetworking) override;
-
- void shutDown(OperationContext* txn, bool allowNetworking) override;
-
- Status shardCollection(OperationContext* txn,
- const std::string& ns,
- const ShardKeyPattern& fieldsAndOrder,
- bool unique,
- const std::vector<BSONObj>& initPoints,
- const std::set<ShardId>& initShardIds) override;
-
- StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
- const std::string& name) override;
-
- StatusWith<OpTimePair<DatabaseType>> getDatabase(OperationContext* txn,
- const std::string& dbName) override;
-
- StatusWith<OpTimePair<CollectionType>> getCollection(OperationContext* txn,
- const std::string& collNs) override;
-
- Status getCollections(OperationContext* txn,
- const std::string* dbName,
- std::vector<CollectionType>* collections,
- repl::OpTime* optime);
-
- Status dropCollection(OperationContext* txn, const NamespaceString& ns) override;
-
- Status getDatabasesForShard(OperationContext* txn,
- const std::string& shardName,
- std::vector<std::string>* dbs) override;
-
- Status getChunks(OperationContext* txn,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<int> limit,
- std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) override;
-
- Status getTagsForCollection(OperationContext* txn,
- const std::string& collectionNs,
- std::vector<TagsType>* tags) override;
-
- StatusWith<std::string> getTagForChunk(OperationContext* txn,
- const std::string& collectionNs,
- const ChunkType& chunk) override;
-
- StatusWith<OpTimePair<std::vector<ShardType>>> getAllShards(OperationContext* txn) override;
-
- /**
- * Grabs a distributed lock and runs the command on all config servers.
- */
- bool runUserManagementWriteCommand(OperationContext* txn,
- const std::string& commandName,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
-
- bool runUserManagementReadCommand(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) override;
-
- Status applyChunkOpsDeprecated(OperationContext* txn,
- const BSONArray& updateOps,
- const BSONArray& preCondition,
- const std::string& nss,
- const ChunkVersion& lastChunkVersion) override;
-
- StatusWith<SettingsType> getGlobalSettings(OperationContext* txn,
- const std::string& key) override;
-
- void writeConfigServerDirect(OperationContext* txn,
- const BatchedCommandRequest& request,
- BatchedCommandResponse* response) override;
-
- Status insertConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& doc) override;
-
- StatusWith<bool> updateConfigDocument(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert) override;
-
- Status removeConfigDocuments(OperationContext* txn,
- const std::string& ns,
- const BSONObj& query) override;
-
- DistLockManager* getDistLockManager() override;
-
- Status initConfigVersion(OperationContext* txn) override;
-
- Status appendInfoForConfigServerDatabases(OperationContext* txn,
- BSONArrayBuilder* builder) override;
-
-private:
- Status _checkDbDoesNotExist(OperationContext* txn,
- const std::string& dbName,
- DatabaseType* db) override;
-
- StatusWith<std::string> _generateNewShardName(OperationContext* txn) override;
-
- Status _createCappedConfigCollection(OperationContext* txn,
- StringData collName,
- int cappedSize) override;
-
- /**
- * Starts the thread that periodically checks data consistency amongst the config servers.
- * Note: this is not thread safe and can only be called once for the lifetime.
- */
- Status _startConfigServerChecker();
-
- /**
- * Returns the number of shards recognized by the config servers
- * in this sharded cluster.
- * Optional: use query parameter to filter shard count.
- */
- size_t _getShardCount(const BSONObj& query) const;
-
- /**
- * Returns OK if all config servers that were contacted have the same state.
- * If inconsistency detected on first attempt, checks at most 3 more times.
- */
- Status _checkConfigServersConsistent(const unsigned tries = 4) const;
-
- /**
- * Checks data consistency amongst config servers every 60 seconds.
- */
- void _consistencyChecker();
-
- /**
- * Returns true if the config servers have the same contents since the last
- * check was performed.
- */
- bool _isConsistentFromLastCheck();
-
- /**
- * Sends a read only command to the config server.
- */
- bool _runReadCommand(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder* result);
-
- // Parsed config server hosts, as specified on the command line.
- ConnectionString _configServerConnectionString;
- std::vector<ConnectionString> _configServers;
-
- // Distributed lock manager singleton.
- std::unique_ptr<DistLockManager> _distLockManager;
-
- // protects _inShutdown, _consistentFromLastCheck; used by _consistencyCheckerCV
- stdx::mutex _mutex;
-
- // True if CatalogManagerLegacy::shutDown has been called. False, otherwise.
- bool _inShutdown = false;
-
- // Set to true once startup() has been called and returned an OK status. Allows startup() to be
- // called multiple times with any time after the first successful call being a no-op.
- bool _started = false;
-
- // used by consistency checker thread to check if config
- // servers are consistent
- bool _consistentFromLastCheck = false;
-
- // Thread that runs dbHash on config servers for checking data consistency.
- stdx::thread _consistencyCheckerThread;
-
- // condition variable used by the consistency checker thread to wait
- // for <= 60s, on every iteration, until shutDown is called
- stdx::condition_variable _consistencyCheckerCV;
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp
deleted file mode 100644
index cc03875aa87..00000000000
--- a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (C) 2012 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/cluster_client_internal.h"
-
-#include "mongo/client/connpool.h"
-#include "mongo/util/stringutils.h"
-
-namespace mongo {
-
-using std::unique_ptr;
-using mongoutils::str::stream;
-
-// Helper function for safe cursors
-DBClientCursor* _safeCursor(unique_ptr<DBClientCursor> cursor) {
- // TODO: Make error handling more consistent; it's annoying that cursors error out by
- // throwing exceptions *and* being empty
- uassert(16625, str::stream() << "cursor not found, transport error", cursor.get());
- return cursor.release();
-}
-}
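The helper above only normalizes two failure modes (a null cursor from a transport error versus a thrown exception) into one. A generic sketch of the same pattern, illustrative only and not the removed MongoDB API:

#include <memory>
#include <stdexcept>

// Generic form of the _safeCursor idea: turn "may return null" into "always throws",
// so callers have a single error path to handle.
template <typename Cursor>
std::unique_ptr<Cursor> safeCursor(std::unique_ptr<Cursor> cursor) {
    if (!cursor) {
        throw std::runtime_error("cursor not found, transport error");
    }
    return cursor;
}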
diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.h b/src/mongo/s/catalog/legacy/cluster_client_internal.h
deleted file mode 100644
index af4d9f423ef..00000000000
--- a/src/mongo/s/catalog/legacy/cluster_client_internal.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2012 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/stdx/memory.h"
-
-namespace mongo {
-
-class DBClientCursor;
-
-//
-// Needed to normalize exception behavior of connections and cursors
-// TODO: Remove when we refactor the client connection interface to something more consistent.
-//
-
-// Helper function which throws for invalid cursor initialization.
-// Note: cursor ownership will be passed to this function.
-DBClientCursor* _safeCursor(std::unique_ptr<DBClientCursor> cursor);
-}
diff --git a/src/mongo/s/catalog/legacy/config_coordinator.cpp b/src/mongo/s/catalog/legacy/config_coordinator.cpp
deleted file mode 100644
index cdfd7e93b62..00000000000
--- a/src/mongo/s/catalog/legacy/config_coordinator.cpp
+++ /dev/null
@@ -1,430 +0,0 @@
-/**
- * Copyright (C) 2013 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/config_coordinator.h"
-
-#include "mongo/base/owned_pointer_vector.h"
-#include "mongo/db/field_parser.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/s/client/multi_command_dispatch.h"
-#include "mongo/s/set_shard_version_request.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using std::string;
-using std::vector;
-
-namespace {
-
-/**
- * A BSON serializable object representing a setShardVersion command response.
- */
-class SSVResponse : public BSONSerializable {
- MONGO_DISALLOW_COPYING(SSVResponse);
-
-public:
- static const BSONField<int> ok;
- static const BSONField<int> errCode;
- static const BSONField<string> errMessage;
-
-
- SSVResponse() {
- clear();
- }
-
- bool isValid(std::string* errMsg) const {
- return _isOkSet;
- }
-
- BSONObj toBSON() const {
- BSONObjBuilder builder;
-
- if (_isOkSet)
- builder << ok(_ok);
- if (_isErrCodeSet)
- builder << errCode(_errCode);
- if (_isErrMessageSet)
- builder << errMessage(_errMessage);
-
- return builder.obj();
- }
-
- bool parseBSON(const BSONObj& source, std::string* errMsg) {
- FieldParser::FieldState result;
-
- result = FieldParser::extractNumber(source, ok, &_ok, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isOkSet = result != FieldParser::FIELD_NONE;
-
- result = FieldParser::extract(source, errCode, &_errCode, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isErrCodeSet = result != FieldParser::FIELD_NONE;
-
- result = FieldParser::extract(source, errMessage, &_errMessage, errMsg);
- if (result == FieldParser::FIELD_INVALID) {
- return false;
- }
- _isErrMessageSet = result != FieldParser::FIELD_NONE;
-
- return true;
- }
-
- void clear() {
- _ok = false;
- _isOkSet = false;
-
- _errCode = 0;
- _isErrCodeSet = false;
-
- _errMessage = "";
- _isErrMessageSet = false;
- }
-
- string toString() const {
- return toBSON().toString();
- }
-
- int getOk() {
- dassert(_isOkSet);
- return _ok;
- }
-
- void setOk(int ok) {
- _ok = ok;
- _isOkSet = true;
- }
-
- int getErrCode() {
- if (_isErrCodeSet) {
- return _errCode;
- } else {
- return errCode.getDefault();
- }
- }
-
- void setErrCode(int errCode) {
- _errCode = errCode;
- _isErrCodeSet = true;
- }
-
- bool isErrCodeSet() const {
- return _isErrCodeSet;
- }
-
- const string& getErrMessage() {
- dassert(_isErrMessageSet);
- return _errMessage;
- }
-
- void setErrMessage(StringData errMsg) {
- _errMessage = errMsg.toString();
- _isErrMessageSet = true;
- }
-
-private:
- int _ok;
- bool _isOkSet;
-
- int _errCode;
- bool _isErrCodeSet;
-
- string _errMessage;
- bool _isErrMessageSet;
-};
-
-const BSONField<int> SSVResponse::ok("ok");
-const BSONField<int> SSVResponse::errCode("code");
-const BSONField<string> SSVResponse::errMessage("errmsg");
-
-
-struct ConfigResponse {
- ConnectionString configHost;
- BatchedCommandResponse response;
-};
-
-void buildErrorFrom(const Status& status, BatchedCommandResponse* response) {
- response->setOk(false);
- response->setErrCode(static_cast<int>(status.code()));
- response->setErrMessage(status.reason());
-
- dassert(response->isValid(NULL));
-}
-
-bool areResponsesEqual(const BatchedCommandResponse& responseA,
- const BatchedCommandResponse& responseB) {
- // Note: This needs to also take into account comparing responses from legacy writes
- // and write commands.
-
- // TODO: Better reporting of why not equal
- if (responseA.getOk() != responseB.getOk()) {
- return false;
- }
-
- if (responseA.getN() != responseB.getN()) {
- return false;
- }
-
- if (responseA.isUpsertDetailsSet()) {
- // TODO:
- }
-
- if (responseA.getOk()) {
- return true;
- }
-
- // TODO: Compare errors here
-
- return true;
-}
-
-bool areAllResponsesEqual(const vector<ConfigResponse*>& responses) {
- BatchedCommandResponse* lastResponse = NULL;
-
- for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
- ++it) {
- BatchedCommandResponse* response = &(*it)->response;
-
- if (lastResponse != NULL) {
- if (!areResponsesEqual(*lastResponse, *response)) {
- return false;
- }
- }
-
- lastResponse = response;
- }
-
- return true;
-}
-
-void combineResponses(const vector<ConfigResponse*>& responses,
- BatchedCommandResponse* clientResponse) {
- if (areAllResponsesEqual(responses)) {
- responses.front()->response.cloneTo(clientResponse);
- return;
- }
-
- BSONObjBuilder builder;
- for (vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
- ++it) {
- builder.append((*it)->configHost.toString(), (*it)->response.toBSON());
- }
-
- clientResponse->setOk(false);
- clientResponse->setErrCode(ErrorCodes::ManualInterventionRequired);
- clientResponse->setErrMessage(
- "config write was not consistent, "
- "manual intervention may be required. "
- "config responses: " +
- builder.obj().toString());
-}
-
-} // namespace
-
-
-ConfigCoordinator::ConfigCoordinator(MultiCommandDispatch* dispatcher,
- const ConnectionString& configServerConnectionString)
- : _dispatcher(dispatcher), _configServerConnectionString(configServerConnectionString) {}
-
-bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) {
- //
- // Send side
- //
-
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- SetShardVersionRequest ssv = SetShardVersionRequest::makeForInit(
- _configServerConnectionString, "config", _configServerConnectionString);
- _dispatcher->addCommand(ConnectionString(server), "admin", ssv.toBSON());
- }
-
- _dispatcher->sendAll();
-
- //
- // Recv side
- //
-
- bool ssvError = false;
- while (_dispatcher->numPending() > 0) {
- ConnectionString configHost;
- SSVResponse response;
-
- // We've got to recv everything, no matter what - even if some failed.
- Status dispatchStatus = _dispatcher->recvAny(&configHost, &response);
-
- if (ssvError) {
- // record only the first failure.
- continue;
- }
-
- if (!dispatchStatus.isOK()) {
- ssvError = true;
- clientResponse->setOk(false);
- clientResponse->setErrCode(static_cast<int>(dispatchStatus.code()));
- clientResponse->setErrMessage(dispatchStatus.reason());
- } else if (!response.getOk()) {
- ssvError = true;
- clientResponse->setOk(false);
- clientResponse->setErrMessage(response.getErrMessage());
-
- if (response.isErrCodeSet()) {
- clientResponse->setErrCode(response.getErrCode());
- }
- }
- }
-
- return !ssvError;
-}
-
-/**
- * The core config write functionality.
- *
- * Config writes run in two passes - the first is a quick check to ensure the config servers
- * are all reachable, the second runs the actual write.
- *
- * TODO: Upgrade and move this logic to the config servers, a state machine implementation
- * is probably the next step.
- */
-void ConfigCoordinator::executeBatch(const BatchedCommandRequest& clientRequest,
- BatchedCommandResponse* clientResponse) {
- const NamespaceString nss(clientRequest.getNS());
-
- // Should never use it for anything other than DBs residing on the config server
- dassert(nss.db() == "config" || nss.db() == "admin");
- dassert(clientRequest.sizeWriteOps() == 1u);
-
- // This is an opportunistic check that all config servers look healthy by calling
- // getLastError on each one of them. If there was some form of write/journaling error,
- // getLastError would fail.
- {
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- _dispatcher->addCommand(
- ConnectionString(server), "admin", BSON("getLastError" << true << "fsync" << true));
- }
-
- _dispatcher->sendAll();
-
- bool error = false;
- while (_dispatcher->numPending()) {
- ConnectionString host;
- RawBSONSerializable response;
-
- Status status = _dispatcher->recvAny(&host, &response);
- if (status.isOK()) {
- BSONObj obj = response.toBSON();
-
- LOG(3) << "Response " << obj.toString();
-
- // If the ok field is anything other than 1, count it as error
- if (!obj["ok"].trueValue()) {
- error = true;
- log() << "Config server check for host " << host
- << " returned error: " << response;
- }
- } else if (status == ErrorCodes::IncompatibleCatalogManager) {
- uassertStatusOK(status);
- } else {
- error = true;
- log() << "Config server check for host " << host
- << " failed with status: " << status;
- }
- }
-
- // All responses should have been gathered by this point
- if (error) {
- clientResponse->setOk(false);
- clientResponse->setErrCode(ErrorCodes::RemoteValidationError);
- clientResponse->setErrMessage(
- "Could not verify that config servers were active"
- " and reachable before write");
- return;
- }
- }
-
- if (!_checkConfigString(clientResponse)) {
- return;
- }
-
- //
- // Do the actual writes
- //
-
- BatchedCommandRequest configRequest(clientRequest.getBatchType());
- clientRequest.cloneTo(&configRequest);
- configRequest.setNS(nss);
-
- OwnedPointerVector<ConfigResponse> responsesOwned;
- vector<ConfigResponse*>& responses = responsesOwned.mutableVector();
-
- //
- // Send the actual config writes
- //
-
- // Get as many batches as we can at once
- for (const HostAndPort& server : _configServerConnectionString.getServers()) {
- _dispatcher->addCommand(ConnectionString(server), nss.db(), configRequest.toBSON());
- }
-
- // Send them all out
- _dispatcher->sendAll();
-
- //
- // Recv side
- //
-
- while (_dispatcher->numPending() > 0) {
- // Get the response
- responses.push_back(new ConfigResponse());
-
- ConfigResponse& configResponse = *responses.back();
- Status dispatchStatus =
- _dispatcher->recvAny(&configResponse.configHost, &configResponse.response);
-
- if (!dispatchStatus.isOK()) {
- if (dispatchStatus == ErrorCodes::IncompatibleCatalogManager) {
- uassertStatusOK(dispatchStatus);
- }
-
- buildErrorFrom(dispatchStatus, &configResponse.response);
- }
- }
-
- combineResponses(responses, clientResponse);
-}
-
-} // namespace mongo
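The control flow removed above is a two-pass SCCC write: a reachability probe of every config server first, then the real write fanned out to all of them, with divergent replies reported back as requiring manual intervention. A compact sketch of that flow under hypothetical helper names (not the removed MultiCommandDispatch API):

#include <string>
#include <vector>

struct Reply {
    bool ok = false;
    std::string body;
};

// Hypothetical stand-ins, stubbed so the sketch is self-contained.
Reply sendCommand(const std::string& /*host*/, const std::string& /*cmd*/) {
    return {true, "{ok: 1}"};
}

bool allRepliesEqual(const std::vector<Reply>& replies) {
    for (const auto& r : replies) {
        if (r.ok != replies.front().ok || r.body != replies.front().body) {
            return false;
        }
    }
    return true;
}

// Two-pass write: probe every config server, then apply the write everywhere and
// require the replies to match before acknowledging success to the client.
Reply writeToAllConfigServers(const std::vector<std::string>& hosts, const std::string& writeCmd) {
    for (const auto& host : hosts) {
        if (!sendCommand(host, "{getLastError: 1, fsync: true}").ok) {
            return {false, "could not verify that config servers were active and reachable"};
        }
    }

    std::vector<Reply> replies;
    for (const auto& host : hosts) {
        replies.push_back(sendCommand(host, writeCmd));
    }
    if (!allRepliesEqual(replies)) {
        return {false, "config write was not consistent; manual intervention may be required"};
    }
    return replies.front();
}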
diff --git a/src/mongo/s/catalog/legacy/config_coordinator.h b/src/mongo/s/catalog/legacy/config_coordinator.h
deleted file mode 100644
index b1a3e18ca88..00000000000
--- a/src/mongo/s/catalog/legacy/config_coordinator.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (C) 2013 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <vector>
-
-#include "mongo/client/dbclientinterface.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-
-namespace mongo {
-
-class MultiCommandDispatch;
-
-class ConfigCoordinator {
-public:
- ConfigCoordinator(MultiCommandDispatch* dispatcher,
- const ConnectionString& configServerConnectionString);
-
- void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response);
-
-private:
- /**
- * Initializes the configDB string on the config servers or, if already initialized,
- * checks that it matches. Returns false if an error occurred.
- */
- bool _checkConfigString(BatchedCommandResponse* clientResponse);
-
-
- // Not owned here
- MultiCommandDispatch* const _dispatcher;
-
- const ConnectionString _configServerConnectionString;
-};
-}
diff --git a/src/mongo/s/catalog/legacy/distlock.cpp b/src/mongo/s/catalog/legacy/distlock.cpp
deleted file mode 100644
index c3cbe9e7158..00000000000
--- a/src/mongo/s/catalog/legacy/distlock.cpp
+++ /dev/null
@@ -1,783 +0,0 @@
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/distlock.h"
-
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/client/connpool.h"
-#include "mongo/s/catalog/type_locks.h"
-#include "mongo/s/catalog/type_lockpings.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/log.h"
-#include "mongo/util/timer.h"
-
-namespace mongo {
-
-MONGO_FP_DECLARE(setSCCCDistLockTimeout);
-
-using std::endl;
-using std::list;
-using std::set;
-using std::string;
-using std::stringstream;
-using std::unique_ptr;
-using std::vector;
-
-LabeledLevel DistributedLock::logLvl(1);
-DistributedLock::LastPings DistributedLock::lastPings;
-
-LockException::LockException(StringData msg, int code) : LockException(msg, code, OID()) {}
-
-LockException::LockException(StringData msg, int code, DistLockHandle lockID)
- : DBException(msg.toString(), code), _mustUnlockID(lockID) {}
-
-DistLockHandle LockException::getMustUnlockID() const {
- return _mustUnlockID;
-}
-
-/**
- * Create a new distributed lock, potentially with a custom takeover time (lockTimeout). If no
- * timeout is specified, the default LOCK_TIMEOUT is used.
- */
-DistributedLock::DistributedLock(const ConnectionString& conn,
- const string& name,
- const std::string& processId,
- unsigned long long lockTimeout)
- : _conn(conn),
- _name(name),
- _processId(processId),
- _lockId(str::stream() << _processId << ":" << getThreadName() << ":" << rand()),
- _lockTimeout(lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout),
- _maxClockSkew(_lockTimeout / LOCK_SKEW_FACTOR),
- _maxNetSkew(_maxClockSkew),
- _lockPing(_maxClockSkew) {
- LOG(logLvl) << "created new distributed lock for " << name << " on " << conn
- << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing
- << ", processId: " << _processId << ", lockId: " << _lockId << " )";
-}
-
-DistLockPingInfo DistributedLock::LastPings::getLastPing(const ConnectionString& conn,
- const string& lockName) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _lastPings[std::make_pair(conn.toString(), lockName)];
-}
-
-void DistributedLock::LastPings::setLastPing(const ConnectionString& conn,
- const string& lockName,
- const DistLockPingInfo& pd) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _lastPings[std::make_pair(conn.toString(), lockName)] = pd;
-}
-
-bool DistributedLock::isRemoteTimeSkewed() const {
- return !DistributedLock::checkSkew(_conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew);
-}
-
-const ConnectionString& DistributedLock::getRemoteConnection() const {
- return _conn;
-}
-
-const string& DistributedLock::getProcessId() const {
- return _processId;
-}
-
-const string& DistributedLock::getDistLockId() const {
- return _lockId;
-}
-
-/**
- * Returns the remote time as reported by the cluster or server. The maximum difference between the
- * reported time and the actual time on the remote server (at the completion of the function) is
- * maxNetSkew.
- */
-Date_t DistributedLock::remoteTime(const ConnectionString& cluster, unsigned long long maxNetSkew) {
- ConnectionString server(*cluster.getServers().begin());
-
- // Get result and delay if successful, errMsg if not
- bool success = false;
- BSONObj result;
- string errMsg;
- Milliseconds delay{0};
-
- unique_ptr<ScopedDbConnection> connPtr;
- try {
- connPtr.reset(new ScopedDbConnection(server.toString()));
- ScopedDbConnection& conn = *connPtr;
-
- Date_t then = jsTime();
- success = conn->runCommand(string("admin"), BSON("serverStatus" << 1), result);
- delay = jsTime() - then;
-
- if (!success)
- errMsg = result.toString();
- conn.done();
- } catch (const DBException& ex) {
- if (connPtr && connPtr->get()->isFailed()) {
- // Return to the pool so the pool knows about the failure
- connPtr->done();
- }
-
- success = false;
- errMsg = ex.toString();
- }
-
- if (!success) {
- throw TimeNotFoundException(str::stream() << "could not get status from server "
- << server.toString() << " in cluster "
- << cluster.toString() << " to check time"
- << causedBy(errMsg),
- 13647);
- }
-
- // Make sure that our delay is not more than 2x our maximum network skew, since this is the max
- // our remote time value can be off by if we assume a response in the middle of the delay.
- if (delay > Milliseconds(maxNetSkew * 2)) {
- throw TimeNotFoundException(
- str::stream() << "server " << server.toString() << " in cluster " << cluster.toString()
- << " did not respond within max network delay of " << maxNetSkew << "ms",
- 13648);
- }
-
- return result["localTime"].Date() - (delay / 2);
-}
-
-bool DistributedLock::checkSkew(const ConnectionString& cluster,
- unsigned skewChecks,
- unsigned long long maxClockSkew,
- unsigned long long maxNetSkew) {
- vector<HostAndPort> servers = cluster.getServers();
-
- if (servers.size() < 1)
- return true;
-
- vector<long long> avgSkews;
-
- for (unsigned i = 0; i < skewChecks; i++) {
- // Find the average skew for each server
- unsigned s = 0;
- for (vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si, s++) {
- if (i == 0)
- avgSkews.push_back(0);
-
- // Could check if this is self, but shouldn't matter since local network connection
- // should be fast.
- ConnectionString server(*si);
-
- vector<long long> skew;
-
- BSONObj result;
-
- Date_t remote = remoteTime(server, maxNetSkew);
- Date_t local = jsTime();
-
- // Remote time can be delayed by at most MAX_NET_SKEW
-
- // Skew is how much time we'd have to add to local to get to remote
- avgSkews[s] += durationCount<Milliseconds>(remote - local);
-
- LOG(logLvl + 1) << "skew from remote server " << server
- << " found: " << (remote - local);
- }
- }
-
- // Analyze skews
-
- long long serverMaxSkew = 0;
- long long serverMinSkew = 0;
-
- for (unsigned s = 0; s < avgSkews.size(); s++) {
- long long avgSkew = (avgSkews[s] /= skewChecks);
-
- // Keep track of max and min skews
- if (s == 0) {
- serverMaxSkew = avgSkew;
- serverMinSkew = avgSkew;
- } else {
- if (avgSkew > serverMaxSkew)
- serverMaxSkew = avgSkew;
- if (avgSkew < serverMinSkew)
- serverMinSkew = avgSkew;
- }
- }
-
- long long totalSkew = serverMaxSkew - serverMinSkew;
-
- // Make sure our max skew is not more than our pre-set limit
- if (totalSkew > (long long)maxClockSkew) {
- LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster
- << " is out of " << maxClockSkew << "ms bounds." << endl;
- return false;
- }
-
- LOG(logLvl + 1) << "total clock skew of " << totalSkew << "ms for servers " << cluster
- << " is in " << maxClockSkew << "ms bounds." << endl;
- return true;
-}
-
-Status DistributedLock::checkStatus(double timeout) {
- BSONObj lockObj;
- try {
- ScopedDbConnection conn(_conn.toString(), timeout);
- lockObj = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned();
- conn.done();
- } catch (DBException& e) {
- return e.toStatus();
- }
-
- if (lockObj.isEmpty()) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "no lock for " << _name
- << " exists in the locks collection");
- }
-
- if (lockObj[LocksType::state()].numberInt() < LocksType::LOCKED) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << _name << " current state is not held ("
- << lockObj[LocksType::state()].numberInt() << ")");
- }
-
- if (lockObj[LocksType::process()].String() != _processId) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << _name << " is currently being held by "
- << "another process (" << lockObj[LocksType::process()].String()
- << ")");
- }
-
- return Status::OK();
-}
-
-static void logErrMsgOrWarn(StringData messagePrefix,
- StringData lockName,
- StringData errMsg,
- StringData altErrMsg) {
- if (errMsg.empty()) {
- LOG(DistributedLock::logLvl - 1) << messagePrefix << " '" << lockName << "' " << altErrMsg
- << std::endl;
- } else {
- warning() << messagePrefix << " '" << lockName << "' " << causedBy(errMsg.toString());
- }
-}
-
-// Semantics of this method are basically that if the lock cannot be acquired, returns false,
-// can be retried. If the lock should not be tried again (some unexpected error),
-// a LockException is thrown.
-bool DistributedLock::lock_try(const OID& lockID,
- const string& why,
- BSONObj* other,
- double timeout) {
- // This should always be true, if not, we are using the lock incorrectly.
- verify(_name != "");
-
- auto lockTimeout = _lockTimeout;
- MONGO_FAIL_POINT_BLOCK(setSCCCDistLockTimeout, customTimeout) {
- const BSONObj& data = customTimeout.getData();
- lockTimeout = data["timeoutMs"].numberInt();
- }
- LOG(logLvl) << "trying to acquire new distributed lock for " << _name << " on " << _conn
- << " ( lock timeout : " << lockTimeout << ", ping interval : " << _lockPing
- << ", process : " << _processId << " )" << endl;
-
- // write to dummy if 'other' is null
- BSONObj dummyOther;
- if (other == NULL)
- other = &dummyOther;
-
- ScopedDbConnection conn(_conn.toString(), timeout);
-
- BSONObjBuilder queryBuilder;
- queryBuilder.append(LocksType::name(), _name);
- queryBuilder.append(LocksType::state(), LocksType::UNLOCKED);
-
- {
- // make sure its there so we can use simple update logic below
- BSONObj o = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name))).getOwned();
-
- // Case 1: No locks
- if (o.isEmpty()) {
- try {
- LOG(logLvl) << "inserting initial doc in " << LocksType::ConfigNS << " for lock "
- << _name << endl;
- conn->insert(LocksType::ConfigNS,
- BSON(LocksType::name(_name) << LocksType::state(LocksType::UNLOCKED)
- << LocksType::who("")
- << LocksType::lockID(OID())));
- } catch (UserException& e) {
- warning() << "could not insert initial doc for distributed lock " << _name
- << causedBy(e) << endl;
- }
- }
-
- // Case 2: A set lock that we might be able to force
- else if (o[LocksType::state()].numberInt() > LocksType::UNLOCKED) {
- string lockName =
- o[LocksType::name()].String() + string("/") + o[LocksType::process()].String();
-
- BSONObj lastPing = conn->findOne(
- LockpingsType::ConfigNS, o[LocksType::process()].wrap(LockpingsType::process()));
- if (lastPing.isEmpty()) {
- LOG(logLvl) << "empty ping found for process in lock '" << lockName << "'" << endl;
- // TODO: Using 0 as a "no time found" value will fail if dates roll over, but then,
- // so will a lot.
- lastPing = BSON(LockpingsType::process(o[LocksType::process()].String())
- << LockpingsType::ping(Date_t()));
- }
-
- unsigned long long elapsed = 0;
- unsigned long long takeover = lockTimeout;
-
- DistLockPingInfo lastPingEntry = getLastPing();
-
- LOG(logLvl) << "checking last ping for lock '" << lockName << "' against process "
- << lastPingEntry.processId << " and ping " << lastPingEntry.lastPing;
-
- try {
- Date_t remote = remoteTime(_conn);
-
- auto pingDocProcessId = lastPing[LockpingsType::process()].String();
- auto pingDocPingValue = lastPing[LockpingsType::ping()].Date();
-
- // Timeout the elapsed time using comparisons of remote clock
- // For non-finalized locks, timeout 15 minutes since last seen (ts)
- // For finalized locks, timeout 15 minutes since last ping
- bool recPingChange = o[LocksType::state()].numberInt() == LocksType::LOCKED &&
- (lastPingEntry.processId != pingDocProcessId ||
- lastPingEntry.lastPing != pingDocPingValue);
- bool recTSChange = lastPingEntry.lockSessionId != o[LocksType::lockID()].OID();
-
- if (recPingChange || recTSChange) {
- // If the ping has changed since we last checked, mark the current date and time
- setLastPing(DistLockPingInfo(pingDocProcessId,
- pingDocPingValue,
- remote,
- o[LocksType::lockID()].OID(),
- OID()));
- } else {
- // GOTCHA! Due to network issues, it is possible that the current time
- // is less than the remote time. We *have* to check this here, otherwise
- // we overflow and our lock breaks.
- if (lastPingEntry.configLocalTime >= remote)
- elapsed = 0;
- else
- elapsed =
- durationCount<Milliseconds>(remote - lastPingEntry.configLocalTime);
- }
- } catch (LockException& e) {
- // Remote server cannot be found / is not responsive
- warning() << "Could not get remote time from " << _conn << causedBy(e);
- // If our config server is having issues, forget all the pings until we can see it
- // again
- resetLastPing();
- }
-
- if (elapsed <= takeover) {
- LOG(1) << "could not force lock '" << lockName << "' because elapsed time "
- << elapsed << " <= takeover time " << takeover;
- *other = o;
- other->getOwned();
- conn.done();
- return false;
- }
-
- LOG(0) << "forcing lock '" << lockName << "' because elapsed time " << elapsed
- << " > takeover time " << takeover;
-
- if (elapsed > takeover) {
- // Lock may be forced; reset our timer whether the force succeeds or fails.
- // Ensures that another timeout must happen if something borks up here, and resets
- // our pristine ping state if acquired.
- resetLastPing();
-
- try {
- // Check the clock skew again. If we check this before we get a lock
- // and after the lock times out, we can be pretty sure the time is
- // increasing at the same rate on all servers and therefore our
- // timeout is accurate
- if (isRemoteTimeSkewed()) {
- string msg(str::stream() << "remote time in cluster " << _conn.toString()
- << " is now skewed, cannot force lock.");
- throw LockException(msg, ErrorCodes::DistributedClockSkewed);
- }
-
- // Make sure we break the lock with the correct "ts" (OID) value, otherwise
- // we can overwrite a new lock inserted in the meantime.
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::name(_name)
- << LocksType::state() << o[LocksType::state()].numberInt()
- << LocksType::lockID(o[LocksType::lockID()].OID())),
- BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- // TODO: Clean up all the extra code to exit this method, probably with a
- // refactor
- if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
- logErrMsgOrWarn(
- "Could not force lock", lockName, errMsg, "(another force won)");
- *other = o;
- other->getOwned();
- conn.done();
- return false;
- }
-
- } catch (UpdateNotTheSame&) {
- // Ok to continue since we know we forced at least one lock document, and all
- // lock docs are required for a lock to be held.
- warning() << "lock forcing " << lockName << " inconsistent" << endl;
- } catch (const LockException&) {
- // Let the exception go up and don't repackage the exception.
- throw;
- } catch (std::exception& e) {
- conn.done();
- string msg(str::stream() << "exception forcing distributed lock " << lockName
- << causedBy(e));
- throw LockException(msg, 13660);
- }
-
- } else {
- // Not strictly necessary, but helpful for small timeouts where thread
- // scheduling is significant. This ensures that two attempts are still
- // required for a force if not acquired, and resets our state if we
- // are acquired.
- resetLastPing();
-
- // Test that the lock is held by trying to update the finalized state of the lock to
- // the same state. If it does not update, or does not update on all servers, we can't
- // re-enter.
- try {
- // Test the lock with the correct "ts" (OID) value
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::name(_name)
- << LocksType::state(LocksType::LOCKED)
- << LocksType::lockID(o[LocksType::lockID()].OID())),
- BSON("$set" << BSON(LocksType::state(LocksType::LOCKED))));
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- // TODO: Clean up all the extra code to exit this method, probably with a
- // refactor
- if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
- logErrMsgOrWarn(
- "Could not re-enter lock", lockName, errMsg, "(not sure lock is held)");
- *other = o;
- other->getOwned();
- conn.done();
- return false;
- }
-
- } catch (UpdateNotTheSame&) {
- // NOT ok to continue since our lock isn't held by all servers, so isn't valid.
- warning() << "inconsistent state re-entering lock, lock " << lockName
- << " not held" << endl;
- *other = o;
- other->getOwned();
- conn.done();
- return false;
- } catch (std::exception& e) {
- conn.done();
- string msg(str::stream() << "exception re-entering distributed lock "
- << lockName << causedBy(e));
- throw LockException(msg, 13660);
- }
-
- LOG(logLvl - 1) << "re-entered distributed lock '" << lockName << "'" << endl;
- *other = o.getOwned();
- conn.done();
- return true;
- }
-
- LOG(logLvl - 1) << "lock '" << lockName << "' successfully forced" << endl;
-
- // We don't need the ts value in the query, since we will only ever replace locks with
- // state=0.
- }
- // Case 3: We have an expired lock
- else if (o[LocksType::lockID()].type()) {
- queryBuilder.append(o[LocksType::lockID()]);
- }
- }
-
- // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock
- // state is open and no locks need to be forced. If anything goes wrong, we don't want to
- // remember an old lock.
- resetLastPing();
-
- bool gotLock = false;
- BSONObj currLock;
-
- BSONObj lockDetails =
- BSON(LocksType::state(LocksType::LOCK_PREP)
- << LocksType::who(getDistLockId()) << LocksType::process(_processId)
- << LocksType::when(jsTime()) << LocksType::why(why) << LocksType::lockID(lockID));
- BSONObj whatIWant = BSON("$set" << lockDetails);
-
- BSONObj query = queryBuilder.obj();
-
- string lockName = _name + string("/") + _processId;
-
- try {
- // Main codepath to acquire lock
-
- LOG(logLvl) << "about to acquire distributed lock '" << lockName << "'";
-
- LOG(logLvl + 1) << "trying to acquire lock " << query.toString(false, true)
- << " with details " << lockDetails.toString(false, true) << endl;
-
- conn->update(LocksType::ConfigNS, query, whatIWant);
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
-
- if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
- logErrMsgOrWarn("could not acquire lock", lockName, errMsg, "(another update won)");
- *other = currLock;
- other->getOwned();
- gotLock = false;
- } else {
- gotLock = true;
- }
-
- } catch (UpdateNotTheSame& up) {
- // this means our update got through on some, but not others
- warning() << "distributed lock '" << lockName << "' did not propagate properly."
- << causedBy(up) << endl;
-
- // Overall protection derives from:
- // All unlocking updates use the ts value when setting state to 0
- // This ensures that during locking, we can override all smaller ts locks with
- // our own safe ts value and not be unlocked afterward.
- for (unsigned i = 0; i < up.size(); i++) {
- ScopedDbConnection indDB(up[i].first);
- BSONObj indUpdate;
-
- try {
- indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
- const auto currentLockID = indUpdate[LocksType::lockID()].OID();
- // If we override this lock in any way, grab and protect it.
- // We assume/ensure that if a process does not have all lock documents, it is no
- // longer holding the lock.
- // Note - finalized locks may compete too, but we know they've won already if
- // competing in this round. Cleanup of crashes during finalizing may take a few
- // tries.
- if (currentLockID < lockID ||
- indUpdate[LocksType::state()].numberInt() == LocksType::UNLOCKED) {
- BSONObj grabQuery =
- BSON(LocksType::name(_name) << LocksType::lockID(currentLockID));
-
- // Change ts so we won't be forced, state so we won't be relocked
- BSONObj grabChanges =
- BSON(LocksType::lockID(lockID) << LocksType::state(LocksType::LOCK_PREP));
-
- // Either our update will succeed, and we'll grab the lock, or it will fail b/c
- // some other process grabbed the lock (which will change the ts), but the lock
- // will be set until forcing
- indDB->update(LocksType::ConfigNS, grabQuery, BSON("$set" << grabChanges));
-
- indUpdate = indDB->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
-
- // The tournament was interfered with, and it is not safe to proceed further.
- // One case this could happen is when the LockPinger processes old
- // entries from addUnlockOID. See SERVER-10688 for more detailed
- // description of race.
- if (indUpdate[LocksType::state()].numberInt() <= LocksType::UNLOCKED) {
- LOG(logLvl - 1) << "lock tournament interrupted, "
- << "so no lock was taken; "
- << "new state of lock: " << indUpdate << endl;
-
- // We now break and set our currLock lockID value to zero, so that
- // we know that we did not acquire the lock below. Later code will
- // cleanup failed entries.
- currLock = BSON(LocksType::lockID(OID()));
- indDB.done();
- break;
- }
- }
- // else our lock is the same, in which case we're safe, or it's a bigger lock,
- // in which case we won't need to protect anything since we won't have the lock.
-
- } catch (std::exception& e) {
- conn.done();
- string msg(str::stream() << "distributed lock " << lockName
- << " had errors communicating with individual server "
- << up[i].first << causedBy(e));
- throw LockException(msg, 13661, lockID);
- }
-
- verify(!indUpdate.isEmpty());
-
- // Find max TS value
- if (currLock.isEmpty() ||
- currLock[LocksType::lockID()] < indUpdate[LocksType::lockID()]) {
- currLock = indUpdate.getOwned();
- }
-
- indDB.done();
- }
-
- // Locks on all servers are now set and safe until forcing
-
- if (currLock[LocksType::lockID()].OID() == lockID) {
- LOG(logLvl - 1) << "lock update won, completing lock propagation for '" << lockName
- << "'" << endl;
- gotLock = true;
- } else {
- LOG(logLvl - 1) << "lock update lost, lock '" << lockName << "' not propagated."
- << endl;
- gotLock = false;
- }
- } catch (std::exception& e) {
- conn.done();
- string msg(str::stream() << "exception creating distributed lock " << lockName
- << causedBy(e));
- throw LockException(msg, 13663, lockID);
- }
-
- // Complete lock propagation
- if (gotLock) {
- // This is now safe, since we know that no new locks will be placed on top of the ones we've
- // checked for at least 15 minutes. Sets the state = 2, so that future clients can
- // determine that the lock is truly set. The invariant for rollbacks is that we will never
- // force locks with state = 2 and active pings, since that indicates the lock is active, but
- // this means the process creating/destroying them must explicitly poll when something goes
- // wrong.
- try {
- BSONObjBuilder finalLockDetails;
- BSONObjIterator bi(lockDetails);
- while (bi.more()) {
- BSONElement el = bi.next();
- if ((string)(el.fieldName()) == LocksType::state())
- finalLockDetails.append(LocksType::state(), LocksType::LOCKED);
- else
- finalLockDetails.append(el);
- }
-
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::name(_name) << LocksType::state()
- << BSON("$eq" << LocksType::LOCK_PREP)),
- BSON("$set" << finalLockDetails.obj()));
-
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- currLock = conn->findOne(LocksType::ConfigNS, BSON(LocksType::name(_name)));
-
- if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ||
- currLock[LocksType::lockID()].OID() != lockID) {
- warning() << "could not finalize winning lock " << lockName
- << (!errMsg.empty() ? causedBy(errMsg) : " (did not update lock) ")
- << ", winning lock: " << currLock;
- gotLock = false;
- } else {
- // SUCCESS!
- gotLock = true;
- }
-
- } catch (std::exception& e) {
- conn.done();
- string msg(str::stream() << "exception finalizing winning lock" << causedBy(e));
- // Inform caller about the potential orphan lock.
- throw LockException(msg, 13662, lockID);
- }
- }
-
- *other = currLock;
- other->getOwned();
-
- // Log our lock results
- if (gotLock)
- LOG(logLvl - 1) << "distributed lock '" << lockName << "' acquired for '" << why
- << "', ts : " << currLock[LocksType::lockID()].OID();
- else
- LOG(logLvl - 1) << "distributed lock '" << lockName << "' was not acquired.";
-
- conn.done();
-
- return gotLock;
-}
-
-// This function *must not* throw exceptions, since it can be used in destructors - failure
-// results in queuing and trying again later.
-bool DistributedLock::unlock(const DistLockHandle& lockID) {
- verify(_name != "");
- string lockName = _name + string("/") + _processId;
-
- const int maxAttempts = 3;
- int attempted = 0;
-
- while (++attempted <= maxAttempts) {
- // Awkward, but necessary since the constructor itself throws exceptions
- unique_ptr<ScopedDbConnection> connPtr;
-
- try {
- connPtr.reset(new ScopedDbConnection(_conn.toString()));
- ScopedDbConnection& conn = *connPtr;
-
- // Use ts when updating lock, so that new locks can be sure they won't get trampled.
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::name(_name) << LocksType::lockID(lockID)),
- BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
-
- // Check that the lock was actually unlocked... if not, try again
- BSONObj err = conn->getLastErrorDetailed();
- string errMsg = DBClientWithCommands::getLastErrorString(err);
-
- if (!errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1) {
- warning() << "distributed lock unlock update failed, retrying "
- << (errMsg.empty() ? causedBy("( update not registered )")
- : causedBy(errMsg)) << endl;
- conn.done();
- continue;
- }
-
- LOG(0) << "distributed lock '" << lockName << "' unlocked. ";
- conn.done();
- return true;
- } catch (const UpdateNotTheSame&) {
- LOG(0) << "distributed lock '" << lockName << "' unlocked (messily). ";
- // This isn't a connection problem, so don't throw away the conn
- connPtr->done();
- return true;
- } catch (std::exception& e) {
- warning() << "distributed lock '" << lockName << "' failed unlock attempt."
- << causedBy(e) << endl;
-
- // TODO: If our lock timeout is small, sleeping this long may be unsafe.
- if (attempted != maxAttempts)
- sleepsecs(1 << attempted);
- }
- }
-
- return false;
-}
-}
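The heart of lock_try() above is the takeover rule: a held lock may be forced only after its ping has not advanced for longer than the lock timeout, measured against config-server time and guarded against the remote clock appearing to move backwards. A simplified sketch of that decision, with illustrative names:

#include <chrono>

// Simplified sketch of the SCCC takeover rule; names are illustrative, not the removed API.
struct PingObservation {
    std::chrono::milliseconds configTimeWhenObserved{0};  // config-server time at last observation
    bool pingChangedSinceLastCheck = true;
};

bool canForceLock(const PingObservation& lastSeen,
                  std::chrono::milliseconds configTimeNow,
                  std::chrono::milliseconds lockTimeout) {
    if (lastSeen.pingChangedSinceLastCheck) {
        // The holder is still pinging; restart the staleness clock.
        return false;
    }
    // Guard against the config clock appearing to run backwards (network skew).
    if (configTimeNow <= lastSeen.configTimeWhenObserved) {
        return false;
    }
    const auto elapsed = configTimeNow - lastSeen.configTimeWhenObserved;
    return elapsed > lockTimeout;  // e.g. 15 minutes by default
}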
diff --git a/src/mongo/s/catalog/legacy/distlock.h b/src/mongo/s/catalog/legacy/distlock.h
deleted file mode 100644
index 4247d057d2a..00000000000
--- a/src/mongo/s/catalog/legacy/distlock.h
+++ /dev/null
@@ -1,240 +0,0 @@
-// distlock.h
-
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/client/connpool.h"
-#include "mongo/client/syncclusterconnection.h"
-#include "mongo/logger/labeled_level.h"
-#include "mongo/s/catalog/dist_lock_manager.h"
-#include "mongo/s/catalog/dist_lock_ping_info.h"
-
-namespace mongo {
-
-namespace {
-
-enum TimeConstants {
- LOCK_TIMEOUT = 15 * 60 * 1000,
- LOCK_SKEW_FACTOR = 30,
- LOCK_PING = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- MAX_LOCK_NET_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- MAX_LOCK_CLOCK_SKEW = LOCK_TIMEOUT / LOCK_SKEW_FACTOR,
- NUM_LOCK_SKEW_CHECKS = 3,
-};
-
-// The maximum clock skew we need to handle between config servers is
-// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW.
-
-// Net effect of *this* clock being slow is effectively a multiplier on the max net skew
-// and a linear increase or decrease of the max clock skew.
-}
-
-/**
- * Exception class to encapsulate exceptions while managing distributed locks
- */
-class LockException : public DBException {
-public:
- LockException(StringData msg, int code);
-
- /**
- * Use this to signal that a lock with lockID needs to be unlocked. For example, in cases
- * where the final lock acquisition was not propagated properly to all config servers.
- */
- LockException(StringData msg, int code, DistLockHandle lockID);
-
- /**
- * Returns the OID of the lock that needs to be unlocked.
- */
- DistLockHandle getMustUnlockID() const;
-
- virtual ~LockException() = default;
-
-private:
- // The identifier of a lock that needs to be unlocked.
- DistLockHandle _mustUnlockID;
-};
-
-/**
- * Indicates an error in retrieving time values from remote servers.
- */
-class TimeNotFoundException : public LockException {
-public:
- TimeNotFoundException(const char* msg, int code) : LockException(msg, code) {}
- TimeNotFoundException(const std::string& msg, int code) : LockException(msg, code) {}
- virtual ~TimeNotFoundException() = default;
-};
-
-/**
- * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task
- * must be identified by a unique name across the system (e.g., "balancer"). A lock is taken
- * by writing a document in the configdb's locks collection with that name.
- *
- * To be maintained, each taken lock needs to be revalidated ("pinged") within a
- * pre-established amount of time. This class does this maintenance automatically once a
- * DistributedLock object was constructed. The ping procedure records the local time to
- * the ping document, but that time is untrusted and is only used as a point of reference
- * of whether the ping was refreshed or not. Ultimately, the clock of a configdb is the source
- * of truth when determining whether a ping is still fresh or not. This is achieved by
- * (1) remembering the ping document time along with config server time when unable to
- * take a lock, and (2) ensuring all config servers report similar times and have similar
- * time rates (the difference in times must start and stay small).
- *
- * Lock states include:
- * 0: unlocked
- * 1: about to be locked
- * 2: locked
- *
- * Valid state transitions:
- * 0 -> 1
- * 1 -> 2
- * 2 -> 0
- *
- * Note that at any point in time, a lock can be force unlocked if the ping for the lock
- * becomes too stale.
- */
-class DistributedLock {
-public:
- static logger::LabeledLevel logLvl;
-
- class LastPings {
- public:
- DistLockPingInfo getLastPing(const ConnectionString& conn, const std::string& lockName);
- void setLastPing(const ConnectionString& conn,
- const std::string& lockName,
- const DistLockPingInfo& pd);
-
- stdx::mutex _mutex;
- std::map<std::pair<std::string, std::string>, DistLockPingInfo> _lastPings;
- };
-
- static LastPings lastPings;
-
- /**
- * The constructor does not connect to the configdb yet and constructing does not mean the lock
- * was acquired. Construction does trigger a lock "pinging" mechanism, though.
- *
- * @param conn address of config(s) server(s)
- * @param name identifier for the lock
- * @param processId this process's distributed lock process ID
- * @param lockTimeout how long the lock can go "unpinged" before a new attempt to lock steals it
- * (in milliseconds).
- *
- */
- DistributedLock(const ConnectionString& conn,
- const std::string& name,
- const std::string& processId,
- unsigned long long lockTimeout = 0);
- ~DistributedLock(){};
-
- /**
- * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous
- * holder. Please consider using the dist_lock_try construct to acquire this lock in an
- * exception safe way.
- *
- * @param lockID the lockID to use for acquiring the lock.
- * @param why human readable description of why the lock is being taken (used to log)
- * @param other configdb's lock document that is currently holding the lock, if lock is taken,
- * or our own lock details if not
- * @return true if it managed to grab the lock
- */
- bool lock_try(const OID& lockID,
- const std::string& why,
- BSONObj* other = 0,
- double timeout = 0.0);
-
- /**
- * Returns OK if this lock is held (though it does not guarantee that this process owns it)
- * and it was possible to confirm that with the config servers, within 'timeout' seconds if
- * provided.
- */
- Status checkStatus(double timeout);
-
- /**
- * Releases a previously taken lock. Returns true on success.
- */
- bool unlock(const OID& lockID);
-
- bool isRemoteTimeSkewed() const;
-
- const std::string& getProcessId() const;
-
- const std::string& getDistLockId() const;
-
- const ConnectionString& getRemoteConnection() const;
-
- /**
- * Checks the skew among a cluster of servers and returns true if the min and max clock
- * times among the servers are within maxClockSkew.
- */
- static bool checkSkew(const ConnectionString& cluster,
- unsigned skewChecks = NUM_LOCK_SKEW_CHECKS,
- unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW,
- unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW);
-
- /**
- * Get the remote time from a server or cluster
- */
- static Date_t remoteTime(const ConnectionString& cluster,
- unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW);
-
- /**
- * Namespace for lock pings
- */
- static const std::string lockPingNS;
-
- /**
- * Namespace for locks
- */
- static const std::string locksNS;
-
- const ConnectionString _conn;
- const std::string _name;
- const std::string _processId;
- const std::string _lockId;
-
- // Timeout for lock, usually LOCK_TIMEOUT
- const unsigned long long _lockTimeout;
- const unsigned long long _maxClockSkew;
- const unsigned long long _maxNetSkew;
- const unsigned long long _lockPing;
-
-private:
- void resetLastPing() {
- lastPings.setLastPing(_conn, _name, DistLockPingInfo());
- }
-
- void setLastPing(const DistLockPingInfo& pd) {
- lastPings.setLastPing(_conn, _name, pd);
- }
-
- DistLockPingInfo getLastPing() {
- return lastPings.getLastPing(_conn, _name);
- }
-};
-}
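The class comment above defines the lock's state machine (0 unlocked, 1 about to be locked, 2 locked) and notes that a stale ping permits a force-unlock. A minimal encoding of those transitions, for reference only:

// Sketch of the documented state machine; LOCK_PREP corresponds to "about to be locked".
enum class DistLockState { UNLOCKED = 0, LOCK_PREP = 1, LOCKED = 2 };

// Valid transitions: UNLOCKED -> LOCK_PREP -> LOCKED -> UNLOCKED, plus a force-unlock
// when the holder's ping has gone stale.
bool isValidTransition(DistLockState from, DistLockState to, bool pingIsStale) {
    if (to == DistLockState::UNLOCKED && pingIsStale) {
        return true;  // force-unlock of an abandoned lock
    }
    switch (from) {
        case DistLockState::UNLOCKED:
            return to == DistLockState::LOCK_PREP;
        case DistLockState::LOCK_PREP:
            return to == DistLockState::LOCKED;
        case DistLockState::LOCKED:
            return to == DistLockState::UNLOCKED;
    }
    return false;
}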
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp
deleted file mode 100644
index 4b3eb1134fa..00000000000
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.cpp
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h"
-
-#include "mongo/s/catalog/type_locks.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-#include "mongo/util/timer.h"
-
-namespace mongo {
-
-using std::string;
-using std::unique_ptr;
-using stdx::chrono::milliseconds;
-
-
-namespace {
-const stdx::chrono::seconds kDefaultSocketTimeout(30);
-const milliseconds kDefaultPingInterval(30 * 1000);
-} // unnamed namespace
-
-bool LegacyDistLockManager::_pingerEnabled = true;
-
-LegacyDistLockManager::LegacyDistLockManager(ConnectionString configServer,
- const std::string& processId)
- : _configServer(std::move(configServer)), _processId(processId), _isStopped(false) {}
-
-void LegacyDistLockManager::startUp() {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_pinger);
- _pinger = stdx::make_unique<LegacyDistLockPinger>();
-
- if (_pingerEnabled) {
- uassertStatusOK(_pinger->startup(_configServer, _processId, kDefaultPingInterval));
- }
-}
-
-void LegacyDistLockManager::shutDown(OperationContext* txn, bool allowNetworking) {
- stdx::unique_lock<stdx::mutex> sl(_mutex);
- _isStopped = true;
-
- while (!_lockMap.empty()) {
- _noLocksCV.wait(sl);
- }
-
- if (_pinger) {
- _pinger->shutdown(allowNetworking);
- }
-}
-
-std::string LegacyDistLockManager::getProcessID() {
- return _processId;
-}
-
-StatusWith<DistLockManager::ScopedDistLock> LegacyDistLockManager::lock(
- OperationContext* txn,
- StringData name,
- StringData whyMessage,
- milliseconds waitFor,
- milliseconds lockTryInterval) {
- auto distLock = stdx::make_unique<DistributedLock>(_configServer, name.toString(), _processId);
-
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- if (_isStopped) {
- return Status(ErrorCodes::LockBusy, "legacy distlock manager is stopped");
- }
- }
-
- auto lastStatus =
- Status(ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name);
-
- Timer timer;
- Timer msgTimer;
- while (waitFor <= milliseconds::zero() || milliseconds(timer.millis()) < waitFor) {
- bool acquired = false;
- BSONObj lockDoc;
- OID lockID(OID::gen());
-
- try {
- acquired = distLock->lock_try(lockID,
- whyMessage.toString(),
- &lockDoc,
- durationCount<Seconds>(kDefaultSocketTimeout));
-
- if (!acquired) {
- lastStatus = Status(ErrorCodes::LockBusy,
- str::stream() << "Lock for " << whyMessage << " is taken.");
- // cleanup failed attempt to acquire lock.
- _pinger->addUnlockOID(lockID);
- }
- } catch (const LockException& lockExcep) {
- OID needUnlockID(lockExcep.getMustUnlockID());
- if (needUnlockID.isSet()) {
- _pinger->addUnlockOID(needUnlockID);
- }
-
- lastStatus = lockExcep.toStatus();
- } catch (...) {
- lastStatus = exceptionToStatus();
- _pinger->addUnlockOID(lockID);
- }
-
- if (acquired) {
- verify(!lockDoc.isEmpty());
-
- auto locksTypeResult = LocksType::fromBSON(lockDoc);
- if (!locksTypeResult.isOK()) {
- return StatusWith<ScopedDistLock>(
- ErrorCodes::UnsupportedFormat,
- str::stream() << "error while parsing lock document: " << lockDoc << " : "
- << locksTypeResult.getStatus().toString());
- }
- const LocksType& lock = locksTypeResult.getValue();
- dassert(lock.isLockIDSet());
-
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _lockMap.insert(std::make_pair(lock.getLockID(), std::move(distLock)));
- }
-
- return ScopedDistLock(txn, lock.getLockID(), this);
- }
-
- if (waitFor == milliseconds::zero())
- break;
-
- if (lastStatus != ErrorCodes::LockBusy) {
- return lastStatus;
- }
-
-        // Periodically log a message for debugging purposes
- if (msgTimer.seconds() > 10) {
- log() << "waited " << timer.seconds() << "s for distributed lock " << name << " for "
- << whyMessage << ": " << lastStatus.toString();
-
- msgTimer.reset();
- }
-
- milliseconds timeRemaining =
- std::max(milliseconds::zero(), waitFor - milliseconds(timer.millis()));
- sleepFor(std::min(lockTryInterval, timeRemaining));
- }
-
- return lastStatus;
-}
-
-void LegacyDistLockManager::unlock(OperationContext* txn,
- const DistLockHandle& lockHandle) BOOST_NOEXCEPT {
- unique_ptr<DistributedLock> distLock;
-
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- auto iter = _lockMap.find(lockHandle);
- invariant(iter != _lockMap.end());
-
- distLock = std::move(iter->second);
- _lockMap.erase(iter);
- }
-
- if (!distLock->unlock(lockHandle)) {
- _pinger->addUnlockOID(lockHandle);
- }
-
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (_lockMap.empty()) {
- _noLocksCV.notify_all();
- }
- }
-}
-
-Status LegacyDistLockManager::checkStatus(OperationContext* txn, const DistLockHandle& lockHandle) {
- // Note: this should not happen when locks are managed through ScopedDistLock.
- if (_pinger->willUnlockOID(lockHandle)) {
- return Status(ErrorCodes::LockFailed,
- str::stream() << "lock " << lockHandle << " is not held and "
- << "is currently being scheduled for lazy unlock");
- }
-
- DistributedLock* distLock = nullptr;
-
- {
- // Assumption: lockHandles are never shared across threads.
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- auto iter = _lockMap.find(lockHandle);
- invariant(iter != _lockMap.end());
-
- distLock = iter->second.get();
- }
-
- return distLock->checkStatus(durationCount<Seconds>(kDefaultSocketTimeout));
-}
-
-void LegacyDistLockManager::unlockAll(OperationContext* txn, const std::string& processID) {
- fassertFailed(34367); // Only supported for CSRS
-}
-}
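
The lock() method deleted above is built around a bounded retry loop: keep attempting to take the lock until waitFor elapses, sleep for lockTryInterval (capped at the remaining time) between attempts, treat waitFor == 0 as a single attempt, and treat a negative waitFor as "retry until acquired". The following is a minimal standalone sketch of that retry pattern, using std::chrono in place of the stdx wrappers; acquireWithRetry and tryAcquire are illustrative names, not part of this patch.

#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Sketch of the retry-until-timeout loop used by the deleted
// LegacyDistLockManager::lock(). tryAcquire stands in for a single
// DistributedLock::lock_try() attempt. waitFor == 0 means "try exactly once";
// a negative waitFor means "retry until the lock is acquired".
bool acquireWithRetry(const std::function<bool()>& tryAcquire,
                      std::chrono::milliseconds waitFor,
                      std::chrono::milliseconds lockTryInterval) {
    using namespace std::chrono;
    const auto start = steady_clock::now();

    while (waitFor <= milliseconds::zero() ||
           duration_cast<milliseconds>(steady_clock::now() - start) < waitFor) {
        if (tryAcquire()) {
            return true;  // lock acquired
        }
        if (waitFor == milliseconds::zero()) {
            break;  // single-attempt mode
        }
        // Sleep for the retry interval, but never past the overall deadline.
        const auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start);
        const auto remaining = (waitFor <= milliseconds::zero())
            ? lockTryInterval
            : std::max(milliseconds::zero(), waitFor - elapsed);
        std::this_thread::sleep_for(std::min(lockTryInterval, remaining));
    }
    return false;  // timed out waiting for the lock
}
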
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h
deleted file mode 100644
index 3f0d00db115..00000000000
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_manager.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <map>
-#include <memory>
-
-#include "mongo/bson/oid.h"
-#include "mongo/client/connection_string.h"
-#include "mongo/s/catalog/dist_lock_manager.h"
-#include "mongo/s/catalog/legacy/distlock.h"
-#include "mongo/s/catalog/legacy/legacy_dist_lock_pinger.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-
-namespace mongo {
-
-class DistributedLock;
-
-class LegacyDistLockManager : public DistLockManager {
-public:
- explicit LegacyDistLockManager(ConnectionString configServer, const std::string& processId);
-
- virtual ~LegacyDistLockManager() = default;
-
- virtual void startUp() override;
- virtual void shutDown(OperationContext* txn, bool allowNetworking) override;
-
- virtual std::string getProcessID() override;
-
- virtual StatusWith<DistLockManager::ScopedDistLock> lock(
- OperationContext* txn,
- StringData name,
- StringData whyMessage,
- stdx::chrono::milliseconds waitFor,
- stdx::chrono::milliseconds lockTryInterval) override;
-
- virtual void unlockAll(OperationContext* txn, const std::string& processID) override;
-
- // For testing only. Must be called before any calls to startUp().
- static void disablePinger() {
- _pingerEnabled = false;
- }
-
-protected:
- virtual void unlock(OperationContext* txn,
- const DistLockHandle& lockHandle) BOOST_NOEXCEPT override;
-
- virtual Status checkStatus(OperationContext* txn, const DistLockHandle& lockHandle) override;
-
-private:
- const ConnectionString _configServer;
-
- // Identifier of this process for determining ownership of distributed locks.
- std::string _processId;
-
- stdx::mutex _mutex;
- stdx::condition_variable _noLocksCV;
- std::map<DistLockHandle, std::unique_ptr<DistributedLock>> _lockMap;
-
- std::unique_ptr<LegacyDistLockPinger> _pinger;
-
- bool _isStopped;
- static bool _pingerEnabled;
-};
-}
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp
deleted file mode 100644
index 951c48d67cf..00000000000
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.cpp
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/catalog/legacy/legacy_dist_lock_pinger.h"
-
-#include "mongo/client/connpool.h"
-#include "mongo/s/catalog/legacy/distlock.h"
-#include "mongo/s/catalog/type_lockpings.h"
-#include "mongo/s/catalog/type_locks.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using std::string;
-
-namespace {
-string pingThreadId(const ConnectionString& conn, const string& processId) {
- return conn.toString() + "/" + processId;
-}
-}
-
-void LegacyDistLockPinger::_distLockPingThread(ConnectionString addr,
- const string& process,
- Milliseconds sleepTime) {
- setThreadName("LockPinger");
-
- string pingId = pingThreadId(addr, process);
-
- LOG(0) << "creating distributed lock ping thread for " << addr << " and process " << process
- << " (sleeping for " << sleepTime << ")";
-
- static int loops = 0;
- Date_t lastPingTime = jsTime();
- while (!shouldStopPinging(addr, process)) {
- LOG(3) << "distributed lock pinger '" << pingId << "' about to ping.";
-
- Date_t pingTime;
-
- try {
- ScopedDbConnection conn(addr.toString(), 30.0);
-
- pingTime = jsTime();
- const auto elapsed = pingTime - lastPingTime;
- if (elapsed > 10 * sleepTime) {
- warning() << "Lock pinger for addr: " << addr << ", proc: " << process
- << " was inactive for " << elapsed;
- }
-
- lastPingTime = pingTime;
-
- // Refresh the entry corresponding to this process in the lockpings collection.
- conn->update(LockpingsType::ConfigNS,
- BSON(LockpingsType::process(process)),
- BSON("$set" << BSON(LockpingsType::ping(pingTime))),
- true);
-
- string err = conn->getLastError();
- if (!err.empty()) {
- warning() << "pinging failed for distributed lock pinger '" << pingId << "'."
- << causedBy(err);
- conn.done();
-
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
- }
- continue;
- }
-
- // Remove really old entries from the lockpings collection if they're not
- // holding a lock. This may happen if an instance of a process was taken down
-            // and no new instance came up to replace it for quite a while.
- // NOTE this is NOT the same as the standard take-over mechanism, which forces
- // the lock entry.
- BSONObj fieldsToReturn = BSON(LocksType::state() << 1 << LocksType::process() << 1);
- auto activeLocks = conn->query(LocksType::ConfigNS,
- BSON(LocksType::state() << NE << LocksType::UNLOCKED));
-
- uassert(16060,
- str::stream() << "cannot query locks collection on config server "
- << conn.getHost(),
- activeLocks.get());
-
- std::set<string> pids;
- while (activeLocks->more()) {
- BSONObj lock = activeLocks->nextSafe();
-
- if (!lock[LocksType::process()].eoo()) {
- pids.insert(lock[LocksType::process()].str());
- } else {
- warning() << "found incorrect lock document during lock ping cleanup: "
- << lock.toString();
- }
- }
-
- // This can potentially delete ping entries that are actually active (if the clock
- // of another pinger is too skewed). This is still fine as the lock logic only
- // checks if there is a change in the ping document and the document going away
- // is a valid change.
- Date_t fourDays = pingTime - stdx::chrono::hours{4 * 24};
- conn->remove(LockpingsType::ConfigNS,
- BSON(LockpingsType::process() << NIN << pids << LockpingsType::ping() << LT
- << fourDays));
- err = conn->getLastError();
-
- if (!err.empty()) {
- warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed."
- << causedBy(err);
- conn.done();
-
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
- }
- continue;
- }
-
- LOG(1 - (loops % 10 == 0 ? 1 : 0)) << "cluster " << addr << " pinged successfully at "
- << pingTime << " by distributed lock pinger '"
- << pingId << "', sleeping for " << sleepTime;
-
- // Remove old locks, if possible
- // Make sure no one else is adding to this list at the same time
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- int numOldLocks = _unlockList.size();
- if (numOldLocks > 0) {
- LOG(0) << "trying to delete " << _unlockList.size()
- << " old lock entries for process " << process;
- }
-
- bool removed = false;
- for (auto iter = _unlockList.begin(); iter != _unlockList.end();
- iter = (removed ? _unlockList.erase(iter) : ++iter)) {
- removed = false;
- try {
- // Got DistLockHandle from lock, so we don't need to specify _id again
- conn->update(LocksType::ConfigNS,
- BSON(LocksType::lockID(*iter)),
- BSON("$set" << BSON(LocksType::state(LocksType::UNLOCKED))));
-
-                    // Either the update went through or it didn't; either way
-                    // we're done trying to unlock.
- LOG(0) << "handled late remove of old distributed lock with ts " << *iter;
- removed = true;
- } catch (UpdateNotTheSame&) {
- LOG(0) << "partially removed old distributed lock with ts " << *iter;
- removed = true;
- } catch (std::exception& e) {
- warning() << "could not remove old distributed lock with ts " << *iter
- << causedBy(e);
- }
- }
-
- if (numOldLocks > 0 && _unlockList.size() > 0) {
- LOG(0) << "not all old lock entries could be removed for process " << process;
- }
-
- conn.done();
-
- } catch (std::exception& e) {
- warning() << "distributed lock pinger '" << pingId
- << "' detected an exception while pinging." << causedBy(e);
- }
-
- if (!shouldStopPinging(addr, process)) {
- waitTillNextPingTime(sleepTime);
- }
- }
-
- warning() << "removing distributed lock ping thread '" << pingId << "'";
-
- if (shouldStopPinging(addr, process)) {
- acknowledgeStopPing(addr, process);
- }
-}
-
-void LegacyDistLockPinger::distLockPingThread(ConnectionString addr,
- long long clockSkew,
- const std::string& processId,
- stdx::chrono::milliseconds sleepTime) {
- try {
- jsTimeVirtualThreadSkew(clockSkew);
- _distLockPingThread(addr, processId, sleepTime);
- } catch (std::exception& e) {
- error() << "unexpected error while running distributed lock pinger for " << addr
- << ", process " << processId << causedBy(e);
- } catch (...) {
- error() << "unknown error while running distributed lock pinger for " << addr
- << ", process " << processId;
- }
-}
-
-Status LegacyDistLockPinger::startup(const ConnectionString& configServerConnectionString,
- const std::string& processID,
- Milliseconds sleepTime) {
- string pingID = pingThreadId(configServerConnectionString, processID);
-
- {
- // Make sure we don't start multiple threads for a process id.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_inShutdown) {
- return Status(ErrorCodes::ShutdownInProgress, "shutting down, will not start ping");
- }
-
- // Ignore if we already have a pinging thread for this process.
- if (_seen.count(pingID) > 0) {
- return Status::OK();
- }
-
- stdx::thread thread(stdx::bind(&LegacyDistLockPinger::distLockPingThread,
- this,
- configServerConnectionString,
- getJSTimeVirtualThreadSkew(),
- processID,
- sleepTime));
- _pingThreads.insert(std::make_pair(pingID, std::move(thread)));
-
- _seen.insert(pingID);
- }
-
- return Status::OK();
-}
-
-void LegacyDistLockPinger::addUnlockOID(const DistLockHandle& lockID) {
-    // The unlock list may be modified from another thread, so guard it with the mutex.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _unlockList.push_back(lockID);
-}
-
-bool LegacyDistLockPinger::willUnlockOID(const DistLockHandle& lockID) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return find(_unlockList.begin(), _unlockList.end(), lockID) != _unlockList.end();
-}
-
-void LegacyDistLockPinger::stopPing(const ConnectionString& conn, const string& processId) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- string pingId = pingThreadId(conn, processId);
-
- verify(_seen.count(pingId) > 0);
- _kill.insert(pingId);
- _pingStoppedCV.notify_all();
- }
-}
-
-void LegacyDistLockPinger::shutdown(bool allowNetworking) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
- _allowNetworkingInShutdown = allowNetworking;
- _pingStoppedCV.notify_all();
- }
-
-    // Don't grab _mutex; otherwise we will deadlock trying to join. It is safe to read
-    // _pingThreads since it cannot be modified once _inShutdown is true.
- for (auto& idToThread : _pingThreads) {
- if (idToThread.second.joinable()) {
- idToThread.second.join();
- }
- }
-}
-
-bool LegacyDistLockPinger::shouldStopPinging(const ConnectionString& conn,
- const string& processId) {
- if (inShutdown()) {
- return true;
- }
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_inShutdown) {
- return true;
- }
-
- return _kill.count(pingThreadId(conn, processId)) > 0;
-}
-
-void LegacyDistLockPinger::acknowledgeStopPing(const ConnectionString& addr,
- const string& processId) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- string pingId = pingThreadId(addr, processId);
-
- _kill.erase(pingId);
- _seen.erase(pingId);
-
- if (!_allowNetworkingInShutdown) {
- return;
- }
- }
-
- try {
- ScopedDbConnection conn(addr.toString(), 30.0);
- conn->remove(LockpingsType::ConfigNS, BSON(LockpingsType::process(processId)));
- } catch (const DBException& ex) {
- warning() << "Error encountered while stopping ping on " << processId << causedBy(ex);
- }
-}
-
-void LegacyDistLockPinger::waitTillNextPingTime(stdx::chrono::milliseconds duration) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _pingStoppedCV.wait_for(lk, duration);
-}
-}
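
Structurally, the ping thread deleted above is an interruptible periodic loop: do one round of work (update the lockpings entry, prune stale pings, retry queued unlocks), then wait on a condition variable for the ping interval or until a stop/shutdown request arrives (see waitTillNextPingTime() and shouldStopPinging()). A minimal sketch of that loop in standard C++, with the whole work round reduced to a hypothetical pingOnce callback:

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Sketch of the interruptible periodic loop used by the deleted
// LegacyDistLockPinger: run one round of work, then sleep on a condition
// variable so a shutdown request can interrupt the wait immediately.
class PeriodicPinger {
public:
    // pingOnce is the single round of work; interval is the pause between rounds.
    PeriodicPinger(std::function<void()> pingOnce, std::chrono::milliseconds interval)
        : _pingOnce(std::move(pingOnce)), _interval(interval) {}

    // Caller must invoke shutdown() before destroying the object.
    void start() {
        _thread = std::thread([this] { _run(); });
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _inShutdown = true;
        }
        _cv.notify_all();
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    void _run() {
        std::unique_lock<std::mutex> lk(_mutex);
        while (!_inShutdown) {
            lk.unlock();
            _pingOnce();  // one round of work: ping, prune stale pings, retry unlocks
            lk.lock();
            // Sleep for the interval, waking early if shutdown() was requested.
            _cv.wait_for(lk, _interval, [this] { return _inShutdown; });
        }
    }

    std::function<void()> _pingOnce;
    std::chrono::milliseconds _interval;

    std::mutex _mutex;  // guards _inShutdown
    std::condition_variable _cv;
    bool _inShutdown = false;
    std::thread _thread;
};
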
diff --git a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h b/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h
deleted file mode 100644
index 1155ca544ca..00000000000
--- a/src/mongo/s/catalog/legacy/legacy_dist_lock_pinger.h
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <list>
-#include <set>
-#include <string>
-
-#include "mongo/base/status.h"
-#include "mongo/client/dbclientinterface.h"
-#include "mongo/s/catalog/dist_lock_manager.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/mutex.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class DistributedLock;
-
-class LegacyDistLockPinger {
-public:
- LegacyDistLockPinger() = default;
-
- /**
- * Starts the pinger thread for a given processID.
- * Note: this pinger does not support being started up after it was stopped, either by a call
- * to stopPing or shutdown.
- */
- Status startup(const ConnectionString& configServerConnectionString,
- const std::string& processID,
- Milliseconds sleepTime);
-
- /**
- * Adds a distributed lock that has the given id to the unlock list. The unlock list
- * contains the list of locks that this pinger will repeatedly attempt to unlock until
- * it succeeds.
- */
- void addUnlockOID(const DistLockHandle& lockID);
-
- /**
- * Returns true if the given lock id is currently in the unlock queue.
- */
- bool willUnlockOID(const DistLockHandle& lockID);
-
- /**
- * For testing only: non-blocking call to stop pinging the given process id.
- */
- void stopPing(const ConnectionString& conn, const std::string& processId);
-
- /**
-     * Kills all ping threads and waits for them to clean up.
- */
- void shutdown(bool allowNetworking);
-
-private:
- /**
- * Helper method for calling _distLockPingThread.
- */
- void distLockPingThread(ConnectionString addr,
- long long clockSkew,
- const std::string& processId,
- Milliseconds sleepTime);
-
- /**
- * Function for repeatedly pinging the process id. Also attempts to unlock all the
- * locks in the unlock list.
- */
- void _distLockPingThread(ConnectionString addr,
- const std::string& process,
- Milliseconds sleepTime);
-
- /**
-     * Returns true if a request has been made to stop pinging the given process id.
- */
- bool shouldStopPinging(const ConnectionString& conn, const std::string& processId);
-
- /**
-     * Acknowledges the stop ping request and performs the necessary cleanup.
- */
- void acknowledgeStopPing(const ConnectionString& conn, const std::string& processId);
-
- /**
-     * Blocks until the duration has elapsed or the ping thread is interrupted.
- */
- void waitTillNextPingTime(Milliseconds duration);
-
- //
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (M) Must hold _mutex for access.
-
- stdx::mutex _mutex;
-
-    // Triggered every time a pinger thread is stopped.
- stdx::condition_variable _pingStoppedCV; // (M)
-
- // pingID -> thread
-    // This can contain multiple elements in tests, but in the normal case, this will
- // contain only a single element.
- // Note: can be safely read when _inShutdown is true.
- std::map<std::string, stdx::thread> _pingThreads; // (M*)
-
- // Contains the list of process id to stopPing.
-    // Contains the list of process ids to stop pinging.
-
-    // Contains all of the process ids to ping.
- std::set<std::string> _seen; // (M)
-
-    // Contains all lock ids to keep retrying to unlock until they succeed.
- std::list<DistLockHandle> _unlockList; // (M)
-
- bool _inShutdown = false; // (M)
- bool _allowNetworkingInShutdown = true; // (M)
-};
-}
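
Taken together, the doc comments above describe a simple lifecycle for the pinger: construct it, call startup() once per config-server/process pair, queue lazy unlocks with addUnlockOID(), and call shutdown() exactly once. A hedged usage sketch of that lifecycle against the deleted class follows; the connection string and process id are placeholders, and the 30-second interval mirrors kDefaultPingInterval from the deleted manager rather than anything added by this patch.

#include <string>

#include "mongo/client/connection_string.h"
#include "mongo/s/catalog/legacy/legacy_dist_lock_pinger.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Usage sketch of the deleted pinger's lifecycle, as described by the comments
// above. The inputs are placeholders supplied by a hypothetical caller.
void pingerLifecycleSketch(const ConnectionString& configServers, const std::string& processId) {
    LegacyDistLockPinger pinger;

    // One ping thread per (config servers, process id) pair; repeated calls are no-ops.
    uassertStatusOK(pinger.startup(configServers, processId, Milliseconds(30 * 1000)));

    // A failed lock attempt can leave a lock document behind; queue it for lazy unlock.
    DistLockHandle staleLock = OID::gen();
    pinger.addUnlockOID(staleLock);
    invariant(pinger.willUnlockOID(staleLock));

    // Stop every ping thread and wait for it; networking is still allowed for cleanup.
    pinger.shutdown(true /* allowNetworking */);
}

}  // namespace mongo
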
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 52feade2277..53a30764490 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -48,10 +48,6 @@ public:
explicit CatalogManagerReplicaSet(std::unique_ptr<DistLockManager> distLockManager);
virtual ~CatalogManagerReplicaSet();
- ConfigServerMode getMode() override {
- return ConfigServerMode::CSRS;
- }
-
/**
* Safe to call multiple times as long as they
*/
diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp
index 05bf53722a6..7f325611db9 100644
--- a/src/mongo/s/client/sharding_connection_hook.cpp
+++ b/src/mongo/s/client/sharding_connection_hook.cpp
@@ -136,14 +136,13 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) {
return _shardingRequestMetadataWriter(_shardedConnections, metadataBob, hostStringData);
});
- // For every SCC created, add a hook that will allow fastest-config-first config reads if
- // the appropriate server options are set.
if (conn->type() == ConnectionString::SYNC) {
- SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>(conn);
- if (scc) {
- scc->attachQueryHandler(new SCCFastQueryHandler);
- }
- } else if (conn->type() == ConnectionString::MASTER) {
+ throw UserException(ErrorCodes::UnsupportedFormat,
+ str::stream() << "Unrecognized connection string type: " << conn->type()
+ << ".");
+ }
+
+ if (conn->type() == ConnectionString::MASTER) {
BSONObj isMasterResponse;
if (!conn->runCommand("admin", BSON("ismaster" << 1), isMasterResponse)) {
uassertStatusOK(getStatusFromCommandResult(isMasterResponse));
@@ -163,12 +162,6 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) {
<< ". Expected either 0 or 1",
configServerModeNumber == 0 || configServerModeNumber == 1);
- BSONElement setName = isMasterResponse["setName"];
- status = grid.forwardingCatalogManager()->scheduleReplaceCatalogManagerIfNeeded(
- configServerModeNumber == 0 ? CatalogManager::ConfigServerMode::SCCC
- : CatalogManager::ConfigServerMode::CSRS,
- setName.type() == String ? setName.valueStringData() : StringData(),
- static_cast<DBClientConnection*>(conn)->getServerHostAndPort());
uassertStatusOK(status);
}
}
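
The rewritten onCreate() above rejects SYNC connection strings outright; for direct (MASTER) connections it still runs isMaster and reads the optional configsvr field, asserting that the value is 0 or 1. Below is a hedged sketch of that probe, pulled out into a free function for illustration; probeConfigServerMode is not a name from the patch, and folding a missing configsvr field into mode 0 is a simplification of what the hook actually does.

#include "mongo/base/status_with.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/rpc/get_status_from_command_result.h"

namespace mongo {

// Sketch of the isMaster probe performed when a new connection is created:
// run isMaster and report which config-server mode the remote node advertises.
StatusWith<long long> probeConfigServerMode(DBClientBase* conn) {
    BSONObj isMasterResponse;
    if (!conn->runCommand("admin", BSON("ismaster" << 1), isMasterResponse)) {
        return getStatusFromCommandResult(isMasterResponse);
    }

    long long configServerModeNumber = 0;
    auto status = bsonExtractIntegerField(isMasterResponse, "configsvr", &configServerModeNumber);
    if (status == ErrorCodes::NoSuchKey) {
        return 0LL;  // no configsvr field: treated as "not a config server" in this sketch
    }
    if (!status.isOK()) {
        return status;
    }
    return configServerModeNumber;  // the hook asserts this is either 0 or 1
}

}  // namespace mongo
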
diff --git a/src/mongo/s/client/sharding_network_connection_hook.cpp b/src/mongo/s/client/sharding_network_connection_hook.cpp
index 6652f6a19e1..0b03b2cdd82 100644
--- a/src/mongo/s/client/sharding_network_connection_hook.cpp
+++ b/src/mongo/s/client/sharding_network_connection_hook.cpp
@@ -58,6 +58,7 @@ Status ShardingNetworkConnectionHook::validateHostImpl(
long long configServerModeNumber;
auto status = bsonExtractIntegerField(isMasterReply.data, "configsvr", &configServerModeNumber);
+    // TODO SERVER-22320: that fix should collapse this switch so only NoSuchKey needs special handling
switch (status.code()) {
case ErrorCodes::OK: {
@@ -67,12 +68,7 @@ Status ShardingNetworkConnectionHook::validateHostImpl(
str::stream() << "Surprised to discover that " << remoteHost.toString()
<< " believes it is a config server"};
}
- using ConfigServerMode = CatalogManager::ConfigServerMode;
- const BSONElement setName = isMasterReply.data["setName"];
- return grid.forwardingCatalogManager()->scheduleReplaceCatalogManagerIfNeeded(
- (configServerModeNumber == 0 ? ConfigServerMode::SCCC : ConfigServerMode::CSRS),
- (setName.type() == String ? setName.valueStringData() : StringData()),
- remoteHost);
+ return Status::OK();
}
case ErrorCodes::NoSuchKey: {
// The ismaster response indicates that remoteHost is not a config server, or that
diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp
index 14a697808be..50f70f01352 100644
--- a/src/mongo/s/commands/cluster_is_master_cmd.cpp
+++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp
@@ -30,7 +30,6 @@
#include "mongo/db/commands.h"
#include "mongo/db/wire_version.h"
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 116bfba0139..0efd5353366 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -462,7 +462,7 @@ public:
map<BSONObj, int> chunkSizes;
{
// Take distributed lock to prevent split / migration.
- auto scopedDistLock = grid.forwardingCatalogManager()->distLock(
+ auto scopedDistLock = grid.catalogManager(txn)->distLock(
txn, finalColLong, "mr-post-process", kNoDistLockTimeout);
if (!scopedDistLock.isOK()) {
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index 2f0735c4d1f..9cfc87c7c58 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -147,8 +147,8 @@ public:
<< " to: " << toShard->toString();
string whyMessage(str::stream() << "Moving primary shard of " << dbname);
- auto scopedDistLock =
- grid.forwardingCatalogManager()->distLock(txn, dbname + "-movePrimary", whyMessage);
+ auto scopedDistLock = grid.catalogManager(txn)->distLock(
+ txn, dbname + "-movePrimary", whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
return appendCommandStatus(result, scopedDistLock.getStatus());
@@ -161,8 +161,6 @@ public:
BSONObj moveStartDetails =
_buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls);
- uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
-
auto catalogManager = grid.catalogManager(txn);
catalogManager->logChange(txn, "movePrimary.start", dbname, moveStartDetails);
@@ -202,8 +200,6 @@ public:
return false;
}
- uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
-
const string oldPrimary = fromShard->getConnString().toString();
ScopedDbConnection fromconn(fromShard->getConnString());
@@ -241,7 +237,6 @@ public:
try {
log() << "movePrimary dropping cloned collection " << el.String() << " on "
<< oldPrimary;
- uassertStatusOK(scopedDistLock.getValue().checkForPendingCatalogChange());
fromconn->dropCollection(el.String());
} catch (DBException& e) {
e.addContext(str::stream()
diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp
index 07aabc79a46..6d3e4235cf2 100644
--- a/src/mongo/s/d_merge.cpp
+++ b/src/mongo/s/d_merge.cpp
@@ -70,7 +70,8 @@ bool mergeChunks(OperationContext* txn,
// Get the distributed lock
string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to "
<< maxKey;
- auto scopedDistLock = grid.forwardingCatalogManager()->distLock(txn, nss.ns(), whyMessage);
+ auto scopedDistLock = grid.catalogManager(txn)->distLock(
+ txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
*errMsg = stream() << "could not acquire collection lock for " << nss.ns()
diff --git a/src/mongo/s/d_sharding_server_status.cpp b/src/mongo/s/d_sharding_server_status.cpp
index e8e9a2a60f3..dc78d2defa4 100644
--- a/src/mongo/s/d_sharding_server_status.cpp
+++ b/src/mongo/s/d_sharding_server_status.cpp
@@ -67,10 +67,7 @@ BSONObj ShardingServerStatus::generateSection(OperationContext* txn,
BSONObjBuilder result;
result.append("configsvrConnectionString", shardingState->getConfigServer(txn).toString());
- auto catalogManager = grid.catalogManager(txn);
- if (catalogManager->getMode() == CatalogManager::ConfigServerMode::CSRS) {
- grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime");
- }
+ grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime");
return result.obj();
}
diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp
index e2d06e61853..5fd9af4ce63 100644
--- a/src/mongo/s/d_split.cpp
+++ b/src/mongo/s/d_split.cpp
@@ -652,7 +652,8 @@ public:
const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max
<< ") in " << nss.toString());
- auto scopedDistLock = grid.forwardingCatalogManager()->distLock(txn, nss.ns(), whyMessage);
+ auto scopedDistLock = grid.catalogManager(txn)->distLock(
+ txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
errmsg = str::stream() << "could not acquire collection lock for " << nss.toString()
<< " to split chunk [" << min << "," << max << ")"
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index 1407c0edde9..94108ecdb37 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -34,7 +34,7 @@
#include "mongo/base/status_with.h"
#include "mongo/s/catalog/catalog_cache.h"
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
+#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/util/log.h"
@@ -46,7 +46,7 @@ Grid grid;
Grid::Grid() : _allowLocalShard(true) {}
-void Grid::init(std::unique_ptr<ForwardingCatalogManager> catalogManager,
+void Grid::init(std::unique_ptr<CatalogManager> catalogManager,
std::unique_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager) {
invariant(!_catalogManager);
@@ -136,12 +136,4 @@ void Grid::clearForUnitTests() {
_cursorManager.reset();
}
-ForwardingCatalogManager* Grid::forwardingCatalogManager() {
- return _catalogManager.get();
-}
-
-CatalogManager* Grid::catalogManager(OperationContext* txn) {
- return _catalogManager->getCatalogManagerToUse(txn);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 6e09dbdc0e1..63e55687392 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -31,7 +31,7 @@
#include <string>
#include <vector>
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
+#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/memory.h"
@@ -62,7 +62,7 @@ public:
* NOTE: Unit-tests are allowed to call it more than once, provided they reset the object's
* state using clearForUnitTests.
*/
- void init(std::unique_ptr<ForwardingCatalogManager> catalogManager,
+ void init(std::unique_ptr<CatalogManager> catalogManager,
std::unique_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager);
@@ -97,19 +97,14 @@ public:
* Returns a pointer to a CatalogManager to use for accessing catalog data stored on the config
* servers.
*/
- CatalogManager* catalogManager(OperationContext* txn);
-
- /**
- * Returns a direct pointer to the ForwardingCatalogManager. This should only be used for
- * calling methods that are specific to the ForwardingCatalogManager and not part of the generic
- * CatalogManager interface, such as for taking the distributed lock and scheduling replacement
- * of the underlying CatalogManager that the ForwardingCatalogManager is delegating to.
- */
- ForwardingCatalogManager* forwardingCatalogManager();
+ CatalogManager* catalogManager(OperationContext* txn) {
+ return _catalogManager.get();
+ }
CatalogCache* catalogCache() {
return _catalogCache.get();
}
+
ShardRegistry* shardRegistry() {
return _shardRegistry.get();
}
@@ -128,7 +123,7 @@ public:
void clearForUnitTests();
private:
- std::unique_ptr<ForwardingCatalogManager> _catalogManager;
+ std::unique_ptr<CatalogManager> _catalogManager;
std::unique_ptr<CatalogCache> _catalogCache;
std::unique_ptr<ShardRegistry> _shardRegistry;
std::unique_ptr<ClusterCursorManager> _cursorManager;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index a3b2c365de4..948e12b2af6 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -132,67 +132,6 @@ std::unique_ptr<LiteParsedQuery> transformQueryForShards(const LiteParsedQuery&
lpq.isAllowPartialResults());
}
-/**
- * Runs a find command against the "config" shard in SyncClusterConnection (SCCC) mode. Special
- * handling is required for SCCC since the config shard's NS targeter is only available if the
- * config servers are in CSRS mode.
- *
- * 'query' is the query to run against the config shard. 'shard' must represent the config shard.
- *
- * On success, fills out 'results' with the documents returned from the config shard and returns the
- * cursor id which should be handed back to the client.
- *
- * TODO: This should not be required for 3.4, since the config server mode must be config server
- * replica set (CSRS) in order to upgrade.
- */
-StatusWith<CursorId> runConfigServerQuerySCCC(const CanonicalQuery& query,
- const Shard& shard,
- std::vector<BSONObj>* results) {
- BSONObj findCommand = query.getParsed().asFindCommand();
-
- // XXX: This is a temporary hack. We use ScopedDbConnection and query the $cmd namespace
- // explicitly because this gives us the particular host that the command ran on via
- // originalHost(). We need to know the host that the remote cursor was established on in order
- // to issue getMore or killCursors operations against this remote cursor.
- ScopedDbConnection conn(shard.getConnString());
- auto cursor = conn->query(str::stream() << query.nss().db() << ".$cmd",
- findCommand,
- -1, // nToReturn
- 0, // nToSkip
- nullptr, // fieldsToReturn
- 0); // options
- if (!cursor || !cursor->more()) {
- return {ErrorCodes::OperationFailed, "failed to run find command against config shard"};
- }
- BSONObj result = cursor->nextSafe().getOwned();
- conn.done();
-
- auto status = Command::getStatusFromCommandResult(result);
- if (ErrorCodes::SendStaleConfig == status || ErrorCodes::RecvStaleConfig == status) {
- throw RecvStaleConfigException("find command failed because of stale config", result);
- }
-
- auto executorPool = grid.shardRegistry()->getExecutorPool();
- auto transformedResult = storePossibleCursor(HostAndPort(cursor->originalHost()),
- result,
- executorPool->getArbitraryExecutor(),
- grid.getCursorManager());
- if (!transformedResult.isOK()) {
- return transformedResult.getStatus();
- }
-
- auto outgoingCursorResponse = CursorResponse::parseFromBSON(transformedResult.getValue());
- if (!outgoingCursorResponse.isOK()) {
- return outgoingCursorResponse.getStatus();
- }
-
- for (const auto& doc : outgoingCursorResponse.getValue().getBatch()) {
- results->push_back(doc.getOwned());
- }
-
- return outgoingCursorResponse.getValue().getCursorId();
-}
-
StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
@@ -251,21 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
// Use read pref to target a particular host from each shard. Also construct the find command
// that we will forward to each shard.
for (const auto& shard : shards) {
- // The unified targeting logic only works for config server replica sets, so we need special
- // handling for querying config server content with legacy 3-host config servers.
- if (shard->isConfig() && shard->getConnString().type() == ConnectionString::SYNC) {
- invariant(shards.size() == 1U);
- try {
- return runConfigServerQuerySCCC(query, *shard, results);
- } catch (const DBException& e) {
- if (e.getCode() != ErrorCodes::IncompatibleCatalogManager) {
- throw;
- }
- grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn);
- // Fall through to normal code path now that the catalog manager mode has been
- // swapped and the config servers are a normal replica set.
- }
- }
+ invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::SYNC);
// Build the find command, and attach shard version if necessary.
BSONObjBuilder cmdBuilder;
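
The deleted runConfigServerQuerySCCC() ends by unpacking a command reply into a CursorResponse: parse the reply, copy owned documents out of the first batch, and hand the cursor id back to the caller. A hedged sketch of just that unpacking step follows; the helper name and the cursor_response.h include path are assumptions, and the reply is assumed to be a well-formed find reply.

#include <vector>

#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/s/query/cursor_response.h"

namespace mongo {

// Sketch of the reply-unpacking step from the deleted SCCC helper: parse the
// reply, append owned copies of the first batch, and return the cursor id.
StatusWith<CursorId> unpackFindReply(const BSONObj& reply, std::vector<BSONObj>* results) {
    auto cursorResponse = CursorResponse::parseFromBSON(reply);
    if (!cursorResponse.isOK()) {
        return cursorResponse.getStatus();
    }
    for (const auto& doc : cursorResponse.getValue().getBatch()) {
        results->push_back(doc.getOwned());  // keep the documents beyond the reply's lifetime
    }
    return cursorResponse.getValue().getCursorId();
}

}  // namespace mongo
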
diff --git a/src/mongo/s/s_sharding_server_status.cpp b/src/mongo/s/s_sharding_server_status.cpp
index 809521c6fa2..0ef408cef96 100644
--- a/src/mongo/s/s_sharding_server_status.cpp
+++ b/src/mongo/s/s_sharding_server_status.cpp
@@ -65,10 +65,7 @@ BSONObj ShardingServerStatus::generateSection(OperationContext* txn,
result.append("configsvrConnectionString",
grid.shardRegistry()->getConfigServerConnectionString().toString());
- auto catalogManager = grid.catalogManager(txn);
- if (catalogManager->getMode() == CatalogManager::ConfigServerMode::CSRS) {
- grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime");
- }
+ grid.shardRegistry()->getConfigOpTime().append(&result, "lastSeenConfigServerOpTime");
return result.obj();
}
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 6cb3aff4857..8803ae0e235 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -59,7 +59,7 @@
#include "mongo/db/wire_version.h"
#include "mongo/platform/process_id.h"
#include "mongo/s/balance.h"
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
+#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_connection_hook.h"
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 7d5da250958..0ad8aa554f5 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -48,11 +48,13 @@
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
-#include "mongo/s/catalog/forwarding_catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/grid.h"
+#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
+#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/replset/replset_dist_lock_manager.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -68,6 +70,28 @@ using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;
+
+std::unique_ptr<CatalogManager> makeCatalogManager(ServiceContext* service,
+ ShardRegistry* shardRegistry,
+ const HostAndPort& thisHost) {
+ std::unique_ptr<SecureRandom> rng(SecureRandom::create());
+ std::string distLockProcessId = str::stream()
+ << thisHost.toString() << ':'
+ << durationCount<Seconds>(service->getClockSource()->now().toDurationSinceEpoch()) << ':'
+ << static_cast<int32_t>(rng->nextInt64());
+
+ auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ auto distLockManager =
+ stdx::make_unique<ReplSetDistLockManager>(service,
+ distLockProcessId,
+ std::move(distLockCatalog),
+ ReplSetDistLockManager::kDistLockPingInterval,
+ ReplSetDistLockManager::kDistLockExpirationTime);
+
+ return stdx::make_unique<CatalogManagerReplicaSet>(std::move(distLockManager));
+}
+
+
// Same logic as sharding_connection_hook.cpp.
class ShardingEgressMetadataHook final : public rpc::EgressMetadataHook {
public:
@@ -153,6 +177,11 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn
Status initializeGlobalShardingState(OperationContext* txn,
const ConnectionString& configCS,
bool allowNetworking) {
+ if (configCS.type() == ConnectionString::SYNC) {
+ return {ErrorCodes::UnsupportedFormat,
+ "SYNC config server connection string is not allowed."};
+ }
+
SyncClusterConnection::setConnectionValidationHook(
[](const HostAndPort& target, const executor::RemoteCommandResponse& isMasterReply) {
return ShardingNetworkConnectionHook::validateHostImpl(target, isMasterReply);
@@ -170,12 +199,9 @@ Status initializeGlobalShardingState(OperationContext* txn,
"NetworkInterfaceASIO-ShardRegistry-TaskExecutor")),
configCS));
- std::unique_ptr<ForwardingCatalogManager> catalogManager =
- stdx::make_unique<ForwardingCatalogManager>(
- getGlobalServiceContext(),
- configCS,
- shardRegistry.get(),
- HostAndPort(getHostName(), serverGlobalParams.port));
+ auto catalogManager = makeCatalogManager(getGlobalServiceContext(),
+ shardRegistry.get(),
+ HostAndPort(getHostName(), serverGlobalParams.port));
shardRegistry->startup();
grid.init(std::move(catalogManager),
@@ -193,12 +219,6 @@ Status initializeGlobalShardingState(OperationContext* txn,
return Status::OK();
} catch (const DBException& ex) {
Status status = ex.toStatus();
- if (status == ErrorCodes::ConfigServersInconsistent) {
- // Legacy catalog manager can return ConfigServersInconsistent. When that happens
- // we should immediately fail initialization. For all other failures we should
- // retry.
- return status;
- }
if (status == ErrorCodes::ReplicaSetNotFound) {
// ReplicaSetNotFound most likely means we've been waiting for the config replica
// set to come up for so long that the ReplicaSetMonitor stopped monitoring the set.
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index e51df67a8a5..c5bc4158f7d 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -131,9 +131,7 @@ void ShardingTestFixture::setUp() {
// For now initialize the global grid object. All sharding objects will be accessible
// from there until we get rid of it.
- auto shardRegistryPtr = shardRegistry.get();
- grid.init(stdx::make_unique<ForwardingCatalogManager>(
- _service.get(), std::move(cm), shardRegistryPtr, HostAndPort("somehost")),
+ grid.init(std::move(cm),
std::move(shardRegistry),
stdx::make_unique<ClusterCursorManager>(_service->getClockSource()));
}
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 7f1a9f48c9d..5914fc7b460 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -200,7 +200,6 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
return;
int loops = 5;
- bool cmChangeAttempted = false;
while (true) {
try {
@@ -254,20 +253,13 @@ void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
if (loops < 4)
versionManager.forceRemoteCheckShardVersionCB(txn, staleNS);
} catch (const DBException& e) {
- if (e.getCode() == ErrorCodes::IncompatibleCatalogManager) {
- fassert(28791, !cmChangeAttempted);
- cmChangeAttempted = true;
-
- grid.forwardingCatalogManager()->waitForCatalogManagerChange(txn);
- } else {
- OpQueryReplyBuilder reply;
- {
- BSONObjBuilder builder(reply.bufBuilderForResults());
- Command::appendCommandStatus(builder, e.toStatus());
- }
- reply.sendCommandReply(request.p(), request.m());
- return;
+ OpQueryReplyBuilder reply;
+ {
+ BSONObjBuilder builder(reply.bufBuilderForResults());
+ Command::appendCommandStatus(builder, e.toStatus());
}
+ reply.sendCommandReply(request.p(), request.m());
+ return;
}
}
}