diff options
author | Spencer T Brody <spencer@mongodb.com> | 2015-09-10 20:03:28 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2015-09-17 11:55:12 -0400 |
commit | 0e0a4e5e8a70ce1f6208c613d2897186a0b8a1c3 (patch) | |
tree | e57ce15fe586d5a64d41d6e3e405240b23765c3b /src | |
parent | e44bf3dc351624ff26968a8006dfa385ffc83516 (diff) | |
download | mongo-0e0a4e5e8a70ce1f6208c613d2897186a0b8a1c3.tar.gz |
SERVER-19855 Advance catalog manager optime on shards based on information sent from mongos
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/query/lite_parsed_query.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/set_shard_version_command.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 10 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager.h | 9 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/catalog_manager_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/forwarding_catalog_manager.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/catalog/forwarding_catalog_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/catalog/legacy/catalog_manager_legacy.h | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 1 |
15 files changed, 104 insertions, 13 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 62899dda392..6bf243d8822 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1251,10 +1251,45 @@ void Command::execCommand(OperationContext* txn, CurOp::get(txn)->setMaxTimeMicros(static_cast<unsigned long long>(maxTimeMS) * 1000); - // Handle shard version that may have been sent along with the command. - OperationShardVersion::get(txn).initializeFromCommand( - NamespaceString(command->parseNs(dbname, request.getCommandArgs())), - request.getCommandArgs()); + { + // Handle shard version and config optime information that may have been sent along with + // the command. + auto& operationShardVersion = OperationShardVersion::get(txn); + invariant(!operationShardVersion.hasShardVersion()); + + auto commandNS = NamespaceString(command->parseNs(dbname, request.getCommandArgs())); + operationShardVersion.initializeFromCommand(commandNS, request.getCommandArgs()); + + auto optimeStatus = repl::OpTime::parseFromBSON(request.getCommandArgs()); + if (optimeStatus.isOK()) { + auto shardingState = ShardingState::get(txn); + if (shardingState->enabled()) { + // TODO(spencer): Do this unconditionally once all nodes are sharding aware + // by default. + shardingState->advanceConfigOpTime(txn, optimeStatus.getValue()); + } else { + massert( + 28807, + "Received a command with sharding chunk information but this node is not " + "sharding aware", + command->name == "setShardVersion"); + } + } else if (optimeStatus != ErrorCodes::NoSuchKey) { + uassertStatusOK(optimeStatus.getStatus()); + } else { + // If there was top-level shard version information then there must have been + // config optime information as well. a 3.0 mongos won't have shard version info + // at the top level (they have it in a nested "metadata" field) so it won't cause + // a problem here. + massert(28813, + str::stream() + << "Received command with chunk version information but no config " + "server optime: " << request.getCommandArgs().jsonString(), + !operationShardVersion.hasShardVersion() || + ChunkVersion::isIgnoredVersion( + operationShardVersion.getShardVersion(commandNS))); + } + } // Can throw txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp index 9adfa9c4c90..b7b6068ae33 100644 --- a/src/mongo/db/query/lite_parsed_query.cpp +++ b/src/mongo/db/query/lite_parsed_query.cpp @@ -96,7 +96,7 @@ const char kAwaitDataField[] = "awaitData"; const char kPartialResultsField[] = "allowPartialResults"; const char kTermField[] = "term"; const char kOptionsField[] = "options"; -const char kShardVersionField[] = "shardVersion"; +const char kConfigOpTimeField[] = "ts"; } // namespace @@ -356,6 +356,8 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeFromFindCommand(Nam continue; } else if (str::equals(fieldName, kShardVersionField)) { // Shard version parsing is handled elsewhere. + } else if (str::equals(fieldName, kConfigOpTimeField)) { + // Config server optime parsing is handled along with shard versioning elsewhere. } else if (str::equals(fieldName, kTermField)) { Status status = checkFieldType(el, NumberLong); if (!status.isOK()) { diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index 7700c86eac5..8b0381ce98e 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -98,6 +98,8 @@ public: Client* client = txn->getClient(); LastError::get(client).disable(); + ShardingState* shardingState = ShardingState::get(txn); + const bool authoritative = cmdObj.getBoolField("authoritative"); const bool noConnectionVersioning = cmdObj.getBoolField("noConnectionVersioning"); @@ -119,7 +121,7 @@ public: if (cmdObj["shard"].type() == String) { // The shard host is also sent when using setShardVersion, report this host if there is // an error - ShardingState::get(txn)->setShardName(cmdObj["shard"].String()); + shardingState->setShardName(cmdObj["shard"].String()); } // Handle initial shard connection @@ -153,9 +155,11 @@ public: uassertStatusOK(ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj)); const auto& version = verAndOpTime.getVersion(); + shardingState->advanceConfigOpTime(txn, verAndOpTime.getOpTime()); + // step 3 const ChunkVersion oldVersion = info->getVersion(ns); - const ChunkVersion globalVersion = ShardingState::get(txn)->getVersion(ns); + const ChunkVersion globalVersion = shardingState->getVersion(ns); oldVersion.addToBSON(result, "oldVersion"); @@ -207,9 +211,9 @@ public: // TODO: Refactor all of this if (version < globalVersion && version.hasEqualEpoch(globalVersion)) { - while (ShardingState::get(txn)->inCriticalMigrateSection()) { + while (shardingState->inCriticalMigrateSection()) { log() << "waiting till out of critical section"; - ShardingState::get(txn)->waitTillNotInCriticalSection(10); + shardingState->waitTillNotInCriticalSection(10); } errmsg = str::stream() << "shard global version for collection is higher " << "than trying to set to '" << ns << "'"; @@ -223,9 +227,9 @@ public: if (!globalVersion.isSet() && !authoritative) { // Needed b/c when the last chunk is moved off a shard, // the version gets reset to zero, which should require a reload. - while (ShardingState::get(txn)->inCriticalMigrateSection()) { + while (shardingState->inCriticalMigrateSection()) { log() << "waiting till out of critical section"; - ShardingState::get(txn)->waitTillNotInCriticalSection(10); + shardingState->waitTillNotInCriticalSection(10); } // need authoritative for first look @@ -239,8 +243,7 @@ public: } ChunkVersion currVersion; - Status status = - ShardingState::get(txn)->refreshMetadataIfNeeded(txn, ns, version, &currVersion); + Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion); if (!status.isOK()) { // The reload itself was interrupted or confused here diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 2c712868faf..e20ad57b4ee 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/metadata_loader.h" #include "mongo/db/s/operation_shard_version.h" @@ -190,6 +191,10 @@ void ShardingState::setShardName(const string& name) { } } +void ShardingState::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) { + grid.catalogManager(txn)->advanceConfigOpTime(txn, opTime); +} + void ShardingState::clearCollectionMetadata() { stdx::lock_guard<stdx::mutex> lk(_mutex); _collMetadata.clear(); diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index bdb3736787a..986d1f20286 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -52,6 +52,10 @@ class OperationContext; class ServiceContext; class Status; +namespace repl { +class OpTime; +} // namespace repl + /** * Represents the sharding state for the running instance. One per instance. */ @@ -101,6 +105,12 @@ public: void setShardName(const std::string& shardName); /** + * Causes the catalog manager to advance its optime so subsequent reads from the config servers + * see the latest data. + */ + void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime); + + /** * Clears the collection metadata cache after step down. */ void clearCollectionMetadata(); diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 66025e65c5a..8b796582cb6 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -105,6 +105,15 @@ public: virtual void shutDown(OperationContext* txn, bool allowNetworking = true) = 0; /** + * If the newly specified optime is newer than the one the catalog manager already knows, the + * one in the catalog manager will be advanced. Otherwise, it remains the same. + * + * This method is only applicable to catalog managers, which support the OpTime concept (such as + * the replica set-based implementation) and is a no-op otherwise. + */ + virtual void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) = 0; + + /** * Returns what type of catalog manager this is - CSRS for the CatalogManagerReplicaSet and * SCCC for the CatalogManagerLegacy. */ diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index 03df26db390..f8de3dd049f 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -51,6 +51,8 @@ Status CatalogManagerMock::startup(OperationContext* txn, bool allowNetworking) void CatalogManagerMock::shutDown(OperationContext* txn, bool allowNetworking) {} +void CatalogManagerMock::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {} + Status CatalogManagerMock::shardCollection(OperationContext* txn, const string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 98350b8abe1..b8939ed764b 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -49,6 +49,8 @@ public: void shutDown(OperationContext* txn, bool allowNetworking) override; + void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override; + Status shardCollection(OperationContext* txn, const std::string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp index 98469e9d32d..a382b5eac52 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp +++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp @@ -356,6 +356,14 @@ void ForwardingCatalogManager::shutDown(OperationContext* txn, bool allowNetwork }); } +void ForwardingCatalogManager::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) { + retry(txn, + [&] { + _actual->advanceConfigOpTime(txn, opTime); + return 1; + }); +} + Status ForwardingCatalogManager::enableSharding(OperationContext* txn, const std::string& dbName) { return retry(txn, [&] { return _actual->enableSharding(txn, dbName); }); } diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h index 2e8d30e481b..47286688e62 100644 --- a/src/mongo/s/catalog/forwarding_catalog_manager.h +++ b/src/mongo/s/catalog/forwarding_catalog_manager.h @@ -119,6 +119,8 @@ private: void shutDown(OperationContext* txn, bool allowNetworking = true) override; + void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override; + Status enableSharding(OperationContext* txn, const std::string& dbName) override; Status shardCollection(OperationContext* txn, diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index d1d9376b0d4..28bc0e4c3db 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -261,6 +261,10 @@ void CatalogManagerLegacy::shutDown(OperationContext* txn, bool allowNetworking) _distLockManager->shutDown(allowNetworking); } +void CatalogManagerLegacy::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) { + invariant(opTime.isNull()); +} + Status CatalogManagerLegacy::shardCollection(OperationContext* txn, const string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index a9e530c5fc8..2e543212f67 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -62,6 +62,8 @@ public: void shutDown(OperationContext* txn, bool allowNetworking) override; + void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override; + Status shardCollection(OperationContext* txn, const std::string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 8ae9117c538..3420ab45505 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -131,6 +131,10 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn, bool allowNetwork _distLockManager->shutDown(allowNetworking); } +void CatalogManagerReplicaSet::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) { + _updateLastSeenConfigOpTime(opTime); +} + Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn, const string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 88d6ed4a3e7..e41dfc5128c 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -59,6 +59,8 @@ public: void shutDown(OperationContext* txn, bool allowNetworking) override; + void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override; + Status shardCollection(OperationContext* txn, const std::string& ns, const ShardKeyPattern& fieldsAndOrder, diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index abc496f7a37..7d3c96ecc4e 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -255,6 +255,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, if (chunkManager) { auto shardVersion = chunkManager->getVersion(shard->getId()); cmdBuilder.appendArray(LiteParsedQuery::kShardVersionField, shardVersion.toBSON()); + chunkManager->getConfigOpTime().append(&cmdBuilder); } params.remotes.emplace_back(std::move(hostAndPort.getValue()), cmdBuilder.obj()); |