author | jannaerin <golden.janna@gmail.com> | 2019-07-18 00:23:44 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2019-08-08 17:32:26 -0400 |
commit | a70f6ddb8817f7b77b4597bba1a854548c4dbf12 (patch) | |
tree | 62e2a4ad3a2b179bddadc8331dd63808a430ef5f | |
parent | d101a617bada9252a4f0a29b8f615ee62abb979b (diff) | |
download | mongo-a70f6ddb8817f7b77b4597bba1a854548c4dbf12.tar.gz |
SERVER-41949 Attach the databaseVersion on the write path on mongos
24 files changed, 438 insertions, 106 deletions
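For context, this change makes mongos attach a databaseVersion, alongside the UNSHARDED shardVersion sentinel, to write batches that target an unsharded collection's primary shard. A hedged sketch of the resulting command body — field names come from `BatchedCommandRequest::serialize()` in this patch, while the helper function and values are illustrative:

```cpp
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/database_version_helpers.h"

namespace mongo {

// Illustrative only: the command shape mongos now produces for a write to an
// unsharded collection. appendToCommand() adds the "shardVersion" field with
// the UNSHARDED sentinel; "databaseVersion" is a {uuid, lastMod} subdocument
// (DatabaseVersion::toBSON()), appended exactly as in
// BatchedCommandRequest::serialize() further down in this patch.
BSONObj makeUnshardedInsertCommand(const DatabaseVersion& dbVersion) {
    BSONObjBuilder cmd;
    cmd.append("insert", "coll");
    cmd.append("documents", BSON_ARRAY(BSON("_id" << 1)));
    ChunkVersion::UNSHARDED().appendToCommand(&cmd);
    cmd.append("databaseVersion", dbVersion.toBSON());
    return cmd.obj();
}

}  // namespace mongo
```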
diff --git a/buildscripts/resmokeconfig/suites/sharding.yml b/buildscripts/resmokeconfig/suites/sharding.yml
index f3e764bbf0c..ce952617352 100644
--- a/buildscripts/resmokeconfig/suites/sharding.yml
+++ b/buildscripts/resmokeconfig/suites/sharding.yml
@@ -3,9 +3,6 @@ test_kind: js_test
 selector:
   roots:
   - jstests/sharding/*.js
-  exclude_files:
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index 21e31d4b64b..5f361dd80cb 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -32,8 +32,6 @@ selector:
   - jstests/sharding/migration_critical_section_concurrency.js # SERVER-21713
   # Runs with auth enabled.
   - jstests/sharding/mongod_returns_no_cluster_time_without_keys.js
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index 1ad84f7888a..25cf393d692 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -32,8 +32,6 @@ selector:
   - jstests/sharding/migration_critical_section_concurrency.js # SERVER-21713
   # Runs with auth enabled.
   - jstests/sharding/mongod_returns_no_cluster_time_without_keys.js
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index a302bc5d92b..342c82a6f0b 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -202,13 +202,12 @@ selector:
   - jstests/sharding/sharding_statistics_server_status.js
   # setShardVersion is not robust during config server stepdown.
   - jstests/sharding/mongos_no_detect_sharding.js
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
   # TODO (SERVER-42143): The tests below need transactional support for refineCollectionShardKey.
   - jstests/sharding/refine_collection_shard_key_basic.js
   - jstests/sharding/refine_collection_shard_key_jumbo.js
   - jstests/sharding/refine_collection_shard_key_drops_chunks.js
+
 executor:
   config:
     shell_options:
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese.yml b/buildscripts/resmokeconfig/suites/sharding_ese.yml
index e65283c6db0..d8d620384a1 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese.yml
@@ -7,9 +7,6 @@ test_kind: js_test
 selector:
   roots:
   - jstests/sharding/*.js
-  exclude_files:
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese_gcm.yml b/buildscripts/resmokeconfig/suites/sharding_ese_gcm.yml
index a02f5d66113..23e5a9b091d 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese_gcm.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese_gcm.yml
@@ -7,9 +7,6 @@ test_kind: js_test
 selector:
   roots:
   - jstests/sharding/*.js
-  exclude_files:
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_rs_matching_disabled.yml b/buildscripts/resmokeconfig/suites/sharding_rs_matching_disabled.yml
index df1687716b4..f159ca35e52 100644
--- a/buildscripts/resmokeconfig/suites/sharding_rs_matching_disabled.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_rs_matching_disabled.yml
@@ -3,9 +3,6 @@ test_kind: js_test
 selector:
   roots:
   - jstests/sharding/*.js
-  exclude_files:
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_rs_matching_match_busiest_node.yml b/buildscripts/resmokeconfig/suites/sharding_rs_matching_match_busiest_node.yml
index 40ebba8c625..5aefdbbe4ec 100644
--- a/buildscripts/resmokeconfig/suites/sharding_rs_matching_match_busiest_node.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_rs_matching_match_busiest_node.yml
@@ -3,9 +3,6 @@ test_kind: js_test
 selector:
   roots:
   - jstests/sharding/*.js
-  exclude_files:
-  # Aggregation with $merge will fail until SERVER-41949 is finished.
-  - jstests/sharding/merge_with_move_primary.js
 
 executor:
   config:
diff --git a/jstests/sharding/database_and_shard_versioning_all_commands.js b/jstests/sharding/database_and_shard_versioning_all_commands.js
index 3dca22294de..aa597087f56 100644
--- a/jstests/sharding/database_and_shard_versioning_all_commands.js
+++ b/jstests/sharding/database_and_shard_versioning_all_commands.js
@@ -156,7 +156,7 @@ let testCases = {
     },
     delete: {
         skipProfilerCheck: true,
-        sendsDbVersion: false,
+        sendsDbVersion: true,
         // The profiler extracts the individual deletes from the 'deletes' array, and so loses
         // the overall delete command's attached shardVersion, though one is sent.
         sendsShardVersion: true,
@@ -231,7 +231,7 @@ let testCases = {
     grantRolesToUser: {skip: "always targets the config server"},
     hostInfo: {skip: "executes locally on mongos (not sent to any remote node)"},
     insert: {
-        sendsDbVersion: false,
+        sendsDbVersion: true,
         sendsShardVersion: true,
         command: {insert: collName, documents: [{_id: 1}]},
         cleanUp: function(mongosConn) {
@@ -411,7 +411,7 @@ let testCases = {
     stopRecordingTraffic: {skip: "executes locally on mongos (not sent to any remote node)"},
     update: {
         skipProfilerCheck: true,
-        sendsDbVersion: false,
+        sendsDbVersion: true,
         // The profiler extracts the individual updates from the 'updates' array, and so loses
         // the overall update command's attached shardVersion, though one is sent.
         sendsShardVersion: true,
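The flipped `sendsDbVersion` expectations above assert that mongos now forwards a databaseVersion with delete, insert, and update. Reduced to its essence, the property being checked over the forwarded command is roughly the following — a sketch only; the real suite inspects the shard's profiler output, and this helper is hypothetical:

```cpp
#include "mongo/bson/bsonobj.h"

// Hypothetical predicate mirroring the suite's sendsDbVersion check: the
// command document that arrives at the shard must carry a "databaseVersion"
// field in addition to its "shardVersion".
bool sendsDbVersion(const mongo::BSONObj& forwardedCmd) {
    return forwardedCmd.hasField("databaseVersion");
}
```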
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index e321fb3cfa6..e5b6d25396b 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -93,7 +93,8 @@ void serializeReply(OperationContext* opCtx,
     if (continueOnError && !result.results.empty()) {
         const auto& lastResult = result.results.back();
         if (lastResult == ErrorCodes::StaleConfig ||
-            lastResult == ErrorCodes::CannotImplicitlyCreateCollection) {
+            lastResult == ErrorCodes::CannotImplicitlyCreateCollection ||
+            lastResult == ErrorCodes::StaleDbVersion) {
             // For ordered:false commands we need to duplicate these error results for all ops
             // after we stopped. See handleError() in write_ops_exec.cpp for more info.
             auto err = result.results.back();
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index d648e60239f..d495bb89f35 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -256,7 +256,7 @@ bool handleError(OperationContext* opCtx,
         return false;
     }
 
-    if (ex.extraInfo<StaleConfigInfo>()) {
+    if (ex.extraInfo<StaleConfigInfo>() || ex.extraInfo<StaleDbRoutingVersion>()) {
         if (!opCtx->getClient()->isInDirectClient()) {
             auto& oss = OperationShardingState::get(opCtx);
             oss.setShardingOperationFailedStatus(ex.toStatus());
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index ea3b28b50a9..ac6198dd79f 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -317,14 +317,22 @@ Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx,
 }
 
 BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
-    const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
-    if (infos.empty()) {
-        return BSONObj();
+    std::list<BSONObj> infos;
+
+    try {
+        infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+        if (infos.empty()) {
+            return BSONObj();
+        }
+    } catch (const DBException& e) {
+        uasserted(ErrorCodes::CommandFailed, e.reason());
     }
+
     const auto& infoObj = infos.front();
     uassert(ErrorCodes::CommandNotSupportedOnView,
             str::stream() << nss.toString() << " is a view, not a collection",
             infoObj["type"].valueStringData() != "view"_sd);
+
     return infoObj.getObjectField("options").getOwned();
 }
diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
index c75cba94c62..625f62a51f3 100644
--- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
+++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
@@ -78,6 +78,14 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActions() {
         if (!handleMismatchStatus.isOK())
             log() << "Failed to handle stale version exception"
                   << causedBy(redact(handleMismatchStatus));
+    } else if (auto staleInfo = status->extraInfo<StaleDbRoutingVersion>()) {
+        auto handleMismatchStatus = onDbVersionMismatchNoExcept(_opCtx,
+                                                                staleInfo->getDb(),
+                                                                staleInfo->getVersionReceived(),
+                                                                staleInfo->getVersionWanted());
+        if (!handleMismatchStatus.isOK())
+            log() << "Failed to handle database version exception"
+                  << causedBy(redact(handleMismatchStatus));
     } else if (auto cannotImplicitCreateCollInfo =
                    status->extraInfo<CannotImplicitlyCreateCollectionInfo>()) {
         if (ShardingState::get(_opCtx)->enabled()) {
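Taken together, the three server-side changes above make StaleDbVersion behave like StaleConfig on the shard: the failure is recorded on the OperationShardingState as the operation unwinds, and the completion hook triggers a refresh. A condensed sketch of that completion path — the wrapper function is hypothetical, while `onDbVersionMismatchNoExcept` and its arguments are taken verbatim from the diff:

```cpp
// Condensed, hypothetical sketch of the shard-side completion handling added
// above. On a StaleDbVersion failure the shard refreshes its cached database
// version; a failed refresh is only logged, mirroring the existing
// stale-shard-version path.
void handleStaleDbVersionOnCompletion(OperationContext* opCtx, const Status& status) {
    if (auto staleInfo = status.extraInfo<StaleDbRoutingVersion>()) {
        auto refreshStatus = onDbVersionMismatchNoExcept(opCtx,
                                                         staleInfo->getDb(),
                                                         staleInfo->getVersionReceived(),
                                                         staleInfo->getVersionWanted());
        if (!refreshStatus.isOK())
            log() << "Failed to handle database version exception"
                  << causedBy(redact(refreshStatus));
    }
}
```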
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 92e29547705..a86b5f43737 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -43,17 +43,22 @@ namespace mongo {
 class OperationContext;
 
 /**
- * Combines a shard and the version which that shard should be using
+ * Combines a shard, the shard version, and the database version that the shard should be using
  */
 struct ShardEndpoint {
-    ShardEndpoint(const ShardId& shardName, const ChunkVersion& shardVersion)
-        : shardName(shardName), shardVersion(shardVersion) {}
+    ShardEndpoint(const ShardId& shardName,
+                  const ChunkVersion& shardVersion,
+                  const boost::optional<DatabaseVersion> dbVersion = boost::none)
+        : shardName(shardName), shardVersion(shardVersion), databaseVersion(dbVersion) {}
 
     ShardEndpoint(const ShardEndpoint& other)
-        : shardName(other.shardName), shardVersion(other.shardVersion) {}
+        : shardName(other.shardName), shardVersion(other.shardVersion) {
+        databaseVersion = other.databaseVersion;
+    }
 
     ShardId shardName;
     ChunkVersion shardVersion;
+    boost::optional<DatabaseVersion> databaseVersion;
 };
 
 /**
@@ -143,8 +148,19 @@ public:
      *
      * If stale responses are noted, we must not have noted that we cannot target.
      */
-    virtual void noteStaleResponse(const ShardEndpoint& endpoint,
-                                   const StaleConfigInfo& staleInfo) = 0;
+    virtual void noteStaleShardResponse(const ShardEndpoint& endpoint,
+                                        const StaleConfigInfo& staleInfo) = 0;
+
+    /**
+     * Informs the targeter of stale db routing version responses for this db from an endpoint,
+     * with further information available in the returned staleInfo.
+     *
+     * Any stale responses noted here will be taken into account on the next refresh.
+     *
+     * If stale responses are noted, we must not have noted that we cannot target.
+     */
+    virtual void noteStaleDbResponse(const ShardEndpoint& endpoint,
+                                     const StaleDbRoutingVersion& staleInfo) = 0;
 
     /**
      * Refreshes the targeting metadata for the namespace if needed, based on previously-noted
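With the extended ShardEndpoint above, the two targeting outcomes look like this. Illustrative helpers only; the constructor usage matches chunk_manager_targeter.cpp later in this patch, and the sharded/unsharded split matches the invariant later enforced in `BatchWriteOp::buildBatchRequest()`:

```cpp
#include "mongo/s/ns_targeter.h"

namespace mongo {

// Illustrative only: a sharded target carries a real chunk version and no
// database version; an unsharded target carries the UNSHARDED sentinel plus
// the database version of the primary shard's database entry.
ShardEndpoint makeShardedEndpoint(const ShardId& shardId, const ChunkVersion& version) {
    return ShardEndpoint(shardId, version);
}

ShardEndpoint makeUnshardedEndpoint(const ShardId& primaryShardId,
                                    const DatabaseVersion& dbVersion) {
    return ShardEndpoint(primaryShardId, ChunkVersion::UNSHARDED(), dbVersion);
}

}  // namespace mongo
```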
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index b06b0c1c63b..bf1c8a0acd2 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -68,18 +68,28 @@ WriteErrorDetail errorFromStatus(const Status& status) {
     return error;
 }
 
-// Helper to note several stale errors from a response
-void noteStaleResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
+// Helper to note several stale shard errors from a response
+void noteStaleShardResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
     for (const auto& error : staleErrors) {
         LOG(4) << "Noting stale config response " << error.error.getErrInfo() << " from shard "
               << error.endpoint.shardName;
-        targeter->noteStaleResponse(
+        targeter->noteStaleShardResponse(
             error.endpoint,
             StaleConfigInfo::parseFromCommandError(
                 error.error.isErrInfoSet() ? error.error.getErrInfo() : BSONObj()));
     }
 }
 
+// Helper to note several stale db errors from a response
+void noteStaleDbResponses(const std::vector<ShardError>& staleErrors, NSTargeter* targeter) {
+    for (const auto& error : staleErrors) {
+        LOG(4) << "Noting stale database response " << error.error.toBSON() << " from shard "
+               << error.endpoint.shardName;
+        targeter->noteStaleDbResponse(
+            error.endpoint, StaleDbRoutingVersion::parseFromCommandError(error.error.toBSON()));
+    }
+}
+
 bool hasTransientTransactionError(const BatchedCommandResponse& response) {
     if (!response.isErrorLabelsSet()) {
         return false;
@@ -297,6 +307,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
                 if (responseStatus.isOK()) {
                     TrackedErrors trackedErrors;
                     trackedErrors.startTracking(ErrorCodes::StaleShardVersion);
+                    trackedErrors.startTracking(ErrorCodes::StaleDbVersion);
                     trackedErrors.startTracking(ErrorCodes::CannotImplicitlyCreateCollection);
 
                     LOG(4) << "Write results received from " << shardHost.toString() << ": "
@@ -329,11 +340,20 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
                     }
 
                     // Note if anything was stale
-                    const auto& staleErrors =
+                    const auto& staleShardErrors =
                         trackedErrors.getErrors(ErrorCodes::StaleShardVersion);
-                    if (!staleErrors.empty()) {
-                        noteStaleResponses(staleErrors, &targeter);
-                        ++stats->numStaleBatches;
+                    const auto& staleDbErrors = trackedErrors.getErrors(ErrorCodes::StaleDbVersion);
+
+                    if (!staleShardErrors.empty()) {
+                        invariant(staleDbErrors.empty());
+                        noteStaleShardResponses(staleShardErrors, &targeter);
+                        ++stats->numStaleShardBatches;
+                    }
+
+                    if (!staleDbErrors.empty()) {
+                        invariant(staleShardErrors.empty());
+                        noteStaleDbResponses(staleDbErrors, &targeter);
+                        ++stats->numStaleDbBatches;
                     }
 
                     const auto& cannotImplicitlyCreateErrors =
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index f61926889b8..c729561630a 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -86,7 +86,11 @@ typedef std::map<ConnectionString, HostOpTime> HostOpTimeMap;
 class BatchWriteExecStats {
 public:
     BatchWriteExecStats()
-        : numRounds(0), numTargetErrors(0), numResolveErrors(0), numStaleBatches(0) {}
+        : numRounds(0),
+          numTargetErrors(0),
+          numResolveErrors(0),
+          numStaleShardBatches(0),
+          numStaleDbBatches(0) {}
 
     void noteWriteAt(const HostAndPort& host, repl::OpTime opTime, const OID& electionId);
     void noteTargetedShard(const ShardId& shardId);
@@ -102,8 +106,10 @@ public:
     int numTargetErrors;
     // Number of times host resolution failed
    int numResolveErrors;
-    // Number of stale batches
-    int numStaleBatches;
+    // Number of stale batches due to StaleShardVersion
+    int numStaleShardBatches;
+    // Number of stale batches due to StaleDbVersion
+    int numStaleDbBatches;
 
 private:
     std::set<ShardId> _targetedShards;
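For reference, the per-write error document that `noteStaleDbResponses()` above hands to `StaleDbRoutingVersion::parseFromCommandError()` has roughly this shape. Field names are copied from the unit test below; the builder itself is a hedged sketch:

```cpp
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/s/database_version_helpers.h"

namespace mongo {

// Hedged sketch of a single StaleDbVersion write error, as mocked by the unit
// test below: "db" names the stale database, while "vReceived"/"vWanted"
// carry the version the shard saw and the version it expected.
BSONObj makeStaleDbVersionWriteError(StringData db,
                                     const DatabaseVersion& received,
                                     const DatabaseVersion& wanted) {
    BSONObjBuilder err;
    err.append("index", 0);
    err.append("code", int(ErrorCodes::StaleDbVersion));
    err.append("db", db);
    err.append("vReceived", received.toBSON());
    err.append("vWanted", wanted.toBSON());
    err.append("errmsg", "database version mismatch");
    return err.obj();
}

}  // namespace mongo
```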
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index b171341c64c..c04ed84bcad 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -98,6 +98,52 @@ BSONObj expectInsertsReturnStaleVersionErrorsBase(const NamespaceString& nss,
     return staleResponse.toBSON();
 }
 
+BSONObj expectInsertsReturnStaleDbVersionErrorsBase(const NamespaceString& nss,
+                                                    const std::vector<BSONObj>& expected,
+                                                    const executor::RemoteCommandRequest& request) {
+    ASSERT_EQUALS(nss.db(), request.dbname);
+
+    const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
+    const auto actualBatchedInsert(BatchedCommandRequest::parseInsert(opMsgRequest));
+    ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
+
+    const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
+    ASSERT_EQUALS(expected.size(), inserted.size());
+
+    auto itInserted = inserted.begin();
+    auto itExpected = expected.begin();
+
+    for (; itInserted != inserted.end(); itInserted++, itExpected++) {
+        ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
+    }
+
+    BSONObjBuilder staleResponse;
+    staleResponse.append("ok", 1);
+    staleResponse.append("n", 0);
+
+    // Report a stale db version error for each write in the batch.
+    int i = 0;
+    std::vector<BSONObj> errors;
+    for (itInserted = inserted.begin(); itInserted != inserted.end(); ++itInserted) {
+        BSONObjBuilder errorBuilder;
+        errorBuilder.append("index", i);
+        errorBuilder.append("code", int(ErrorCodes::StaleDbVersion));
+
+        auto dbVersion = databaseVersion::makeNew();
+        errorBuilder.append("db", nss.db());
+        errorBuilder.append("vReceived", dbVersion.toBSON());
+        errorBuilder.append("vWanted", databaseVersion::makeIncremented(dbVersion).toBSON());
+
+        errorBuilder.append("errmsg", "mock stale db version");
+
+        errors.push_back(errorBuilder.obj());
+        ++i;
+    }
+    staleResponse.append("writeErrors", errors);
+
+    return staleResponse.obj();
+}
+
 /**
  * Mimics a single shard backend for a particular collection which can be initialized with a
  * set of write command results to return.
@@ -174,6 +220,12 @@ public:
         });
     }
 
+    virtual void expectInsertsReturnStaleDbVersionErrors(const std::vector<BSONObj>& expected) {
+        onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+            return expectInsertsReturnStaleDbVersionErrorsBase(nss, expected, request);
+        });
+    }
+
     void expectInsertsReturnError(const std::vector<BSONObj>& expected,
                                   const BatchedCommandResponse& errResponse) {
         onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
@@ -319,7 +371,7 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
 // Test retryable errors
 //
 
-TEST_F(BatchWriteExecTest, StaleOp) {
+TEST_F(BatchWriteExecTest, StaleShardOp) {
     BatchedCommandRequest request([&] {
         write_ops::Insert insertOp(nss);
         insertOp.setWriteCommandBase([] {
@@ -339,7 +391,7 @@ TEST_F(BatchWriteExecTest, StaleOp) {
         BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
         ASSERT(response.getOk());
 
-        ASSERT_EQUALS(1, stats.numStaleBatches);
+        ASSERT_EQUALS(1, stats.numStaleShardBatches);
     });
 
     const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -350,7 +402,7 @@ TEST_F(BatchWriteExecTest, StaleOp) {
     future.default_timed_get();
 }
 
-TEST_F(BatchWriteExecTest, MultiStaleOp) {
+TEST_F(BatchWriteExecTest, MultiStaleShardOp) {
     BatchedCommandRequest request([&] {
         write_ops::Insert insertOp(nss);
         insertOp.setWriteCommandBase([] {
@@ -369,7 +421,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
         BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
         ASSERT(response.getOk());
 
-        ASSERT_EQUALS(3, stats.numStaleBatches);
+        ASSERT_EQUALS(3, stats.numStaleShardBatches);
     });
 
     const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -384,7 +436,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
     future.default_timed_get();
 }
 
-TEST_F(BatchWriteExecTest, TooManyStaleOp) {
+TEST_F(BatchWriteExecTest, TooManyStaleShardOp) {
     // Retry op in exec too many times (without refresh) b/c of stale config (the mock nsTargeter
     // doesn't report progress on refresh). We should report a no progress error for everything in
     // the batch.
@@ -410,7 +462,7 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
         ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(), ErrorCodes::NoProgressMade);
         ASSERT_EQUALS(response.getErrDetailsAt(1)->toStatus().code(), ErrorCodes::NoProgressMade);
 
-        ASSERT_EQUALS(stats.numStaleBatches, (1 + kMaxRoundsWithoutProgress));
+        ASSERT_EQUALS(stats.numStaleShardBatches, (1 + kMaxRoundsWithoutProgress));
     });
 
     // Return multiple StaleShardVersion errors
@@ -421,6 +473,108 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
     future.default_timed_get();
 }
 
+TEST_F(BatchWriteExecTest, StaleDbOp) {
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setWriteCommandBase([] {
+            write_ops::WriteCommandBase writeCommandBase;
+            writeCommandBase.setOrdered(false);
+            return writeCommandBase;
+        }());
+        insertOp.setDocuments({BSON("x" << 1)});
+        return insertOp;
+    }());
+    request.setWriteConcern(BSONObj());
+
+    // Execute request
+    auto future = launchAsync([&] {
+        BatchedCommandResponse response;
+        BatchWriteExecStats stats;
+        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+        ASSERT(response.getOk());
+
+        ASSERT_EQUALS(1, stats.numStaleDbBatches);
+    });
+
+    const std::vector<BSONObj> expected{BSON("x" << 1)};
+
+    expectInsertsReturnStaleDbVersionErrors(expected);
+    expectInsertsReturnSuccess(expected);
+
+    future.default_timed_get();
+}
+
+TEST_F(BatchWriteExecTest, MultiStaleDbOp) {
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setWriteCommandBase([] {
+            write_ops::WriteCommandBase writeCommandBase;
+            writeCommandBase.setOrdered(false);
+            return writeCommandBase;
+        }());
+        insertOp.setDocuments({BSON("x" << 1)});
+        return insertOp;
+    }());
+    request.setWriteConcern(BSONObj());
+
+    auto future = launchAsync([&] {
+        BatchedCommandResponse response;
+        BatchWriteExecStats stats;
+        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+        ASSERT(response.getOk());
+
+        ASSERT_EQUALS(3, stats.numStaleDbBatches);
+    });
+
+    const std::vector<BSONObj> expected{BSON("x" << 1)};
+
+    // Return multiple StaleDbVersion errors, but less than the give-up number
+    for (int i = 0; i < 3; i++) {
+        expectInsertsReturnStaleDbVersionErrors(expected);
+    }
+
+    expectInsertsReturnSuccess(expected);
+
+    future.default_timed_get();
+}
+
+TEST_F(BatchWriteExecTest, TooManyStaleDbOp) {
+    // Retry op in exec too many times (without refresh) b/c of stale db version (the mock
+    // nsTargeter doesn't report progress on refresh). We should report a no progress error for
+    // everything in the batch.
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setWriteCommandBase([] {
+            write_ops::WriteCommandBase writeCommandBase;
+            writeCommandBase.setOrdered(false);
+            return writeCommandBase;
+        }());
+        insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+        return insertOp;
+    }());
+    request.setWriteConcern(BSONObj());
+
+    auto future = launchAsync([&] {
+        BatchedCommandResponse response;
+        BatchWriteExecStats stats;
+        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+        ASSERT(response.getOk());
+        ASSERT_EQ(0, response.getN());
+        ASSERT(response.isErrDetailsSet());
+        ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(), ErrorCodes::NoProgressMade);
+        ASSERT_EQUALS(response.getErrDetailsAt(1)->toStatus().code(), ErrorCodes::NoProgressMade);
+
+        ASSERT_EQUALS(stats.numStaleDbBatches, (1 + kMaxRoundsWithoutProgress));
+    });
+
+    // Return multiple StaleDbVersion errors
+    for (int i = 0; i < (1 + kMaxRoundsWithoutProgress); i++) {
+        expectInsertsReturnStaleDbVersionErrors({BSON("x" << 1), BSON("x" << 2)});
+    }
+
+    future.default_timed_get();
+}
+
 TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) {
     // A retryable error without a txnNumber is not retried.
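The mock above leans on two databaseVersion helpers. Their assumed semantics, consistent with `compareDbVersions` and `EndpointComp` elsewhere in this patch, are sketched here (the demonstration function is hypothetical):

```cpp
#include "mongo/s/database_version_helpers.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Assumed semantics of the helpers used by the mock above: makeNew() yields a
// fresh UUID with an initial lastMod; makeIncremented() keeps the UUID and
// bumps lastMod. Versions with different UUIDs are never comparable, because
// the database was dropped and recreated in between.
void demonstrateDatabaseVersionHelpers() {
    auto v1 = databaseVersion::makeNew();
    auto v2 = databaseVersion::makeIncremented(v1);
    invariant(v1.getUuid() == v2.getUuid());
    invariant(v1.getLastMod() < v2.getLastMod());
}

}  // namespace mongo
```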
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 5a4c9ac1253..bba83f7ad6e 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -37,6 +37,9 @@
 #include "mongo/base/error_codes.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
 #include "mongo/s/transaction_router.h"
 #include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
 
@@ -521,7 +524,15 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
             return wcb;
         }());
 
-        request.setShardVersion(targetedBatch.getEndpoint().shardVersion);
+
+        auto shardVersion = targetedBatch.getEndpoint().shardVersion;
+        request.setShardVersion(shardVersion);
+
+        auto dbVersion = targetedBatch.getEndpoint().databaseVersion;
+        invariant((shardVersion == ChunkVersion::UNSHARDED() && dbVersion) ||
+                  (shardVersion != ChunkVersion::UNSHARDED() && !dbVersion));
+        if (dbVersion)
+            request.setDbVersion(dbVersion.get());
 
         if (_clientRequest.hasWriteConcern()) {
             if (_clientRequest.isVerboseWC()) {
@@ -878,7 +889,21 @@ bool EndpointComp::operator()(const ShardEndpoint* endpointA,
         return shardVersionDiff < 0;
     }
 
-    return endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch()) < 0;
+    const long epochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch());
+    if (epochDiff) {
+        return epochDiff < 0;
+    }
+
+    if (endpointA->databaseVersion && endpointB->databaseVersion) {
+        if (endpointA->databaseVersion->getUuid() < endpointB->databaseVersion->getUuid())
+            return true;
+
+        return (endpointA->databaseVersion->getLastMod() -
+                    endpointB->databaseVersion->getLastMod() <
+                0);
+    }
+
+    return false;
 }
 
 void TrackedErrors::startTracking(int errCode) {
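The `EndpointComp` extension above breaks ties between endpoints that differ only in database version. Isolated into a standalone helper, the induced ordering looks like this — a sketch mirroring the code above, not a library function:

```cpp
#include "mongo/s/database_version_helpers.h"

namespace mongo {

// Sketch of the tie-break ordering EndpointComp now applies once shard
// versions and epochs compare equal: first by the database version's UUID,
// then by its lastMod.
bool dbVersionLess(const DatabaseVersion& a, const DatabaseVersion& b) {
    if (a.getUuid() != b.getUuid())
        return a.getUuid() < b.getUuid();
    return a.getLastMod() < b.getLastMod();
}

}  // namespace mongo
```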
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index dd196625513..ff719d81f07 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -46,6 +46,11 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request) {
     auto chunkVersion = ChunkVersion::parseFromCommand(request.body);
     if (chunkVersion != ErrorCodes::NoSuchKey) {
         batchRequest.setShardVersion(uassertStatusOK(std::move(chunkVersion)));
+        if (chunkVersion == ChunkVersion::UNSHARDED()) {
+            auto dbVersion = DatabaseVersion::parse(IDLParserErrorContext("BatchedCommandRequest"),
+                                                    request.body);
+            batchRequest.setDbVersion(std::move(dbVersion));
+        }
     }
 
     auto writeConcernField = request.body[kWriteConcern];
@@ -121,6 +126,10 @@ void BatchedCommandRequest::serialize(BSONObjBuilder* builder) const {
         _shardVersion->appendToCommand(builder);
     }
 
+    if (_dbVersion) {
+        builder->append("databaseVersion", _dbVersion->toBSON());
+    }
+
     if (_writeConcern) {
         builder->append(kWriteConcern, *_writeConcern);
     }
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index 7b7700b5df4..59e6edc28ca 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -35,6 +35,7 @@
 #include "mongo/db/ops/write_ops.h"
 #include "mongo/rpc/op_msg.h"
 #include "mongo/s/chunk_version.h"
+#include "mongo/s/database_version_helpers.h"
 
 namespace mongo {
 
@@ -115,6 +116,19 @@ public:
         return *_shardVersion;
     }
 
+    void setDbVersion(DatabaseVersion dbVersion) {
+        _dbVersion = std::move(dbVersion);
+    }
+
+    bool hasDbVersion() const {
+        return _dbVersion.is_initialized();
+    }
+
+    const DatabaseVersion& getDbVersion() const {
+        invariant(_dbVersion);
+        return *_dbVersion;
+    }
+
     void setRuntimeConstants(RuntimeConstants runtimeConstants) {
         invariant(_updateReq);
         _updateReq->setRuntimeConstants(std::move(runtimeConstants));
@@ -179,6 +193,7 @@ private:
     std::unique_ptr<write_ops::Delete> _deleteReq;
 
     boost::optional<ChunkVersion> _shardVersion;
+    boost::optional<DatabaseVersion> _dbVersion;
 
     boost::optional<BSONObj> _writeConcern;
     bool _allowImplicitCollectionCreation = true;
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index d6b49027c1f..68b752d9e19 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/query/collation/collation_index_key.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/database_version_helpers.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_key_pattern.h"
 #include "mongo/util/log.h"
@@ -210,7 +211,7 @@ bool isExactIdQuery(OperationContext* opCtx,
     return cq.isOK() && isExactIdQuery(opCtx, *cq.getValue(), manager);
 }
 //
-// Utilities to compare shard versions
+// Utilities to compare shard and db versions
 //
 
 /**
@@ -297,22 +298,37 @@ CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo,
     return finalResult;
 }
 
+CompareResult compareDbVersions(const CachedCollectionRoutingInfo& routingInfo,
+                                const boost::optional<DatabaseVersion>& remoteDbVersion) {
+    DatabaseVersion cachedDbVersion = routingInfo.db().databaseVersion();
+
+    // Db may have been dropped
+    if (!remoteDbVersion || (cachedDbVersion.getUuid() != remoteDbVersion->getUuid())) {
+        return CompareResult_Unknown;
+    }
+
+    if (cachedDbVersion.getLastMod() < remoteDbVersion->getLastMod()) {
+        return CompareResult_LT;
+    }
+
+    return CompareResult_GTE;
+}
+
 /**
  * Whether or not the manager/dbVersion pair is different from the other manager/dbVersion pair.
  */
 bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
-                         const std::shared_ptr<Shard>& primaryA,
+                         const DatabaseVersion dbVersionA,
                          const std::shared_ptr<ChunkManager>& managerB,
-                         const std::shared_ptr<Shard>& primaryB) {
-    if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
-        (!primaryA && primaryB))
+                         const DatabaseVersion dbVersionB) {
+    if ((managerA && !managerB) || (!managerA && managerB))
         return true;
 
     if (managerA) {
         return managerA->getVersion() != managerB->getVersion();
     }
 
-    return primaryA->getId() != primaryB->getId();
+    return !databaseVersion::equal(dbVersionA, dbVersionB);
 }
 
 /**
@@ -320,10 +336,10 @@ bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
  * of the metadata.
  */
 bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
-                          const std::shared_ptr<Shard>& primaryA,
+                          const DatabaseVersion dbVersionA,
                           const std::shared_ptr<ChunkManager>& managerB,
-                          const std::shared_ptr<Shard>& primaryB) {
-    if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
+                          const DatabaseVersion dbVersionB) {
+    if (isMetadataDifferent(managerA, dbVersionA, managerB, dbVersionB))
         return true;
 
     if (managerA) {
@@ -401,7 +417,9 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
                                   << "; no metadata found");
         }
 
-        return ShardEndpoint(_routingInfo->db().primary()->getId(), ChunkVersion::UNSHARDED());
+        return ShardEndpoint(_routingInfo->db().primary()->getId(),
+                             ChunkVersion::UNSHARDED(),
+                             _routingInfo->db().databaseVersion());
     }
 
     return Status::OK();
@@ -430,8 +448,9 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate(
                     str::stream() << "could not target update on " << getNS().ns()
                                   << "; no metadata found"};
         }
 
-        return std::vector<ShardEndpoint>{
-            {_routingInfo->db().primaryId(), ChunkVersion::UNSHARDED()}};
+        return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+                                           ChunkVersion::UNSHARDED(),
+                                           _routingInfo->db().databaseVersion()}};
     }
 
     const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern();
@@ -590,22 +609,22 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
                               << "; no metadata found"};
     }
 
+    if (!_routingInfo->cm()) {
+        return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+                                           ChunkVersion::UNSHARDED(),
+                                           _routingInfo->db().databaseVersion()}};
+    }
+
     std::set<ShardId> shardIds;
-    if (_routingInfo->cm()) {
-        try {
-            _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
-        } catch (const DBException& ex) {
-            return ex.toStatus();
-        }
-    } else {
-        shardIds.insert(_routingInfo->db().primary()->getId());
+    try {
+        _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+    } catch (const DBException& ex) {
+        return ex.toStatus();
     }
 
     std::vector<ShardEndpoint> endpoints;
     for (auto&& shardId : shardIds) {
-        const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
-                                                : ChunkVersion::UNSHARDED();
-        endpoints.emplace_back(std::move(shardId), version);
+        endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
     }
 
     return endpoints;
@@ -630,19 +649,20 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetCollection() {
                               << "; metadata not found"};
     }
 
-    std::set<ShardId> shardIds;
-    if (_routingInfo->cm()) {
-        _routingInfo->cm()->getAllShardIds(&shardIds);
-    } else {
-        shardIds.insert(_routingInfo->db().primary()->getId());
+    if (!_routingInfo->cm()) {
+        return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
+                                           ChunkVersion::UNSHARDED(),
+                                           _routingInfo->db().databaseVersion()}};
     }
 
+    std::set<ShardId> shardIds;
+    _routingInfo->cm()->getAllShardIds(&shardIds);
+
     std::vector<ShardEndpoint> endpoints;
     for (auto&& shardId : shardIds) {
-        const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
-                                                : ChunkVersion::UNSHARDED();
-        endpoints.emplace_back(std::move(shardId), version);
+        endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
     }
+
     return endpoints;
 }
 
@@ -657,11 +677,13 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
     std::vector<ShardId> shardIds;
     Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
 
+    // This function is only called if doing a multi write that targets more than one shard. This
+    // implies the collection is sharded, so we should always have a chunk manager.
+    invariant(_routingInfo->cm());
+
     std::vector<ShardEndpoint> endpoints;
     for (auto&& shardId : shardIds) {
-        const auto version = _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
-                                                : ChunkVersion::UNSHARDED();
-        endpoints.emplace_back(std::move(shardId), version);
+        endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
     }
 
     return endpoints;
@@ -669,12 +691,14 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
 
 void ChunkManagerTargeter::noteCouldNotTarget() {
     dassert(_remoteShardVersions.empty());
+    dassert(!_remoteDbVersion);
     _needsTargetingRefresh = true;
 }
 
-void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
-                                             const StaleConfigInfo& staleInfo) {
+void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint,
+                                                  const StaleConfigInfo& staleInfo) {
     dassert(!_needsTargetingRefresh);
+    dassert(!_remoteDbVersion);
 
     ChunkVersion remoteShardVersion;
     if (!staleInfo.getVersionWanted()) {
@@ -703,6 +727,28 @@ void ChunkManagerTargeter::noteStaleShardResponse(const ShardEndpoint& endpoint,
     }
 }
 
+void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint,
+                                               const StaleDbRoutingVersion& staleInfo) {
+    dassert(!_needsTargetingRefresh);
+    dassert(_remoteShardVersions.empty());
+
+    DatabaseVersion remoteDbVersion;
+    if (!staleInfo.getVersionWanted()) {
+        // If we don't have a vWanted sent, assume the version is higher than our current version.
+        remoteDbVersion = _routingInfo->db().databaseVersion();
+        remoteDbVersion = databaseVersion::makeIncremented(remoteDbVersion);
+    } else {
+        remoteDbVersion = *staleInfo.getVersionWanted();
+    }
+
+    if (!_remoteDbVersion ||
+        (_remoteDbVersion->getUuid() == remoteDbVersion.getUuid() &&
+         _remoteDbVersion->getLastMod() < remoteDbVersion.getLastMod()) ||
+        (_remoteDbVersion->getUuid() != remoteDbVersion.getUuid())) {
+        _remoteDbVersion = remoteDbVersion;
+    }
+}
+
 Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
     bool dummy;
     if (!wasChanged) {
@@ -713,13 +759,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
 
     LOG(4) << "ChunkManagerTargeter checking if refresh is needed, needsTargetingRefresh("
            << _needsTargetingRefresh << ") remoteShardVersions empty ("
-           << _remoteShardVersions.empty() << ")";
+           << _remoteShardVersions.empty() << ")"
+           << " remoteDbVersion empty (" << !_remoteDbVersion << ")";
 
     //
     // Did we have any stale config or targeting errors at all?
     //
 
-    if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
+    if (!_needsTargetingRefresh && _remoteShardVersions.empty() && !_remoteDbVersion) {
         return Status::OK();
     }
 
@@ -728,7 +775,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
     //
 
     auto lastManager = _routingInfo->cm();
-    auto lastPrimary = _routingInfo->db().primary();
+    auto lastDbVersion = _routingInfo->db().databaseVersion();
 
     auto initStatus = init(opCtx);
     if (!initStatus.isOK()) {
@@ -747,20 +794,19 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
         _needsTargetingRefresh = false;
 
         // If we couldn't target, we might need to refresh if we haven't remotely refreshed
-        // the
-        // metadata since we last got it from the cache.
+        // the metadata since we last got it from the cache.
 
         bool alreadyRefreshed = wasMetadataRefreshed(
-            lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+            lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
 
         // If didn't already refresh the targeting information, refresh it
         if (!alreadyRefreshed) {
             // To match previous behavior, we just need an incremental refresh here
-            return _refreshNow(opCtx);
+            return _refreshShardVersionNow(opCtx);
         }
 
         *wasChanged = isMetadataDifferent(
-            lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+            lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
         return Status::OK();
     } else if (!_remoteShardVersions.empty()) {
         // If we got stale shard versions from remote shards, we may need to refresh
@@ -775,21 +821,48 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
 
         if (result == CompareResult_Unknown || result == CompareResult_LT) {
             // Our current shard versions aren't all comparable to the old versions, maybe drop
-            return _refreshNow(opCtx);
+            return _refreshShardVersionNow(opCtx);
+        }
+
+        *wasChanged = isMetadataDifferent(
+            lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
+        return Status::OK();
+    } else if (_remoteDbVersion) {
+        // If we got stale db versions from remote shards, we may need to refresh
+        // NOTE: Not sure yet if this can happen simultaneously with targeting issues
+
+        CompareResult result = compareDbVersions(*_routingInfo, _remoteDbVersion);
+
+        LOG(4) << "ChunkManagerTargeter database versions comparison result: " << (int)result;
+
+        // Reset the version
+        _remoteDbVersion = boost::none;
+
+        if (result == CompareResult_Unknown || result == CompareResult_LT) {
+            // Our current db version may not be comparable to the old version; the database may
+            // have been dropped
+            return _refreshDbVersionNow(opCtx);
         }
 
         *wasChanged = isMetadataDifferent(
-            lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->db().primary());
+            lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
         return Status::OK();
     }
 
     MONGO_UNREACHABLE;
 }
 
-Status ChunkManagerTargeter::_refreshNow(OperationContext* opCtx) {
+Status ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
     Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*_routingInfo));
 
     return init(opCtx);
 }
 
+Status ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) {
+    Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
+        _nss.db(), std::move(_routingInfo->db().databaseVersion()));
+
+    return init(opCtx);
+}
+
 }  // namespace mongo
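The targeter now distinguishes two invalidation primitives on the CatalogCache, used by `_refreshShardVersionNow()` and `_refreshDbVersionNow()` above. A hypothetical helper contrasting them — both CatalogCache calls appear verbatim in the patch, the wrapper is illustrative:

```cpp
#include "mongo/s/grid.h"

namespace mongo {

// Hypothetical helper contrasting the two refresh primitives above:
// onStaleShardVersion() invalidates the cached collection routing entry
// (chunk distribution), while onStaleDatabaseVersion() invalidates the cached
// database entry (primary shard + database version).
void invalidateRouting(OperationContext* opCtx,
                       CachedCollectionRoutingInfo routingInfo,
                       const NamespaceString& nss,
                       bool staleDb) {
    auto catalogCache = Grid::get(opCtx)->catalogCache();
    if (staleDb) {
        catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion());
    } else {
        catalogCache->onStaleShardVersion(std::move(routingInfo));
    }
}

}  // namespace mongo
```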
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index 2ace173e625..1ea977ef75f 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -94,8 +94,11 @@ public:
 
     void noteCouldNotTarget() override;
 
-    void noteStaleResponse(const ShardEndpoint& endpoint,
-                           const StaleConfigInfo& staleInfo) override;
+    void noteStaleShardResponse(const ShardEndpoint& endpoint,
+                                const StaleConfigInfo& staleInfo) override;
+
+    void noteStaleDbResponse(const ShardEndpoint& endpoint,
+                             const StaleDbRoutingVersion& staleInfo) override;
 
     /**
      * Replaces the targeting information with the latest information from the cache. If this
@@ -114,7 +117,12 @@ private:
     /**
      * Performs an actual refresh from the config server.
      */
-    Status _refreshNow(OperationContext* opCtx);
+    Status _refreshShardVersionNow(OperationContext* opCtx);
+
+    /**
+     * Performs an actual refresh of the database version from the config server.
+     */
+    Status _refreshDbVersionNow(OperationContext* opCtx);
 
     /**
      * Returns a vector of ShardEndpoints where a document might need to be placed.
@@ -165,6 +173,9 @@ private:
 
     // Map of shard->remote shard version reported from stale errors
     ShardVersionMap _remoteShardVersions;
+
+    // Remote db version reported from stale errors
+    boost::optional<DatabaseVersion> _remoteDbVersion;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 61380771d2a..210406d327d 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -120,8 +120,13 @@ public:
         // No-op
     }
 
-    void noteStaleResponse(const ShardEndpoint& endpoint,
-                           const StaleConfigInfo& staleInfo) override {
+    void noteStaleShardResponse(const ShardEndpoint& endpoint,
+                                const StaleConfigInfo& staleInfo) override {
+        // No-op
+    }
+
+    void noteStaleDbResponse(const ShardEndpoint& endpoint,
+                             const StaleDbRoutingVersion& staleInfo) override {
         // No-op
     }
 
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index fad332f928c..595e8091a76 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -114,7 +114,8 @@ size_t WriteOp::getNumTargeted() {
 
 static bool isRetryErrCode(int errCode) {
     return errCode == ErrorCodes::StaleShardVersion ||
-        errCode == ErrorCodes::CannotImplicitlyCreateCollection;
+        errCode == ErrorCodes::CannotImplicitlyCreateCollection ||
+        errCode == ErrorCodes::StaleDbVersion;
 }
 
 static bool errorsAllSame(const vector<ChildWriteOp const*>& errOps) {
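Finally, with StaleDbVersion added to `isRetryErrCode()` above, mongos treats all three routing/creation races as transparently retryable: the child write op is reset and re-targeted instead of surfacing the error to the client. Illustrative, test-style expectations (`isRetryErrCode` is file-local, so this is a sketch rather than callable code):

```cpp
// Hypothetical illustration of the classification above: these codes cause a
// retry after a metadata refresh; anything else is reported to the client.
invariant(isRetryErrCode(ErrorCodes::StaleShardVersion));
invariant(isRetryErrCode(ErrorCodes::StaleDbVersion));
invariant(isRetryErrCode(ErrorCodes::CannotImplicitlyCreateCollection));
invariant(!isRetryErrCode(ErrorCodes::DuplicateKey));
```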