diff options
author | Jess Fan <jess.fan@10gen.com> | 2016-07-06 12:07:20 -0400 |
---|---|---|
committer | Jess Fan <jess.fan@10gen.com> | 2016-07-12 15:56:14 -0400 |
commit | 907ed32a3a8bd19f883836013530f645522a75bc (patch) | |
tree | f2bbab0f11c0957ce1b5ffa41dc299a4c61200b0 /src | |
parent | d846997c27c6e2297f59421fbeb3979a1423a9ff (diff) | |
download | mongo-907ed32a3a8bd19f883836013530f645522a75bc.tar.gz |
SERVER-24732 add Shard::runBatchWriteCommand that checks BatchedCommandResponse
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp | 52 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_client_impl.h | 9 | ||||
-rw-r--r-- | src/mongo/s/client/shard.cpp | 35 | ||||
-rw-r--r-- | src/mongo/s/client/shard.h | 32 | ||||
-rw-r--r-- | src/mongo/s/client/shard_local.cpp | 21 | ||||
-rw-r--r-- | src/mongo/s/client/shard_local.h | 8 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 23 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.h | 8 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_response.h | 8 |
9 files changed, 112 insertions, 84 deletions
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp index 5e11ad1313c..0c9e47d976e 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp @@ -1318,41 +1318,9 @@ void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* txn, return; } - _runBatchWriteCommand(txn, batchRequest, batchResponse, Shard::RetryPolicy::kNotIdempotent); -} - -void ShardingCatalogClientImpl::_runBatchWriteCommand(OperationContext* txn, - const BatchedCommandRequest& batchRequest, - BatchedCommandResponse* batchResponse, - Shard::RetryPolicy retryPolicy) { - const std::string dbname = batchRequest.getNS().db().toString(); - invariant(dbname == "config" || dbname == "admin"); - - invariant(batchRequest.sizeWriteOps() == 1); - - const BSONObj cmdObj = batchRequest.toBSON(); - - for (int retry = 1; retry <= kMaxWriteRetry; ++retry) { - auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); - auto response = configShard->runCommand( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - dbname, - cmdObj, - Shard::RetryPolicy::kNoRetry); // We're handling our own retries here. - - Status status = Shard::CommandResponse::processBatchWriteResponse(response, batchResponse); - if (retry < kMaxWriteRetry && configShard->isRetriableError(status.code(), retryPolicy)) { - batchResponse->clear(); - LOG(2) << "Batch write command failed with retriable error and will be retried" - << causedBy(status); - continue; - } - - return; - } - - MONGO_UNREACHABLE; + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + *batchResponse = + configShard->runBatchWriteCommand(txn, batchRequest, Shard::RetryPolicy::kNotIdempotent); } Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn, @@ -1374,8 +1342,8 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn, auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); for (int retry = 1; retry <= kMaxWriteRetry; retry++) { - BatchedCommandResponse response; - _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kNoRetry); + auto response = + configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kNoRetry); Status status = response.toStatus(); @@ -1455,8 +1423,9 @@ StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument( request.setNS(nss); request.setWriteConcern(writeConcern.toBSON()); - BatchedCommandResponse response; - _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kIdempotent); + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto response = + configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kIdempotent); Status status = response.toStatus(); if (!status.isOK()) { @@ -1486,8 +1455,9 @@ Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* txn, request.setNS(nss); request.setWriteConcern(writeConcern.toBSON()); - BatchedCommandResponse response; - _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kIdempotent); + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto response = + configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kIdempotent); return response.toStatus(); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h index e98474a10a2..4c983640015 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h @@ -216,15 +216,6 @@ private: int cappedSize); /** - * Executes the specified batch write command on the current config server's primary and retries - * on the specified set of errors using the default retry policy. - */ - void _runBatchWriteCommand(OperationContext* txn, - const BatchedCommandRequest& request, - BatchedCommandResponse* response, - Shard::RetryPolicy retryPolicy); - - /** * Helper method for running a count command against the config server with appropriate * error handling. */ diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp index f9361be28b0..b186d341d92 100644 --- a/src/mongo/s/client/shard.cpp +++ b/src/mongo/s/client/shard.cpp @@ -31,6 +31,8 @@ #include "mongo/platform/basic.h" #include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/log.h" @@ -103,7 +105,7 @@ StatusWith<Shard::CommandResponse> Shard::runCommand(OperationContext* txn, const BSONObj& cmdObj, RetryPolicy retryPolicy) { for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) { - auto swCmdResponse = _runCommand(txn, readPref, dbName, cmdObj); + auto swCmdResponse = _runCommand(txn, readPref, dbName, cmdObj).commandResponse; auto commandStatus = _getEffectiveCommandStatus(swCmdResponse); if (retry < kOnErrorNumRetries && isRetriableError(commandStatus.code(), retryPolicy)) { @@ -117,6 +119,37 @@ StatusWith<Shard::CommandResponse> Shard::runCommand(OperationContext* txn, MONGO_UNREACHABLE; } +BatchedCommandResponse Shard::runBatchWriteCommand(OperationContext* txn, + const BatchedCommandRequest& batchRequest, + RetryPolicy retryPolicy) { + const std::string dbname = batchRequest.getNS().db().toString(); + invariant(batchRequest.sizeWriteOps() == 1); + + const BSONObj cmdObj = batchRequest.toBSON(); + + for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) { + auto response = + _runCommand(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, dbname, cmdObj); + + BatchedCommandResponse batchResponse; + Status writeStatus = + CommandResponse::processBatchWriteResponse(response.commandResponse, &batchResponse); + + if (!writeStatus.isOK() && response.host) { + updateReplSetMonitor(response.host.get(), writeStatus); + } + + if (retry < kOnErrorNumRetries && isRetriableError(writeStatus.code(), retryPolicy)) { + LOG(2) << "Batch write command failed with retriable error and will be retried" + << causedBy(writeStatus); + continue; + } + + return batchResponse; + } + MONGO_UNREACHABLE; +} + StatusWith<Shard::QueryResponse> Shard::exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index d2619f68b08..af4eb97fcd0 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -28,6 +28,8 @@ #pragma once +#include <boost/optional.hpp> + #include "mongo/bson/bsonobj.h" #include "mongo/client/connection_string.h" #include "mongo/client/read_preference.h" @@ -38,6 +40,7 @@ namespace mongo { +class BatchedCommandRequest; class BatchedCommandResponse; class OperationContext; class RemoteCommandTargeter; @@ -145,6 +148,14 @@ public: RetryPolicy retryPolicy); /** + * Executes the specified batch write command on this shard's primary and retries on the + * specified set of errors using the specified retry policy. + */ + BatchedCommandResponse runBatchWriteCommand(OperationContext* txn, + const BatchedCommandRequest& batchRequest, + RetryPolicy retryPolicy); + + /** * Warning: This method exhausts the cursor and pulls all data into memory. * Do not use other than for very small (i.e., admin or metadata) collections. * Performs retries if the query fails in accordance with the kIdempotent RetryPolicy. @@ -172,13 +183,26 @@ public: protected: + struct HostWithResponse { + HostWithResponse(boost::optional<HostAndPort> _host, + StatusWith<CommandResponse> _commandResponse) + : host(std::move(_host)), commandResponse(std::move(_commandResponse)) {} + + boost::optional<HostAndPort> host; + StatusWith<CommandResponse> commandResponse; + }; + Shard(const ShardId& id); private: - virtual StatusWith<CommandResponse> _runCommand(OperationContext* txn, - const ReadPreferenceSetting& readPref, - const std::string& dbname, - const BSONObj& cmdObj) = 0; + /** + * Paired HostWithResponse output exposes RemoteShard's host for updateReplSetMonitor. + * LocalShard will not return a host. + */ + virtual HostWithResponse _runCommand(OperationContext* txn, + const ReadPreferenceSetting& readPref, + const std::string& dbname, + const BSONObj& cmdObj) = 0; virtual StatusWith<QueryResponse> _exhaustiveFindOnConfig( OperationContext* txn, diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp index 41711be1f7f..3eca849b2b5 100644 --- a/src/mongo/s/client/shard_local.cpp +++ b/src/mongo/s/client/shard_local.cpp @@ -28,6 +28,8 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#include <boost/none_t.hpp> + #include "mongo/platform/basic.h" #include "mongo/s/client/shard_local.h" @@ -118,10 +120,10 @@ repl::OpTime ShardLocal::_getLastOpTime() { return _lastOpTime; } -StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn, - const ReadPreferenceSetting& unused, - const std::string& dbName, - const BSONObj& cmdObj) { +Shard::HostWithResponse ShardLocal::_runCommand(OperationContext* txn, + const ReadPreferenceSetting& unused, + const std::string& dbName, + const BSONObj& cmdObj) { repl::OpTime currentOpTimeFromClient = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); ON_BLOCK_EXIT([this, &txn, ¤tOpTimeFromClient] { @@ -141,12 +143,13 @@ StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn writeConcernStatus = getWriteConcernStatusFromCommandResult(responseReply); } - return Shard::CommandResponse{std::move(responseReply), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)}; + return Shard::HostWithResponse(boost::none, + Shard::CommandResponse{std::move(responseReply), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)}); } catch (const DBException& ex) { - return ex.toStatus(); + return Shard::HostWithResponse(boost::none, ex.toStatus()); } } diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h index 449a5d7a037..27e2edbea40 100644 --- a/src/mongo/s/client/shard_local.h +++ b/src/mongo/s/client/shard_local.h @@ -64,10 +64,10 @@ public: bool unique) override; private: - StatusWith<Shard::CommandResponse> _runCommand(OperationContext* txn, - const ReadPreferenceSetting& unused, - const std::string& dbName, - const BSONObj& cmdObj) final; + Shard::HostWithResponse _runCommand(OperationContext* txn, + const ReadPreferenceSetting& unused, + const std::string& dbName, + const BSONObj& cmdObj) final; StatusWith<Shard::QueryResponse> _exhaustiveFindOnConfig( OperationContext* txn, diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 07a57f4ea71..9f6f9fd6aa0 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -196,16 +196,16 @@ const BSONObj& ShardRemote::_getMetadataForCommand(const ReadPreferenceSetting& } } -StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* txn, - const ReadPreferenceSetting& readPref, - const string& dbName, - const BSONObj& cmdObj) { +Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, + const ReadPreferenceSetting& readPref, + const string& dbName, + const BSONObj& cmdObj) { const BSONObj cmdWithMaxTimeMS = (isConfig() ? appendMaxTimeToCmdObj(txn, cmdObj) : cmdObj); const auto host = _targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { - return host.getStatus(); + return Shard::HostWithResponse(boost::none, host.getStatus()); } RemoteCommandRequest request(host.getValue(), @@ -222,7 +222,7 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx request, [&swResponse](const RemoteCommandCallbackArgs& args) { swResponse = args.response; }); if (!callStatus.isOK()) { - return callStatus.getStatus(); + return Shard::HostWithResponse(host.getValue(), callStatus.getStatus()); } // Block until the command is carried out @@ -234,7 +234,7 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { LOG(0) << "Operation timed out with status " << swResponse.getStatus(); } - return swResponse.getStatus(); + return Shard::HostWithResponse(host.getValue(), swResponse.getStatus()); } BSONObj responseObj = swResponse.getValue().data.getOwned(); @@ -246,10 +246,11 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx updateReplSetMonitor(host.getValue(), commandStatus); updateReplSetMonitor(host.getValue(), writeConcernStatus); - return CommandResponse(std::move(responseObj), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)); + return Shard::HostWithResponse(host.getValue(), + CommandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus))); } StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig( diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h index 339f0ae1b31..25b610e534b 100644 --- a/src/mongo/s/client/shard_remote.h +++ b/src/mongo/s/client/shard_remote.h @@ -85,10 +85,10 @@ private: */ const BSONObj& _getMetadataForCommand(const ReadPreferenceSetting& readPref); - StatusWith<CommandResponse> _runCommand(OperationContext* txn, - const ReadPreferenceSetting& readPref, - const std::string& dbname, - const BSONObj& cmdObj) final; + Shard::HostWithResponse _runCommand(OperationContext* txn, + const ReadPreferenceSetting& readPref, + const std::string& dbname, + const BSONObj& cmdObj) final; StatusWith<QueryResponse> _exhaustiveFindOnConfig( OperationContext* txn, diff --git a/src/mongo/s/write_ops/batched_command_response.h b/src/mongo/s/write_ops/batched_command_response.h index 58f2631833f..3f0fef9491f 100644 --- a/src/mongo/s/write_ops/batched_command_response.h +++ b/src/mongo/s/write_ops/batched_command_response.h @@ -46,7 +46,6 @@ namespace mongo { * the response side. */ class BatchedCommandResponse : public BSONSerializable { - MONGO_DISALLOW_COPYING(BatchedCommandResponse); public: // @@ -70,6 +69,13 @@ public: BatchedCommandResponse(); virtual ~BatchedCommandResponse(); + // + // BatchedCommandResponse should have a move constructor but not a copy constructor + // + + BatchedCommandResponse(BatchedCommandResponse&&) = default; + BatchedCommandResponse& operator=(BatchedCommandResponse&&) = default; + /** Copies all the fields present in 'this' to 'other'. */ void cloneTo(BatchedCommandResponse* other) const; |