summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJess Fan <jess.fan@10gen.com>2016-07-06 12:07:20 -0400
committerJess Fan <jess.fan@10gen.com>2016-07-12 15:56:14 -0400
commit907ed32a3a8bd19f883836013530f645522a75bc (patch)
treef2bbab0f11c0957ce1b5ffa41dc299a4c61200b0 /src
parentd846997c27c6e2297f59421fbeb3979a1423a9ff (diff)
downloadmongo-907ed32a3a8bd19f883836013530f645522a75bc.tar.gz
SERVER-24732 add Shard::runBatchWriteCommand that checks BatchedCommandResponse
Diffstat (limited to 'src')
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp52
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_client_impl.h9
-rw-r--r--src/mongo/s/client/shard.cpp35
-rw-r--r--src/mongo/s/client/shard.h32
-rw-r--r--src/mongo/s/client/shard_local.cpp21
-rw-r--r--src/mongo/s/client/shard_local.h8
-rw-r--r--src/mongo/s/client/shard_remote.cpp23
-rw-r--r--src/mongo/s/client/shard_remote.h8
-rw-r--r--src/mongo/s/write_ops/batched_command_response.h8
9 files changed, 112 insertions, 84 deletions
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index 5e11ad1313c..0c9e47d976e 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -1318,41 +1318,9 @@ void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* txn,
return;
}
- _runBatchWriteCommand(txn, batchRequest, batchResponse, Shard::RetryPolicy::kNotIdempotent);
-}
-
-void ShardingCatalogClientImpl::_runBatchWriteCommand(OperationContext* txn,
- const BatchedCommandRequest& batchRequest,
- BatchedCommandResponse* batchResponse,
- Shard::RetryPolicy retryPolicy) {
- const std::string dbname = batchRequest.getNS().db().toString();
- invariant(dbname == "config" || dbname == "admin");
-
- invariant(batchRequest.sizeWriteOps() == 1);
-
- const BSONObj cmdObj = batchRequest.toBSON();
-
- for (int retry = 1; retry <= kMaxWriteRetry; ++retry) {
- auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
- auto response = configShard->runCommand(
- txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- dbname,
- cmdObj,
- Shard::RetryPolicy::kNoRetry); // We're handling our own retries here.
-
- Status status = Shard::CommandResponse::processBatchWriteResponse(response, batchResponse);
- if (retry < kMaxWriteRetry && configShard->isRetriableError(status.code(), retryPolicy)) {
- batchResponse->clear();
- LOG(2) << "Batch write command failed with retriable error and will be retried"
- << causedBy(status);
- continue;
- }
-
- return;
- }
-
- MONGO_UNREACHABLE;
+ auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+ *batchResponse =
+ configShard->runBatchWriteCommand(txn, batchRequest, Shard::RetryPolicy::kNotIdempotent);
}
Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn,
@@ -1374,8 +1342,8 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn,
auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
for (int retry = 1; retry <= kMaxWriteRetry; retry++) {
- BatchedCommandResponse response;
- _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kNoRetry);
+ auto response =
+ configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kNoRetry);
Status status = response.toStatus();
@@ -1455,8 +1423,9 @@ StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument(
request.setNS(nss);
request.setWriteConcern(writeConcern.toBSON());
- BatchedCommandResponse response;
- _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kIdempotent);
+ auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+ auto response =
+ configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kIdempotent);
Status status = response.toStatus();
if (!status.isOK()) {
@@ -1486,8 +1455,9 @@ Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* txn,
request.setNS(nss);
request.setWriteConcern(writeConcern.toBSON());
- BatchedCommandResponse response;
- _runBatchWriteCommand(txn, request, &response, Shard::RetryPolicy::kIdempotent);
+ auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
+ auto response =
+ configShard->runBatchWriteCommand(txn, request, Shard::RetryPolicy::kIdempotent);
return response.toStatus();
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index e98474a10a2..4c983640015 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -216,15 +216,6 @@ private:
int cappedSize);
/**
- * Executes the specified batch write command on the current config server's primary and retries
- * on the specified set of errors using the default retry policy.
- */
- void _runBatchWriteCommand(OperationContext* txn,
- const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- Shard::RetryPolicy retryPolicy);
-
- /**
* Helper method for running a count command against the config server with appropriate
* error handling.
*/
diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp
index f9361be28b0..b186d341d92 100644
--- a/src/mongo/s/client/shard.cpp
+++ b/src/mongo/s/client/shard.cpp
@@ -31,6 +31,8 @@
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
@@ -103,7 +105,7 @@ StatusWith<Shard::CommandResponse> Shard::runCommand(OperationContext* txn,
const BSONObj& cmdObj,
RetryPolicy retryPolicy) {
for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) {
- auto swCmdResponse = _runCommand(txn, readPref, dbName, cmdObj);
+ auto swCmdResponse = _runCommand(txn, readPref, dbName, cmdObj).commandResponse;
auto commandStatus = _getEffectiveCommandStatus(swCmdResponse);
if (retry < kOnErrorNumRetries && isRetriableError(commandStatus.code(), retryPolicy)) {
@@ -117,6 +119,37 @@ StatusWith<Shard::CommandResponse> Shard::runCommand(OperationContext* txn,
MONGO_UNREACHABLE;
}
+BatchedCommandResponse Shard::runBatchWriteCommand(OperationContext* txn,
+ const BatchedCommandRequest& batchRequest,
+ RetryPolicy retryPolicy) {
+ const std::string dbname = batchRequest.getNS().db().toString();
+ invariant(batchRequest.sizeWriteOps() == 1);
+
+ const BSONObj cmdObj = batchRequest.toBSON();
+
+ for (int retry = 1; retry <= kOnErrorNumRetries; ++retry) {
+ auto response =
+ _runCommand(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, dbname, cmdObj);
+
+ BatchedCommandResponse batchResponse;
+ Status writeStatus =
+ CommandResponse::processBatchWriteResponse(response.commandResponse, &batchResponse);
+
+ if (!writeStatus.isOK() && response.host) {
+ updateReplSetMonitor(response.host.get(), writeStatus);
+ }
+
+ if (retry < kOnErrorNumRetries && isRetriableError(writeStatus.code(), retryPolicy)) {
+ LOG(2) << "Batch write command failed with retriable error and will be retried"
+ << causedBy(writeStatus);
+ continue;
+ }
+
+ return batchResponse;
+ }
+ MONGO_UNREACHABLE;
+}
+
StatusWith<Shard::QueryResponse> Shard::exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index d2619f68b08..af4eb97fcd0 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -28,6 +28,8 @@
#pragma once
+#include <boost/optional.hpp>
+
#include "mongo/bson/bsonobj.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/read_preference.h"
@@ -38,6 +40,7 @@
namespace mongo {
+class BatchedCommandRequest;
class BatchedCommandResponse;
class OperationContext;
class RemoteCommandTargeter;
@@ -145,6 +148,14 @@ public:
RetryPolicy retryPolicy);
/**
+ * Executes the specified batch write command on this shard's primary and retries on the
+ * specified set of errors using the specified retry policy.
+ */
+ BatchedCommandResponse runBatchWriteCommand(OperationContext* txn,
+ const BatchedCommandRequest& batchRequest,
+ RetryPolicy retryPolicy);
+
+ /**
* Warning: This method exhausts the cursor and pulls all data into memory.
* Do not use other than for very small (i.e., admin or metadata) collections.
* Performs retries if the query fails in accordance with the kIdempotent RetryPolicy.
@@ -172,13 +183,26 @@ public:
protected:
+ struct HostWithResponse {
+ HostWithResponse(boost::optional<HostAndPort> _host,
+ StatusWith<CommandResponse> _commandResponse)
+ : host(std::move(_host)), commandResponse(std::move(_commandResponse)) {}
+
+ boost::optional<HostAndPort> host;
+ StatusWith<CommandResponse> commandResponse;
+ };
+
Shard(const ShardId& id);
private:
- virtual StatusWith<CommandResponse> _runCommand(OperationContext* txn,
- const ReadPreferenceSetting& readPref,
- const std::string& dbname,
- const BSONObj& cmdObj) = 0;
+ /**
+ * Paired HostWithResponse output exposes RemoteShard's host for updateReplSetMonitor.
+ * LocalShard will not return a host.
+ */
+ virtual HostWithResponse _runCommand(OperationContext* txn,
+ const ReadPreferenceSetting& readPref,
+ const std::string& dbname,
+ const BSONObj& cmdObj) = 0;
virtual StatusWith<QueryResponse> _exhaustiveFindOnConfig(
OperationContext* txn,
diff --git a/src/mongo/s/client/shard_local.cpp b/src/mongo/s/client/shard_local.cpp
index 41711be1f7f..3eca849b2b5 100644
--- a/src/mongo/s/client/shard_local.cpp
+++ b/src/mongo/s/client/shard_local.cpp
@@ -28,6 +28,8 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#include <boost/none_t.hpp>
+
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard_local.h"
@@ -118,10 +120,10 @@ repl::OpTime ShardLocal::_getLastOpTime() {
return _lastOpTime;
}
-StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn,
- const ReadPreferenceSetting& unused,
- const std::string& dbName,
- const BSONObj& cmdObj) {
+Shard::HostWithResponse ShardLocal::_runCommand(OperationContext* txn,
+ const ReadPreferenceSetting& unused,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
repl::OpTime currentOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
ON_BLOCK_EXIT([this, &txn, &currentOpTimeFromClient] {
@@ -141,12 +143,13 @@ StatusWith<Shard::CommandResponse> ShardLocal::_runCommand(OperationContext* txn
writeConcernStatus = getWriteConcernStatusFromCommandResult(responseReply);
}
- return Shard::CommandResponse{std::move(responseReply),
- std::move(responseMetadata),
- std::move(commandStatus),
- std::move(writeConcernStatus)};
+ return Shard::HostWithResponse(boost::none,
+ Shard::CommandResponse{std::move(responseReply),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus)});
} catch (const DBException& ex) {
- return ex.toStatus();
+ return Shard::HostWithResponse(boost::none, ex.toStatus());
}
}
diff --git a/src/mongo/s/client/shard_local.h b/src/mongo/s/client/shard_local.h
index 449a5d7a037..27e2edbea40 100644
--- a/src/mongo/s/client/shard_local.h
+++ b/src/mongo/s/client/shard_local.h
@@ -64,10 +64,10 @@ public:
bool unique) override;
private:
- StatusWith<Shard::CommandResponse> _runCommand(OperationContext* txn,
- const ReadPreferenceSetting& unused,
- const std::string& dbName,
- const BSONObj& cmdObj) final;
+ Shard::HostWithResponse _runCommand(OperationContext* txn,
+ const ReadPreferenceSetting& unused,
+ const std::string& dbName,
+ const BSONObj& cmdObj) final;
StatusWith<Shard::QueryResponse> _exhaustiveFindOnConfig(
OperationContext* txn,
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 07a57f4ea71..9f6f9fd6aa0 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -196,16 +196,16 @@ const BSONObj& ShardRemote::_getMetadataForCommand(const ReadPreferenceSetting&
}
}
-StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* txn,
- const ReadPreferenceSetting& readPref,
- const string& dbName,
- const BSONObj& cmdObj) {
+Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
+ const ReadPreferenceSetting& readPref,
+ const string& dbName,
+ const BSONObj& cmdObj) {
const BSONObj cmdWithMaxTimeMS = (isConfig() ? appendMaxTimeToCmdObj(txn, cmdObj) : cmdObj);
const auto host =
_targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
- return host.getStatus();
+ return Shard::HostWithResponse(boost::none, host.getStatus());
}
RemoteCommandRequest request(host.getValue(),
@@ -222,7 +222,7 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx
request,
[&swResponse](const RemoteCommandCallbackArgs& args) { swResponse = args.response; });
if (!callStatus.isOK()) {
- return callStatus.getStatus();
+ return Shard::HostWithResponse(host.getValue(), callStatus.getStatus());
}
// Block until the command is carried out
@@ -234,7 +234,7 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx
if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
LOG(0) << "Operation timed out with status " << swResponse.getStatus();
}
- return swResponse.getStatus();
+ return Shard::HostWithResponse(host.getValue(), swResponse.getStatus());
}
BSONObj responseObj = swResponse.getValue().data.getOwned();
@@ -246,10 +246,11 @@ StatusWith<Shard::CommandResponse> ShardRemote::_runCommand(OperationContext* tx
updateReplSetMonitor(host.getValue(), commandStatus);
updateReplSetMonitor(host.getValue(), writeConcernStatus);
- return CommandResponse(std::move(responseObj),
- std::move(responseMetadata),
- std::move(commandStatus),
- std::move(writeConcernStatus));
+ return Shard::HostWithResponse(host.getValue(),
+ CommandResponse(std::move(responseObj),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus)));
}
StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index 339f0ae1b31..25b610e534b 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,10 +85,10 @@ private:
*/
const BSONObj& _getMetadataForCommand(const ReadPreferenceSetting& readPref);
- StatusWith<CommandResponse> _runCommand(OperationContext* txn,
- const ReadPreferenceSetting& readPref,
- const std::string& dbname,
- const BSONObj& cmdObj) final;
+ Shard::HostWithResponse _runCommand(OperationContext* txn,
+ const ReadPreferenceSetting& readPref,
+ const std::string& dbname,
+ const BSONObj& cmdObj) final;
StatusWith<QueryResponse> _exhaustiveFindOnConfig(
OperationContext* txn,
diff --git a/src/mongo/s/write_ops/batched_command_response.h b/src/mongo/s/write_ops/batched_command_response.h
index 58f2631833f..3f0fef9491f 100644
--- a/src/mongo/s/write_ops/batched_command_response.h
+++ b/src/mongo/s/write_ops/batched_command_response.h
@@ -46,7 +46,6 @@ namespace mongo {
* the response side.
*/
class BatchedCommandResponse : public BSONSerializable {
- MONGO_DISALLOW_COPYING(BatchedCommandResponse);
public:
//
@@ -70,6 +69,13 @@ public:
BatchedCommandResponse();
virtual ~BatchedCommandResponse();
+ //
+ // BatchedCommandResponse should have a move constructor but not a copy constructor
+ //
+
+ BatchedCommandResponse(BatchedCommandResponse&&) = default;
+ BatchedCommandResponse& operator=(BatchedCommandResponse&&) = default;
+
/** Copies all the fields present in 'this' to 'other'. */
void cloneTo(BatchedCommandResponse* other) const;