summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author jannaerin <golden.janna@gmail.com> 2019-07-18 00:23:44 -0400
committer jannaerin <golden.janna@gmail.com> 2019-08-08 17:32:26 -0400
commit a70f6ddb8817f7b77b4597bba1a854548c4dbf12 (patch)
tree 62e2a4ad3a2b179bddadc8331dd63808a430ef5f /src
parent d101a617bada9252a4f0a29b8f615ee62abb979b (diff)
download mongo-a70f6ddb8817f7b77b4597bba1a854548c4dbf12.tar.gz
SERVER-41949 Attach the databaseVersion on the write path on mongos"
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/commands/write_commands/write_commands.cpp 3
-rw-r--r-- src/mongo/db/ops/write_ops_exec.cpp 2
-rw-r--r-- src/mongo/db/pipeline/process_interface_standalone.cpp 14
-rw-r--r-- src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp 8
-rw-r--r-- src/mongo/s/ns_targeter.h 28
-rw-r--r-- src/mongo/s/write_ops/batch_write_exec.cpp 34
-rw-r--r-- src/mongo/s/write_ops/batch_write_exec.h 12
-rw-r--r-- src/mongo/s/write_ops/batch_write_exec_test.cpp 166
-rw-r--r-- src/mongo/s/write_ops/batch_write_op.cpp 29
-rw-r--r-- src/mongo/s/write_ops/batched_command_request.cpp 9
-rw-r--r-- src/mongo/s/write_ops/batched_command_request.h 15
-rw-r--r-- src/mongo/s/write_ops/chunk_manager_targeter.cpp 167
-rw-r--r-- src/mongo/s/write_ops/chunk_manager_targeter.h 17
-rw-r--r-- src/mongo/s/write_ops/mock_ns_targeter.h 9
-rw-r--r-- src/mongo/s/write_ops/write_op.cpp 3
15 files changed, 434 insertions, 82 deletions
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index e321fb3cfa6..e5b6d25396b 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -93,7 +93,8 @@ void serializeReply(OperationContext* opCtx,
if (continueOnError && !result.results.empty()) {
const auto& lastResult = result.results.back();
if (lastResult == ErrorCodes::StaleConfig ||
- lastResult == ErrorCodes::CannotImplicitlyCreateCollection) {
+ lastResult == ErrorCodes::CannotImplicitlyCreateCollection ||
+ lastResult == ErrorCodes::StaleDbVersion) {
// For ordered:false commands we need to duplicate these error results for all ops
// after we stopped. See handleError() in write_ops_exec.cpp for more info.
auto err = result.results.back();
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index d648e60239f..d495bb89f35 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -256,7 +256,7 @@ bool handleError(OperationContext* opCtx,
return false;
}
- if (ex.extraInfo<StaleConfigInfo>()) {
+ if (ex.extraInfo<StaleConfigInfo>() || ex.extraInfo<StaleDbRoutingVersion>()) {
if (!opCtx->getClient()->isInDirectClient()) {
auto& oss = OperationShardingState::get(opCtx);
oss.setShardingOperationFailedStatus(ex.toStatus());
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index ea3b28b50a9..ac6198dd79f 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -317,14 +317,22 @@ Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx,
}
BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
- const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- if (infos.empty()) {
- return BSONObj();
+ std::list<BSONObj> infos;
+
+ try {
+ infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ if (infos.empty()) {
+ return BSONObj();
+ }
+ } catch (const DBException& e) {
+ uasserted(ErrorCodes::CommandFailed, e.reason());
}
+
const auto& infoObj = infos.front();
uassert(ErrorCodes::CommandNotSupportedOnView,
str::stream() << nss.toString() << " is a view, not a collection",
infoObj["type"].valueStringData() != "view"_sd);
+
return infoObj.getObjectField("options").getOwned();
}
diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
index c75cba94c62..625f62a51f3 100644
--- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
+++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
@@ -78,6 +78,14 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi
if (!handleMismatchStatus.isOK())
log() << "Failed to handle stale version exception"
<< causedBy(redact(handleMismatchStatus));
+ } else if (auto staleInfo = status->extraInfo<StaleDbRoutingVersion>()) {
+ auto handleMismatchStatus = onDbVersionMismatchNoExcept(_opCtx,
+ staleInfo->getDb(),
+ staleInfo->getVersionReceived(),
+ staleInfo->getVersionWanted());
+ if (!handleMismatchStatus.isOK())
+ log() << "Failed to handle database version exception"
+ << causedBy(redact(handleMismatchStatus));
} else if (auto cannotImplicitCreateCollInfo =
status->extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
if (ShardingState::get(_opCtx)->enabled()) {
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 92e29547705..a86b5f43737 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -43,17 +43,22 @@ namespace mongo {
class OperationContext;
/**
- * Combines a shard and the version which that shard should be using
+ * Combines a shard, the shard version, and database version that the shard should be using
*/
struct ShardEndpoint {
- ShardEndpoint(const ShardId& shardName, const ChunkVersion& shardVersion)
- : shardName(shardName), shardVersion(shardVersion) {}
+ ShardEndpoint(const ShardId& shardName,
+ const ChunkVersion& shardVersion,
+ const boost::optional<DatabaseVersion> dbVersion = boost::none)
+ : shardName(shardName), shardVersion(shardVersion), databaseVersion(dbVersion) {}
ShardEndpoint(const ShardEndpoint& other)
- : shardName(other.shardName), shardVersion(other.shardVersion) {}
+ : shardName(other.shardName), shardVersion(other.shardVersion) {
+ databaseVersion = other.databaseVersion;
+ }
ShardId shardName;
ChunkVersion shardVersion;
+ boost::optional<DatabaseVersion> databaseVersion;
};
/**
@@ -143,8 +148,19 @@ public:
*
* If stale responses are is noted, we must not have noted that we cannot target.
*/
- virtual void noteStaleResponse(const ShardEndpoint& endpoint,
- const StaleConfigInfo& staleInfo) = 0;
+ virtual void noteStaleShardResponse(const ShardEndpoint& endpoint,
+ const StaleConfigInfo& staleInfo) = 0;
+
+ /**
+ * Informs the targeter of stale db routing version responses for this db from an endpoint,
+ * with further information available in the returned staleInfo.
+ *
+ * Any stale responses noted here will be taken into account on the next refresh.
+ *
+ * If stale responses are noted, we must not have noted that we cannot target.
+ */
+ virtual void noteStaleDbResponse(const ShardEndpoint& endpoint,
+ const StaleDbRoutingVersion& staleInfo) = 0;
/**
* Refreshes the targeting metadata for the namespace if needed, based on previously-noted
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index b06b0c1c63b..bf1c8a0acd2 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -68,18 +68,28 @@ WriteErrorDetail errorFromStatus(const Status& status) {
return error;
}
-// Helper to note several stale errors from a response
-void noteStaleResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
+// Helper to note several stale shard errors from a response
+void noteStaleShardResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
for (const auto& error : staleErrors) {
LOG(4) << "Noting stale config response " << error.error.getErrInfo() << " from shard "
<< error.endpoint.shardName;
- targeter->noteStaleResponse(
+ targeter->noteStaleShardResponse(
error.endpoint,
StaleConfigInfo::parseFromCommandError(
error.error.isErrInfoSet() ? error.error.getErrInfo() : BSONObj()));
}
}
+// Helper to note several stale db errors from a response
+void noteStaleDbResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
+ for (const auto& error : staleErrors) {
+ LOG(4) << "Noting stale database response " << error.error.toBSON() << " from shard "
+ << error.endpoint.shardName;
+ targeter->noteStaleDbResponse(
+ error.endpoint, StaleDbRoutingVersion::parseFromCommandError(error.error.toBSON()));
+ }
+}
+
bool hasTransientTransactionError(const BatchedCommandResponse& response) {
if (!response.isErrorLabelsSet()) {
return false;
@@ -297,6 +307,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
if (responseStatus.isOK()) {
TrackedErrors trackedErrors;
trackedErrors.startTracking(ErrorCodes::StaleShardVersion);
+ trackedErrors.startTracking(ErrorCodes::StaleDbVersion);
trackedErrors.startTracking(ErrorCodes::CannotImplicitlyCreateCollection);
LOG(4) << "Write results received from " << shardHost.toString() << ": "
@@ -329,11 +340,20 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
}
// Note if anything was stale
- const auto& staleErrors =
+ const auto& staleShardErrors =
trackedErrors.getErrors(ErrorCodes::StaleShardVersion);
- if (!staleErrors.empty()) {
- noteStaleResponses(staleErrors, &targeter);
- ++stats->numStaleBatches;
+ const auto& staleDbErrors = trackedErrors.getErrors(ErrorCodes::StaleDbVersion);
+
+ if (!staleShardErrors.empty()) {
+ invariant(staleDbErrors.empty());
+ noteStaleShardResponses(staleShardErrors, &targeter);
+ ++stats->numStaleShardBatches;
+ }
+
+ if (!staleDbErrors.empty()) {
+ invariant(staleShardErrors.empty());
+ noteStaleDbResponses(staleDbErrors, &targeter);
+ ++stats->numStaleDbBatches;
}
const auto& cannotImplicitlyCreateErrors =
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index f61926889b8..c729561630a 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -86,7 +86,11 @@ typedef std::map<ConnectionString, HostOpTime> HostOpTimeMap;
class BatchWriteExecStats {
public:
BatchWriteExecStats()
- : numRounds(0), numTargetErrors(0), numResolveErrors(0), numStaleBatches(0) {}
+ : numRounds(0),
+ numTargetErrors(0),
+ numResolveErrors(0),
+ numStaleShardBatches(0),
+ numStaleDbBatches(0) {}
void noteWriteAt(const HostAndPort& host, repl::OpTime opTime, const OID& electionId);
void noteTargetedShard(const ShardId& shardId);
@@ -102,8 +106,10 @@ public:
int numTargetErrors;
// Number of times host resolution failed
int numResolveErrors;
- // Number of stale batches
- int numStaleBatches;
+ // Number of stale batches due to StaleShardVersion
+ int numStaleShardBatches;
+ // Number of stale batches due to StaleDbVersion
+ int numStaleDbBatches;
private:
std::set<ShardId> _targetedShards;
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index b171341c64c..c04ed84bcad 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -98,6 +98,52 @@ BSONObj expectInsertsReturnStaleVersionErrorsBase(const NamespaceString& nss,
return staleResponse.toBSON();
}
+BSONObj expectInsertsReturnStaleDbVersionErrorsBase(const NamespaceString& nss,
+ const std::vector<BSONObj>& expected,
+ const executor::RemoteCommandRequest& request) {
+ ASSERT_EQUALS(nss.db(), request.dbname);
+
+ const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
+ const auto actualBatchedInsert(BatchedCommandRequest::parseInsert(opMsgRequest));
+ ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
+
+ const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
+ ASSERT_EQUALS(expected.size(), inserted.size());
+
+ auto itInserted = inserted.begin();
+ auto itExpected = expected.begin();
+
+ for (; itInserted != inserted.end(); itInserted++, itExpected++) {
+ ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
+ }
+
+ BSONObjBuilder staleResponse;
+ staleResponse.append("ok", 1);
+ staleResponse.append("n", 0);
+
+ // Report a stale db version error for each write in the batch.
+ int i = 0;
+ std::vector<BSONObj> errors;
+ for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) {
+ BSONObjBuilder errorBuilder;
+ errorBuilder.append("index", i);
+ errorBuilder.append("code", int(ErrorCodes::StaleDbVersion));
+
+ auto dbVersion = databaseVersion::makeNew();
+ errorBuilder.append("db", nss.db());
+ errorBuilder.append("vReceived", dbVersion.toBSON());
+ errorBuilder.append("vWanted", databaseVersion::makeIncremented(dbVersion).toBSON());
+
+ errorBuilder.append("errmsg", "mock stale db version");
+
+ errors.push_back(errorBuilder.obj());
+ ++i;
+ }
+ staleResponse.append("writeErrors", errors);
+
+ return staleResponse.obj();
+}
+
/**
* Mimics a single shard backend for a particular collection which can be initialized with a
* set of write command results to return.
@@ -174,6 +220,12 @@ public:
});
}
+ virtual void expectInsertsReturnStaleDbVersionErrors(const std::vector<BSONObj>& expected) {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ return expectInsertsReturnStaleDbVersionErrorsBase(nss, expected, request);
+ });
+ }
+
void expectInsertsReturnError(const std::vector<BSONObj>& expected,
const BatchedCommandResponse& errResponse) {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
@@ -319,7 +371,7 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
// Test retryable errors
//
-TEST_F(BatchWriteExecTest, StaleOp) {
+TEST_F(BatchWriteExecTest, StaleShardOp) {
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
insertOp.setWriteCommandBase([] {
@@ -339,7 +391,7 @@ TEST_F(BatchWriteExecTest, StaleOp) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(1, stats.numStaleBatches);
+ ASSERT_EQUALS(1, stats.numStaleShardBatches);
});
const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -350,7 +402,7 @@ TEST_F(BatchWriteExecTest, StaleOp) {
future.default_timed_get();
}
-TEST_F(BatchWriteExecTest, MultiStaleOp) {
+TEST_F(BatchWriteExecTest, MultiStaleShardOp) {
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
insertOp.setWriteCommandBase([] {
@@ -369,7 +421,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(3, stats.numStaleBatches);
+ ASSERT_EQUALS(3, stats.numStaleShardBatches);
});
const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -384,7 +436,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
future.default_timed_get();
}
-TEST_F(BatchWriteExecTest, TooManyStaleOp) {
+TEST_F(BatchWriteExecTest, TooManyStaleShardOp) {
// Retry op in exec too many times (without refresh) b/c of stale config (the mock nsTargeter
// doesn't report progress on refresh). We should report a no progress error for everything in
// the batch.
@@ -410,7 +462,7 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(), ErrorCodes::NoProgressMade);
ASSERT_EQUALS(response.getErrDetailsAt(1)->toStatus().code(), ErrorCodes::NoProgressMade);
- ASSERT_EQUALS(stats.numStaleBatches, (1 + kMaxRoundsWithoutProgress));
+ ASSERT_EQUALS(stats.numStaleShardBatches, (1 + kMaxRoundsWithoutProgress));
});
// Return multiple StaleShardVersion errors
@@ -421,6 +473,108 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
future.default_timed_get();
}
+TEST_F(BatchWriteExecTest, StaleDbOp) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ // Execute request
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ ASSERT(response.getOk());
+
+ ASSERT_EQUALS(1, stats.numStaleDbBatches);
+ });
+
+ const std::vector<BSONObj> expected{BSON("x" << 1)};
+
+ expectInsertsReturnStaleDbVersionErrors(expected);
+ expectInsertsReturnSuccess(expected);
+
+ future.default_timed_get();
+}
+
+TEST_F(BatchWriteExecTest, MultiStaleDbOp) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ ASSERT(response.getOk());
+
+ ASSERT_EQUALS(3, stats.numStaleDbBatches);
+ });
+
+ const std::vector<BSONObj> expected{BSON("x" << 1)};
+
+ // Return multiple StaleDbVersion errors, but less than the give-up number
+ for (int i = 0; i < 3; i++) {
+ expectInsertsReturnStaleDbVersionErrors(expected);
+ }
+
+ expectInsertsReturnSuccess(expected);
+
+ future.default_timed_get();
+}
+
+TEST_F(BatchWriteExecTest, TooManyStaleDbOp) {
+ // Retry op in exec too many times (without refresh) b/c of a stale db version (the mock
+ // nsTargeter doesn't report progress on refresh). We should report a no progress error for
+ // everything in the batch.
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQ(0, response.getN());
+ ASSERT(response.isErrDetailsSet());
+ ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(), ErrorCodes::NoProgressMade);
+ ASSERT_EQUALS(response.getErrDetailsAt(1)->toStatus().code(), ErrorCodes::NoProgressMade);
+
+ ASSERT_EQUALS(stats.numStaleDbBatches, (1 + kMaxRoundsWithoutProgress));
+ });
+
+ // Return multiple StaleDbVersion errors
+ for (int i = 0; i < (1 + kMaxRoundsWithoutProgress); i++) {
+ expectInsertsReturnStaleDbVersionErrors({BSON("x" << 1), BSON("x" << 2)});
+ }
+
+ future.default_timed_get();
+}
+
TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) {
// A retryable error without a txnNumber is not retried.
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 5a4c9ac1253..bba83f7ad6e 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -37,6 +37,9 @@
#include "mongo/base/error_codes.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
@@ -521,7 +524,15 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
return wcb;
}());
- request.setShardVersion(targetedBatch.getEndpoint().shardVersion);
+
+ auto shardVersion = targetedBatch.getEndpoint().shardVersion;
+ request.setShardVersion(shardVersion);
+
+ auto dbVersion = targetedBatch.getEndpoint().databaseVersion;
+ invariant((shardVersion == ChunkVersion::UNSHARDED() && dbVersion) ||
+ (shardVersion != ChunkVersion::UNSHARDED() && !dbVersion));
+ if (dbVersion)
+ request.setDbVersion(dbVersion.get());
if (_clientRequest.hasWriteConcern()) {
if (_clientRequest.isVerboseWC()) {
@@ -878,7 +889,21 @@ bool EndpointComp::operator()(const ShardEndpoint* endpointA,
return shardVersionDiff < 0;
}
- return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
+ const long epochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch());
+ if (epochDiff) {
+ return epochDiff < 0;
+ }
+
+ if (endpointA->databaseVersion && endpointB->databaseVersion) {
+ if (endpointA->databaseVersion->getUuid() < endpointB->databaseVersion->getUuid())
+ return true;
+
+ return (endpointA->databaseVersion->getLastMod() -
+ endpointB->databaseVersion->getLastMod() <
+ 0);
+ }
+
+ return false;
}
void TrackedErrors::startTracking(int errCode) {
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index dd196625513..ff719d81f07 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -46,6 +46,11 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request
auto chunkVersion = ChunkVersion::parseFromCommand(request.body);
if (chunkVersion != ErrorCodes::NoSuchKey) {
batchRequest.setShardVersion(uassertStatusOK(std::move(chunkVersion)));
+ if (chunkVersion == ChunkVersion::UNSHARDED()) {
+ auto dbVersion = DatabaseVersion::parse(IDLParserErrorContext("BatchedCommandRequest"),
+ request.body);
+ batchRequest.setDbVersion(std::move(dbVersion));
+ }
}
auto writeConcernField = request.body[kWriteConcern];
@@ -121,6 +126,10 @@ void BatchedCommandRequest::serialize(BSONObjBuilder* builder) const {
_shardVersion->appendToCommand(builder);
}
+ if (_dbVersion) {
+ builder->append("databaseVersion", _dbVersion->toBSON());
+ }
+
if (_writeConcern) {
builder->append(kWriteConcern, *_writeConcern);
}
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index 7b7700b5df4..59e6edc28ca 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -35,6 +35,7 @@
#include "mongo/db/ops/write_ops.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/database_version_helpers.h"
namespace mongo {
@@ -115,6 +116,19 @@ public:
return *_shardVersion;
}
+ void setDbVersion(DatabaseVersion dbVersion) {
+ _dbVersion = std::move(dbVersion);
+ }
+
+ bool hasDbVersion() const {
+ return _dbVersion.is_initialized();
+ }
+
+ const DatabaseVersion& getDbVersion() const {
+ invariant(_dbVersion);
+ return *_dbVersion;
+ }
+
void setRuntimeConstants(RuntimeConstants runtimeConstants) {
invariant(_updateReq);
_updateReq->setRuntimeConstants(std::move(runtimeConstants));
@@ -179,6 +193,7 @@ private:
std::unique_ptr<write_ops::Delete> _deleteReq;
boost::optional<ChunkVersion> _shardVersion;
+ boost::optional<DatabaseVersion> _dbVersion;
boost::optional<BSONObj> _writeConcern;
bool _allowImplicitCollectionCreation = true;
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index d6b49027c1f..68b752d9e19 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/database_version_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/log.h"
@@ -210,7 +211,7 @@ bool isExactIdQuery(OperationContext* opCtx,
return cq.isOK() && isExactIdQuery(opCtx, *cq.getValue(), manager);
}
//
-// Utilities to compare shard versions
+// Utilities to compare shard and db versions
//
/**
@@ -297,22 +298,37 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routing
return finalResult;
}
+CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo,
+ const boost::optional<DatabaseVersion>& remoteDbVersion) {
+ DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion();
+
+ // Db may have been dropped
+ if (!remoteDbVersion || (cachedDbVersion.getUuid() != remoteDbVersion->getUuid())) {
+ return CompareResult_Unknown;
+ }
+
+ if (cachedDbVersion.getLastMod() < remoteDbVersion->getLastMod()) {
+ return CompareResult_LT;
+ }
+
+ return CompareResult_GTE;
+}
+
/**
* Whether or not the manager/primary pair is different from the other manager/primary pair.
*/
bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
- const std::shared_ptr<Shard>& primaryA,
+ const DatabaseVersion dbVersionA,
const std::shared_ptr<ChunkManager>& managerB,
- const std::shared_ptr<Shard>& primaryB) {
- if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
- (!primaryA && primaryB))
+ const DatabaseVersion dbVersionB) {
+ if ((managerA && !managerB) || (!managerA && managerB))
return true;
if (managerA) {
return managerA->getVersion() != managerB->getVersion();
}
- return primaryA->getId() != primaryB->getId();
+ return databaseVersion::equal(dbVersionA, dbVersionB);
}
/**
@@ -320,10 +336,10 @@ bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
* of the metadata.
*/
bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
- const std::shared_ptr<Shard>& primaryA,
+ const DatabaseVersion dbVersionA,
const std::shared_ptr<ChunkManager>& managerB,
- const std::shared_ptr<Shard>& primaryB) {
- if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
+ const DatabaseVersion dbVersionB) {
+ if (isMetadataDifferent(managerA, dbVersionA, managerB, dbVersionB))
return true;
if (managerA) {
@@ -401,7 +417,9 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* o
<< "; no metadata found");
}
- return ShardEndpoint(_routingInfo->db().primary()->getId(), ChunkVersion::UNSHARDED());
+ return ShardEndpoint(_routingInfo->db().primary()->getId(),
+ ChunkVersion::UNSHARDED(),
+ _routingInfo->db().databaseVersion());
}
return Status::OK();
@@ -430,8 +448,9 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate(
str::stream() << "could not target update on " << getNS().ns()
<< "; no metadata found"};
}
- return std::vector<ShardEndpoint>{
- {_routingInfo->db().primaryId(), ChunkVersion::UNSHARDED()}};
+ return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+ ChunkVersion::UNSHARDED(),
+ _routingInfo->db().databaseVersion()}};
}
const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern();
@@ -590,22 +609,22 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
<< "; no metadata found"};
}
+ if (!_routingInfo->cm()) {
+ return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+ ChunkVersion::UNSHARDED(),
+ _routingInfo->db().databaseVersion()}};
+ }
+
std::set<ShardId> shardIds;
- if (_routingInfo->cm()) {
- try {
- _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- } else {
- shardIds.insert(_routingInfo->db().primary()->getId());
+ try {
+ _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED();
- endpoints.emplace_back(std::move(shardId), version);
+ endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
}
return endpoints;
@@ -630,19 +649,20 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetCollection()
<< "; metadata not found"};
}
- std::set<ShardId> shardIds;
- if (_routingInfo->cm()) {
- _routingInfo->cm()->getAllShardIds(&shardIds);
- } else {
- shardIds.insert(_routingInfo->db().primary()->getId());
+ if (!_routingInfo->cm()) {
+ return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+ ChunkVersion::UNSHARDED(),
+ _routingInfo->db().databaseVersion()}};
}
+ std::set<ShardId> shardIds;
+ _routingInfo->cm()->getAllShardIds(&shardIds);
+
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED();
- endpoints.emplace_back(std::move(shardId), version);
+ endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
}
+
return endpoints;
}
@@ -657,11 +677,13 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
std::vector<ShardId> shardIds;
Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+ // This function is only called if doing a multi write that targets more than one shard. This
+ // implies the collection is sharded, so we should always have a chunk manager.
+ invariant(_routingInfo->cm());
+
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
- const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
- : ChunkVersion::UNSHARDED();
- endpoints.emplace_back(std::move(shardId), version);
+ endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
}
return endpoints;
@@ -669,12 +691,14 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
void ChunkManagerTargeter::noteCouldNotTarget() {
dassert(_remoteShardVersions.empty());
+ dassert(!_remoteDbVersion);
_needsTargetingRefresh = true;
}
-void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
- const StaleConfigInfo& staleInfo) {
+void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint,
+ const StaleConfigInfo& staleInfo) {
dassert(!_needsTargetingRefresh);
+ dassert(!_remoteDbVersion);
ChunkVersion remoteShardVersion;
if (!staleInfo.getVersionWanted()) {
@@ -703,6 +727,28 @@ void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
}
}
+void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint,
+ const StaleDbRoutingVersion& staleInfo) {
+ dassert(!_needsTargetingRefresh);
+ dassert(_remoteShardVersions.empty());
+
+ DatabaseVersion remoteDbVersion;
+ if (!staleInfo.getVersionWanted()) {
+ // If we don't have a vWanted sent, assume the version is higher than our current version.
+ remoteDbVersion = _routingInfo->db().databaseVersion();
+ remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion);
+ } else {
+ remoteDbVersion = *staleInfo.getVersionWanted();
+ }
+
+ if (!_remoteDbVersion ||
+ (_remoteDbVersion->getUuid() == remoteDbVersion.getUuid() &&
+ _remoteDbVersion->getLastMod() < remoteDbVersion.getLastMod()) ||
+ (_remoteDbVersion->getUuid() != remoteDbVersion.getUuid())) {
+ _remoteDbVersion = remoteDbVersion;
+ }
+}
+
Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
bool dummy;
if (!wasChanged) {
@@ -713,13 +759,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
LOG(4) << "ChunkManagerTargeter checking if refresh is needed, needsTargetingRefresh("
<< _needsTargetingRefresh << ") remoteShardVersions empty ("
- << _remoteShardVersions.empty() << ")";
+ << _remoteShardVersions.empty() << ")"
+ << ") remoteDbVersion empty (" << !_remoteDbVersion << ")";
//
// Did we have any stale config or targeting errors at all?
//
- if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
+ if (!_needsTargetingRefresh && _remoteShardVersions.empty() && !_remoteDbVersion) {
return Status::OK();
}
@@ -728,7 +775,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
//
auto lastManager = _routingInfo->cm();
- auto lastPrimary = _routingInfo->db().primary();
+ auto lastDbVersion = _routingInfo->db().databaseVersion();
auto initStatus = init(opCtx);
if (!initStatus.isOK()) {
@@ -747,20 +794,19 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
_needsTargetingRefresh = false;
// If we couldn't target, we might need to refresh if we haven't remotely refreshed
- // the
- // metadata since we last got it from the cache.
+ // the metadata since we last got it from the cache.
bool alreadyRefreshed = wasMetadataRefreshed(
- lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+ lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
// If didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
// To match previous behavior, we just need an incremental refresh here
- return _refreshNow(opCtx);
+ return _refreshShardVersionNow(opCtx);
}
*wasChanged = isMetadataDifferent(
- lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+ lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
return Status::OK();
} else if (!_remoteShardVersions.empty()) {
// If we got stale shard versions from remote shards, we may need to refresh
@@ -775,21 +821,48 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
if (result == CompareResult_Unknown || result == CompareResult_LT) {
// Our current shard versions aren't all comparable to the old versions, maybe drop
- return _refreshNow(opCtx);
+ return _refreshShardVersionNow(opCtx);
+ }
+
+ *wasChanged = isMetadataDifferent(
+ lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+ return Status::OK();
+ } else if (_remoteDbVersion) {
+ // If we got stale dbversions from remote shards, we may need to refresh
+ // NOTE: Not sure yet if this can happen simultaneously with targeting issues
+
+ CompareResult result = compareDbVersions(*_routingInfo, _remoteDbVersion);
+
+ LOG(4) << "ChunkManagerTargeter database versions comparison result: " << (int)result;
+
+ // Reset the version
+ _remoteDbVersion = boost::none;
+
+ if (result == CompareResult_Unknown || result == CompareResult_LT) {
+ // Our current db version isn't always comparable to the old version, it may have been
+ // dropped
+ return _refreshDbVersionNow(opCtx);
}
*wasChanged = isMetadataDifferent(
- lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+ lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
return Status::OK();
}
MONGO_UNREACHABLE;
}
// Hands the current routing info to the catalog cache as stale (onStaleShardVersion), then
// re-runs init() and returns its status. This commit renames the function from _refreshNow.
-Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) {
+Status ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
// *_routingInfo is moved from here; init() below is presumably what repopulates it -- TODO confirm.
Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*_routingInfo));
return init(opCtx);
}
+// Reports the database version held by the current routing info to the catalog cache as stale
+// (onStaleDatabaseVersion), then re-runs init() and returns its status.
+Status ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) {
+    // The accessor result is passed through as-is. Wrapping it in std::move gains nothing for a
+    // temporary or a const reference, and would gut the cached value if the accessor ever
+    // returned a mutable reference.
+    Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
+        _nss.db(), _routingInfo->db().databaseVersion());
+
+    return init(opCtx);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 2ace173e625..1ea977ef75f 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -94,8 +94,11 @@ public:
void noteCouldNotTarget() override;
- void noteStaleResponse(const ShardEndpoint& endpoint,
- const StaleConfigInfo& staleInfo) override;
+ void noteStaleShardResponse(const ShardEndpoint& endpoint,
+ const StaleConfigInfo& staleInfo) override;
+
+ void noteStaleDbResponse(const ShardEndpoint& endpoint,
+ const StaleDbRoutingVersion& staleInfo) override;
/**
* Replaces the targeting information with the latest information from the cache. If this
@@ -114,7 +117,12 @@ private:
/**
* Performs an actual refresh from the config server.
*
* Used for stale shard versions: the current routing info is reported to the catalog cache
* as stale and the targeter is then re-initialized. Returns the status of that
* re-initialization.
*/
- Status _refreshNow(OperationContext* opCtx);
+ Status _refreshShardVersionNow(OperationContext* opCtx);
+
+ /**
+ * Performs an actual refresh from the config server.
+ *
+ * Used for stale database versions: the database version of the current routing info is
+ * reported to the catalog cache as stale and the targeter is then re-initialized. Returns
+ * the status of that re-initialization.
+ */
+ Status _refreshDbVersionNow(OperationContext* opCtx);
/**
* Returns a vector of ShardEndpoints where a document might need to be placed.
@@ -165,6 +173,9 @@ private:
// Map of shard->remote shard version reported from stale errors
ShardVersionMap _remoteShardVersions;
+
+ // remote db version reported from stale errors
+ boost::optional<DatabaseVersion> _remoteDbVersion;
};
} // namespace mongo
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 61380771d2a..210406d327d 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -120,8 +120,13 @@ public:
// No-op
}
- void noteStaleResponse(const ShardEndpoint& endpoint,
- const StaleConfigInfo& staleInfo) override {
+ void noteStaleShardResponse(const ShardEndpoint& endpoint,
+ const StaleConfigInfo& staleInfo) override {
+ // No-op: this mock records nothing for stale shard version responses.
+ }
+
+ void noteStaleDbResponse(const ShardEndpoint& endpoint,
+ const StaleDbRoutingVersion& staleInfo) override {
// No-op: this mock records nothing for stale database version responses.
}
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index fad332f928c..595e8091a76 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -114,7 +114,8 @@ size_t WriteOp::getNumTargeted() {
// Returns true when errCode equals one of the error codes enumerated below; this commit adds
// StaleDbVersion to the set. Judging by the name, these are the errors treated as retryable --
// confirm against callers.
static bool isRetryErrCode(int errCode) {
return errCode == ErrorCodes::StaleShardVersion ||
- errCode == ErrorCodes::CannotImplicitlyCreateCollection;
+ errCode == ErrorCodes::CannotImplicitlyCreateCollection ||
+ errCode == ErrorCodes::StaleDbVersion;
}
static bool errorsAllSame(const vector<ChildWriteOp const*>& errOps) {