summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Spencer T Brody <spencer@mongodb.com> 2015-09-10 20:03:28 -0400
committer: Spencer T Brody <spencer@mongodb.com> 2015-09-17 11:55:12 -0400
commit: 0e0a4e5e8a70ce1f6208c613d2897186a0b8a1c3 (patch)
tree: e57ce15fe586d5a64d41d6e3e405240b23765c3b /src
parent: e44bf3dc351624ff26968a8006dfa385ffc83516 (diff)
download: mongo-0e0a4e5e8a70ce1f6208c613d2897186a0b8a1c3.tar.gz
SERVER-19855 Advance catalog manager optime on shards based on information sent from mongos
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/dbcommands.cpp 43
-rw-r--r-- src/mongo/db/query/lite_parsed_query.cpp 4
-rw-r--r-- src/mongo/db/s/set_shard_version_command.cpp 19
-rw-r--r-- src/mongo/db/s/sharding_state.cpp 5
-rw-r--r-- src/mongo/db/s/sharding_state.h 10
-rw-r--r-- src/mongo/s/catalog/catalog_manager.h 9
-rw-r--r-- src/mongo/s/catalog/catalog_manager_mock.cpp 2
-rw-r--r-- src/mongo/s/catalog/catalog_manager_mock.h 2
-rw-r--r-- src/mongo/s/catalog/forwarding_catalog_manager.cpp 8
-rw-r--r-- src/mongo/s/catalog/forwarding_catalog_manager.h 2
-rw-r--r-- src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp 4
-rw-r--r-- src/mongo/s/catalog/legacy/catalog_manager_legacy.h 2
-rw-r--r-- src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp 4
-rw-r--r-- src/mongo/s/catalog/replset/catalog_manager_replica_set.h 2
-rw-r--r-- src/mongo/s/query/cluster_find.cpp 1
15 files changed, 104 insertions, 13 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 62899dda392..6bf243d8822 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1251,10 +1251,45 @@ void Command::execCommand(OperationContext* txn,
CurOp::get(txn)->setMaxTimeMicros(static_cast<unsigned long long>(maxTimeMS) * 1000);
- // Handle shard version that may have been sent along with the command.
- OperationShardVersion::get(txn).initializeFromCommand(
- NamespaceString(command->parseNs(dbname, request.getCommandArgs())),
- request.getCommandArgs());
+ {
+ // Handle shard version and config optime information that may have been sent along with
+ // the command.
+ auto& operationShardVersion = OperationShardVersion::get(txn);
+ invariant(!operationShardVersion.hasShardVersion());
+
+ auto commandNS = NamespaceString(command->parseNs(dbname, request.getCommandArgs()));
+ operationShardVersion.initializeFromCommand(commandNS, request.getCommandArgs());
+
+ auto optimeStatus = repl::OpTime::parseFromBSON(request.getCommandArgs());
+ if (optimeStatus.isOK()) {
+ auto shardingState = ShardingState::get(txn);
+ if (shardingState->enabled()) {
+ // TODO(spencer): Do this unconditionally once all nodes are sharding aware
+ // by default.
+ shardingState->advanceConfigOpTime(txn, optimeStatus.getValue());
+ } else {
+ massert(
+ 28807,
+ "Received a command with sharding chunk information but this node is not "
+ "sharding aware",
+ command->name == "setShardVersion");
+ }
+ } else if (optimeStatus != ErrorCodes::NoSuchKey) {
+ uassertStatusOK(optimeStatus.getStatus());
+ } else {
+ // If there was top-level shard version information then there must have been
+ // config optime information as well. a 3.0 mongos won't have shard version info
+ // at the top level (they have it in a nested "metadata" field) so it won't cause
+ // a problem here.
+ massert(28813,
+ str::stream()
+ << "Received command with chunk version information but no config "
+ "server optime: " << request.getCommandArgs().jsonString(),
+ !operationShardVersion.hasShardVersion() ||
+ ChunkVersion::isIgnoredVersion(
+ operationShardVersion.getShardVersion(commandNS)));
+ }
+ }
// Can throw
txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp
index 9adfa9c4c90..b7b6068ae33 100644
--- a/src/mongo/db/query/lite_parsed_query.cpp
+++ b/src/mongo/db/query/lite_parsed_query.cpp
@@ -96,7 +96,7 @@ const char kAwaitDataField[] = "awaitData";
const char kPartialResultsField[] = "allowPartialResults";
const char kTermField[] = "term";
const char kOptionsField[] = "options";
-const char kShardVersionField[] = "shardVersion";
+const char kConfigOpTimeField[] = "ts";
} // namespace
@@ -356,6 +356,8 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeFromFindCommand(Nam
continue;
} else if (str::equals(fieldName, kShardVersionField)) {
// Shard version parsing is handled elsewhere.
+ } else if (str::equals(fieldName, kConfigOpTimeField)) {
+ // Config server optime parsing is handled along with shard versioning elsewhere.
} else if (str::equals(fieldName, kTermField)) {
Status status = checkFieldType(el, NumberLong);
if (!status.isOK()) {
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 7700c86eac5..8b0381ce98e 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -98,6 +98,8 @@ public:
Client* client = txn->getClient();
LastError::get(client).disable();
+ ShardingState* shardingState = ShardingState::get(txn);
+
const bool authoritative = cmdObj.getBoolField("authoritative");
const bool noConnectionVersioning = cmdObj.getBoolField("noConnectionVersioning");
@@ -119,7 +121,7 @@ public:
if (cmdObj["shard"].type() == String) {
// The shard host is also sent when using setShardVersion, report this host if there is
// an error
- ShardingState::get(txn)->setShardName(cmdObj["shard"].String());
+ shardingState->setShardName(cmdObj["shard"].String());
}
// Handle initial shard connection
@@ -153,9 +155,11 @@ public:
uassertStatusOK(ChunkVersionAndOpTime::parseFromBSONForSetShardVersion(cmdObj));
const auto& version = verAndOpTime.getVersion();
+ shardingState->advanceConfigOpTime(txn, verAndOpTime.getOpTime());
+
// step 3
const ChunkVersion oldVersion = info->getVersion(ns);
- const ChunkVersion globalVersion = ShardingState::get(txn)->getVersion(ns);
+ const ChunkVersion globalVersion = shardingState->getVersion(ns);
oldVersion.addToBSON(result, "oldVersion");
@@ -207,9 +211,9 @@ public:
// TODO: Refactor all of this
if (version < globalVersion && version.hasEqualEpoch(globalVersion)) {
- while (ShardingState::get(txn)->inCriticalMigrateSection()) {
+ while (shardingState->inCriticalMigrateSection()) {
log() << "waiting till out of critical section";
- ShardingState::get(txn)->waitTillNotInCriticalSection(10);
+ shardingState->waitTillNotInCriticalSection(10);
}
errmsg = str::stream() << "shard global version for collection is higher "
<< "than trying to set to '" << ns << "'";
@@ -223,9 +227,9 @@ public:
if (!globalVersion.isSet() && !authoritative) {
// Needed b/c when the last chunk is moved off a shard,
// the version gets reset to zero, which should require a reload.
- while (ShardingState::get(txn)->inCriticalMigrateSection()) {
+ while (shardingState->inCriticalMigrateSection()) {
log() << "waiting till out of critical section";
- ShardingState::get(txn)->waitTillNotInCriticalSection(10);
+ shardingState->waitTillNotInCriticalSection(10);
}
// need authoritative for first look
@@ -239,8 +243,7 @@ public:
}
ChunkVersion currVersion;
- Status status =
- ShardingState::get(txn)->refreshMetadataIfNeeded(txn, ns, version, &currVersion);
+ Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion);
if (!status.isOK()) {
// The reload itself was interrupted or confused here
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 2c712868faf..e20ad57b4ee 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/metadata_loader.h"
#include "mongo/db/s/operation_shard_version.h"
@@ -190,6 +191,10 @@ void ShardingState::setShardName(const string& name) {
}
}
+void ShardingState::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {
+ grid.catalogManager(txn)->advanceConfigOpTime(txn, opTime);
+}
+
void ShardingState::clearCollectionMetadata() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_collMetadata.clear();
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index bdb3736787a..986d1f20286 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -52,6 +52,10 @@ class OperationContext;
class ServiceContext;
class Status;
+namespace repl {
+class OpTime;
+} // namespace repl
+
/**
* Represents the sharding state for the running instance. One per instance.
*/
@@ -101,6 +105,12 @@ public:
void setShardName(const std::string& shardName);
/**
+ * Causes the catalog manager to advance its optime so subsequent reads from the config servers
+ * see the latest data.
+ */
+ void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime);
+
+ /**
* Clears the collection metadata cache after step down.
*/
void clearCollectionMetadata();
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 66025e65c5a..8b796582cb6 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -105,6 +105,15 @@ public:
virtual void shutDown(OperationContext* txn, bool allowNetworking = true) = 0;
/**
+ * If the newly specified optime is newer than the one the catalog manager already knows, the
+ * one in the catalog manager will be advanced. Otherwise, it remains the same.
+ *
+ * This method is only applicable to catalog managers, which support the OpTime concept (such as
+ * the replica set-based implementation) and is a no-op otherwise.
+ */
+ virtual void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) = 0;
+
+ /**
* Returns what type of catalog manager this is - CSRS for the CatalogManagerReplicaSet and
* SCCC for the CatalogManagerLegacy.
*/
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index 03df26db390..f8de3dd049f 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -51,6 +51,8 @@ Status CatalogManagerMock::startup(OperationContext* txn, bool allowNetworking)
void CatalogManagerMock::shutDown(OperationContext* txn, bool allowNetworking) {}
+void CatalogManagerMock::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {}
+
Status CatalogManagerMock::shardCollection(OperationContext* txn,
const string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 98350b8abe1..b8939ed764b 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -49,6 +49,8 @@ public:
void shutDown(OperationContext* txn, bool allowNetworking) override;
+ void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override;
+
Status shardCollection(OperationContext* txn,
const std::string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.cpp b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
index 98469e9d32d..a382b5eac52 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.cpp
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.cpp
@@ -356,6 +356,14 @@ void ForwardingCatalogManager::shutDown(OperationContext* txn, bool allowNetwork
});
}
+void ForwardingCatalogManager::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {
+ retry(txn,
+ [&] {
+ _actual->advanceConfigOpTime(txn, opTime);
+ return 1;
+ });
+}
+
Status ForwardingCatalogManager::enableSharding(OperationContext* txn, const std::string& dbName) {
return retry(txn, [&] { return _actual->enableSharding(txn, dbName); });
}
diff --git a/src/mongo/s/catalog/forwarding_catalog_manager.h b/src/mongo/s/catalog/forwarding_catalog_manager.h
index 2e8d30e481b..47286688e62 100644
--- a/src/mongo/s/catalog/forwarding_catalog_manager.h
+++ b/src/mongo/s/catalog/forwarding_catalog_manager.h
@@ -119,6 +119,8 @@ private:
void shutDown(OperationContext* txn, bool allowNetworking = true) override;
+ void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override;
+
Status enableSharding(OperationContext* txn, const std::string& dbName) override;
Status shardCollection(OperationContext* txn,
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index d1d9376b0d4..28bc0e4c3db 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -261,6 +261,10 @@ void CatalogManagerLegacy::shutDown(OperationContext* txn, bool allowNetworking)
_distLockManager->shutDown(allowNetworking);
}
+void CatalogManagerLegacy::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {
+ invariant(opTime.isNull());
+}
+
Status CatalogManagerLegacy::shardCollection(OperationContext* txn,
const string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index a9e530c5fc8..2e543212f67 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -62,6 +62,8 @@ public:
void shutDown(OperationContext* txn, bool allowNetworking) override;
+ void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override;
+
Status shardCollection(OperationContext* txn,
const std::string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 8ae9117c538..3420ab45505 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -131,6 +131,10 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn, bool allowNetwork
_distLockManager->shutDown(allowNetworking);
}
+void CatalogManagerReplicaSet::advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) {
+ _updateLastSeenConfigOpTime(opTime);
+}
+
Status CatalogManagerReplicaSet::shardCollection(OperationContext* txn,
const string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 88d6ed4a3e7..e41dfc5128c 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -59,6 +59,8 @@ public:
void shutDown(OperationContext* txn, bool allowNetworking) override;
+ void advanceConfigOpTime(OperationContext* txn, repl::OpTime opTime) override;
+
Status shardCollection(OperationContext* txn,
const std::string& ns,
const ShardKeyPattern& fieldsAndOrder,
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index abc496f7a37..7d3c96ecc4e 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -255,6 +255,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
if (chunkManager) {
auto shardVersion = chunkManager->getVersion(shard->getId());
cmdBuilder.appendArray(LiteParsedQuery::kShardVersionField, shardVersion.toBSON());
+ chunkManager->getConfigOpTime().append(&cmdBuilder);
}
params.remotes.emplace_back(std::move(hostAndPort.getValue()), cmdBuilder.obj());