author     Esha Maharishi <esha.maharishi@mongodb.com>    2017-03-02 14:19:29 -0500
committer  Esha Maharishi <esha.maharishi@mongodb.com>    2017-03-06 11:14:56 -0500
commit     a88165fb9f7e038b8ba0a30408a47d312a8cfb43 (patch)
tree       414dc299c4ac368ba911c18767bcf12dfa226567
parent     5a3082bc52cdcf8769021aa9256daabd43162a53 (diff)
SERVER-28163 make cluster_get_last_error_cmd.cpp use ARS instead of DBClientMultiCommand
-rw-r--r--  src/mongo/s/async_requests_sender.cpp                  20
-rw-r--r--  src/mongo/s/async_requests_sender.h                    12
-rw-r--r--  src/mongo/s/commands/cluster_get_last_error_cmd.cpp   169
-rw-r--r--  src/mongo/s/write_ops/SConscript                         1
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.cpp            112
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.h               16
6 files changed, 160 insertions, 170 deletions
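
The commit replaces the blocking DBClientMultiCommand dispatcher with the AsyncRequestsSender (ARS) for fanning getLastError out to the shards. A minimal sketch of the new calling pattern, assembled from the hunks below; the shard id, database name, and command object are placeholder values, not taken from the commit:

    // Assumes txn (OperationContext*) is in scope, as at the call sites below.
    std::vector<AsyncRequestsSender::Request> requests;
    requests.emplace_back(ShardId("shard0000"), BSON("getLastError" << 1));  // hypothetical request

    const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
    AsyncRequestsSender ars(txn,
                            Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(),
                            "admin",  // placeholder database name
                            requests,
                            readPref);

    // Blocks until every remote has a response or error; per the new header
    // comment in this commit, this may only be called once per ARS instance.
    auto responses = ars.waitForResponses(txn);
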
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index a3163050208..3c3cd194f78 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -53,7 +53,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
AsyncRequestsSender::AsyncRequestsSender(OperationContext* txn,
executor::TaskExecutor* executor,
- std::string db,
+ StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
bool allowPartialResults)
@@ -82,6 +82,8 @@ AsyncRequestsSender::~AsyncRequestsSender() {
std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses(
OperationContext* txn) {
+ invariant(!_remotes.empty());
+
// Until all remotes have received a response or error, keep scheduling retries and waiting on
// outstanding requests.
while (!_done()) {
@@ -98,11 +100,16 @@ std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses
invariant(remote.swResponse);
if (remote.swResponse->isOK()) {
invariant(remote.shardHostAndPort);
- responses.emplace_back(remote.swResponse->getValue(), *remote.shardHostAndPort);
+ responses.emplace_back(std::move(remote.swResponse->getValue()),
+ std::move(*remote.shardHostAndPort));
} else {
- responses.emplace_back(remote.swResponse->getStatus());
+ responses.emplace_back(std::move(remote.swResponse->getStatus()),
+ std::move(remote.shardHostAndPort));
}
}
+
+ _remotes.clear();
+
return responses;
}
@@ -196,7 +203,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* txn, size_
}
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _db, remote.cmdObj, _metadataObj, txn);
+ remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, txn);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
@@ -298,9 +305,10 @@ AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
: shardId(shardId), cmdObj(cmdObj) {}
AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp)
- : swResponse(response), shardHostAndPort(hp) {}
+ : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {}
-AsyncRequestsSender::Response::Response(Status status) : swResponse(status) {}
+AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp)
+ : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 6bbf1f9d0f3..daf48c558aa 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -89,13 +89,13 @@ public:
Response(executor::RemoteCommandResponse response, HostAndPort hp);
// Constructor that specifies the reason the response was not successfully received.
- Response(Status status);
+ Response(Status status, boost::optional<HostAndPort> hp);
// The response or error from the remote.
StatusWith<executor::RemoteCommandResponse> swResponse;
- // The exact host on which the remote command was run. Is unset if swResponse has a non-OK
- // status.
+ // The exact host on which the remote command was run. Is unset if the shard could not be
+ // found or no shard hosts matching the readPreference could be found.
boost::optional<HostAndPort> shardHostAndPort;
};
@@ -105,7 +105,7 @@ public:
*/
AsyncRequestsSender(OperationContext* txn,
executor::TaskExecutor* executor,
- std::string db,
+ StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
bool allowPartialResults = false);
@@ -119,6 +119,8 @@ public:
* If we were killed, returns immediately.
* If we were interrupted, returns when any outstanding requests have completed.
* Otherwise, returns when each remote has received a response or error.
+ *
+ * Must only be called once.
*/
std::vector<Response> waitForResponses(OperationContext* txn);
@@ -259,7 +261,7 @@ private:
BSONObj _metadataObj;
// The database against which the commands are run.
- const std::string _db;
+ const StringData _db;
// The readPreference to use for all requests.
ReadPreferenceSetting _readPreference;
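
With the new two-argument Response constructor, an error Response can still carry the host it targeted; shardHostAndPort is unset only when no host could be targeted at all. A sketch of how a caller can now distinguish the two failure modes, mirroring the consumption loop in the cluster_get_last_error_cmd.cpp hunk below (the surrounding function is assumed, not from the commit):

    for (const auto& response : responses) {
        if (!response.shardHostAndPort) {
            // Shard lookup or targeting failed before any host was contacted;
            // swResponse is guaranteed to hold a non-OK status here.
            return response.swResponse.getStatus();
        }
        if (!response.swResponse.isOK()) {
            // A host was targeted, but the command could not be delivered or run.
            continue;
        }
        // response.swResponse.getValue().data holds the remote's reply BSON.
    }
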
diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
index d1f01d4ebc0..2b4e386b312 100644
--- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include <vector>
@@ -34,15 +36,149 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
-#include "mongo/s/commands/dbclient_multi_command.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_downconvert.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
+using std::vector;
+
+// Adds a wOpTime and a wElectionId field to a set of gle options
+BSONObj buildGLECmdWithOpTime(const BSONObj& gleOptions,
+ const repl::OpTime& opTime,
+ const OID& electionId) {
+ BSONObjBuilder builder;
+ BSONObjIterator it(gleOptions);
+
+ for (int i = 0; it.more(); ++i) {
+ BSONElement el = it.next();
+
+ // Make sure first element is getLastError : 1
+ if (i == 0) {
+ StringData elName(el.fieldName());
+ if (!elName.equalCaseInsensitive("getLastError")) {
+ builder.append("getLastError", 1);
+ }
+ }
+
+ builder.append(el);
+ }
+ opTime.append(&builder, "wOpTime");
+ builder.appendOID("wElectionId", const_cast<OID*>(&electionId));
+ return builder.obj();
+}
+
+/**
+ * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
+ * write concern across the previously used shards.
+ *
+ * Returns OK with the LegacyWCResponses containing only write concern error information
+ * Returns !OK if there was an error getting a GLE response
+ */
+Status enforceLegacyWriteConcern(OperationContext* txn,
+ StringData dbName,
+ const BSONObj& options,
+ const HostOpTimeMap& hostOpTimes,
+ std::vector<LegacyWCResponse>* legacyWCResponses) {
+ if (hostOpTimes.empty()) {
+ return Status::OK();
+ }
+
+ // Assemble requests
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); ++it) {
+ const ConnectionString& shardConnStr = it->first;
+ const auto& hot = it->second;
+ const repl::OpTime& opTime = hot.opTime;
+ const OID& electionId = hot.electionId;
+
+ auto swShard = Grid::get(txn)->shardRegistry()->getShard(txn, shardConnStr.toString());
+ if (!swShard.isOK()) {
+ return swShard.getStatus();
+ }
+
+ LOG(3) << "enforcing write concern " << options << " on " << shardConnStr.toString()
+ << " at opTime " << opTime.getTimestamp().toStringPretty() << " with electionID "
+ << electionId;
+
+ BSONObj gleCmd = buildGLECmdWithOpTime(options, opTime, electionId);
+ requests.emplace_back(swShard.getValue()->getId(), gleCmd);
+ }
+
+ // Send the requests and wait to receive all the responses.
+
+ const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
+ AsyncRequestsSender ars(
+ txn, Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), dbName, requests, readPref);
+ auto responses = ars.waitForResponses(txn);
+
+ // Parse the responses.
+
+ vector<Status> failedStatuses;
+ for (const auto& response : responses) {
+ // Return immediately if we failed to contact a shard.
+ if (!response.shardHostAndPort) {
+ invariant(!response.swResponse.isOK());
+ return response.swResponse.getStatus();
+ }
+
+ // We successfully contacted the shard, but it returned some error.
+ if (!response.swResponse.isOK()) {
+ failedStatuses.push_back(std::move(response.swResponse.getStatus()));
+ continue;
+ }
+
+ BSONObj gleResponse = stripNonWCInfo(response.swResponse.getValue().data);
+
+ // Use the downconversion tools to determine if this GLE response is ok, a
+ // write concern error, or an unknown error we should immediately abort for.
+ GLEErrors errors;
+ Status extractStatus = extractGLEErrors(gleResponse, &errors);
+ if (!extractStatus.isOK()) {
+ failedStatuses.push_back(extractStatus);
+ continue;
+ }
+
+ LegacyWCResponse wcResponse;
+ invariant(response.shardHostAndPort);
+ wcResponse.shardHost = response.shardHostAndPort->toString();
+ wcResponse.gleResponse = gleResponse;
+ if (errors.wcError.get()) {
+ wcResponse.errToReport = errors.wcError->getErrMessage();
+ }
+
+ legacyWCResponses->push_back(wcResponse);
+ }
+
+ if (failedStatuses.empty()) {
+ return Status::OK();
+ }
+
+ StringBuilder builder;
+ builder << "could not enforce write concern";
+
+ for (vector<Status>::const_iterator it = failedStatuses.begin(); it != failedStatuses.end();
+ ++it) {
+ const Status& failedStatus = *it;
+ if (it == failedStatuses.begin()) {
+ builder << causedBy(failedStatus.toString());
+ } else {
+ builder << ":: and ::" << failedStatus.toString();
+ }
+ }
+
+ return Status(failedStatuses.size() == 1u ? failedStatuses.front().code()
+ : ErrorCodes::MultipleErrorsOccurred,
+ builder.str());
+}
+
+
class GetLastErrorCmd : public Command {
public:
GetLastErrorCmd() : Command("getLastError", false, "getlasterror") {}
@@ -101,38 +237,9 @@ public:
// For compatibility with 2.4 sharded GLE, we always enforce the write concern
// across all shards.
const HostOpTimeMap hostOpTimes(ClusterLastErrorInfo::get(cc()).getPrevHostOpTimes());
- HostOpTimeMap resolvedHostOpTimes;
-
- Status status(Status::OK());
- for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
- ++it) {
- const ConnectionString& shardEndpoint = it->first;
- const HostOpTime& hot = it->second;
-
- const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
- auto shardStatus = grid.shardRegistry()->getShard(txn, shardEndpoint.toString());
- if (!shardStatus.isOK()) {
- status = shardStatus.getStatus();
- break;
- }
- auto shard = shardStatus.getValue();
- auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref);
- if (!swHostAndPort.isOK()) {
- status = swHostAndPort.getStatus();
- break;
- }
-
- ConnectionString resolvedHost(swHostAndPort.getValue());
- resolvedHostOpTimes[resolvedHost] = hot;
- }
-
- DBClientMultiCommand dispatcher;
std::vector<LegacyWCResponse> wcResponses;
- if (status.isOK()) {
- status = enforceLegacyWriteConcern(
- &dispatcher, dbname, cmdObj, resolvedHostOpTimes, &wcResponses);
- }
+ auto status = enforceLegacyWriteConcern(txn, dbname, cmdObj, hostOpTimes, &wcResponses);
// Don't forget about our last hosts, reset the client info
ClusterLastErrorInfo::get(cc()).disableForCommand();
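
For reference, buildGLECmdWithOpTime (moved into this file by the commit) keeps the caller's GLE options in place, prepends getLastError: 1 when it is not already the first field, and appends the replication coordinates. A rough sketch of its output shape, with placeholder values and the wOpTime layout assumed from repl::OpTime::append:

    // Hypothetical input options: { w: 2, wtimeout: 5000 }  (no leading getLastError field)
    BSONObj gleCmd = buildGLECmdWithOpTime(BSON("w" << 2 << "wtimeout" << 5000),
                                           opTime, electionId);
    // gleCmd is approximately:
    //   { getLastError: 1, w: 2, wtimeout: 5000,
    //     wOpTime: <opTime as appended by repl::OpTime::append>,
    //     wElectionId: <electionId> }
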
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index d424182714b..b9f701f7b77 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -35,6 +35,7 @@ env.Library(
LIBDEPS=[
'batch_write_types',
'$BUILD_DIR/mongo/client/connection_string',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/client/sharding_client',
'$BUILD_DIR/mongo/s/coreshard',
],
diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp
index 2d792351b60..325e1f90b92 100644
--- a/src/mongo/s/write_ops/batch_downconvert.cpp
+++ b/src/mongo/s/write_ops/batch_downconvert.cpp
@@ -26,23 +26,17 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
#include "mongo/platform/basic.h"
#include "mongo/s/write_ops/batch_downconvert.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/client/multi_command_dispatch.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
namespace mongo {
-using std::endl;
using std::string;
-using std::vector;
Status extractGLEErrors(const BSONObj& gleResponse, GLEErrors* errors) {
// DRAGONS
@@ -184,110 +178,4 @@ BSONObj stripNonWCInfo(const BSONObj& gleResponse) {
return builder.obj();
}
-// Adds a wOpTime and a wElectionId field to a set of gle options
-static BSONObj buildGLECmdWithOpTime(const BSONObj& gleOptions,
- const repl::OpTime& opTime,
- const OID& electionId) {
- BSONObjBuilder builder;
- BSONObjIterator it(gleOptions);
-
- for (int i = 0; it.more(); ++i) {
- BSONElement el = it.next();
-
- // Make sure first element is getLastError : 1
- if (i == 0) {
- StringData elName(el.fieldName());
- if (!elName.equalCaseInsensitive("getLastError")) {
- builder.append("getLastError", 1);
- }
- }
-
- builder.append(el);
- }
- opTime.append(&builder, "wOpTime");
- builder.appendOID("wElectionId", const_cast<OID*>(&electionId));
- return builder.obj();
-}
-
-Status enforceLegacyWriteConcern(MultiCommandDispatch* dispatcher,
- StringData dbName,
- const BSONObj& options,
- const HostOpTimeMap& hostOpTimes,
- vector<LegacyWCResponse>* legacyWCResponses) {
- if (hostOpTimes.empty()) {
- return Status::OK();
- }
-
- for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); ++it) {
- const ConnectionString& shardEndpoint = it->first;
- const HostOpTime hot = it->second;
- const repl::OpTime& opTime = hot.opTime;
- const OID& electionId = hot.electionId;
-
- LOG(3) << "enforcing write concern " << options << " on " << shardEndpoint.toString()
- << " at opTime " << opTime.getTimestamp().toStringPretty() << " with electionID "
- << electionId;
-
- BSONObj gleCmd = buildGLECmdWithOpTime(options, opTime, electionId);
- dispatcher->addCommand(shardEndpoint, dbName, gleCmd);
- }
-
- dispatcher->sendAll();
-
- vector<Status> failedStatuses;
-
- while (dispatcher->numPending() > 0) {
- ConnectionString shardEndpoint;
- RawBSONSerializable gleResponseSerial;
-
- Status dispatchStatus = dispatcher->recvAny(&shardEndpoint, &gleResponseSerial);
- if (!dispatchStatus.isOK()) {
- // We need to get all responses before returning
- failedStatuses.push_back(dispatchStatus);
- continue;
- }
-
- BSONObj gleResponse = stripNonWCInfo(gleResponseSerial.toBSON());
-
- // Use the downconversion tools to determine if this GLE response is ok, a
- // write concern error, or an unknown error we should immediately abort for.
- GLEErrors errors;
- Status extractStatus = extractGLEErrors(gleResponse, &errors);
- if (!extractStatus.isOK()) {
- failedStatuses.push_back(extractStatus);
- continue;
- }
-
- LegacyWCResponse wcResponse;
- wcResponse.shardHost = shardEndpoint.toString();
- wcResponse.gleResponse = gleResponse;
- if (errors.wcError.get()) {
- wcResponse.errToReport = errors.wcError->getErrMessage();
- }
-
- legacyWCResponses->push_back(wcResponse);
- }
-
- if (failedStatuses.empty()) {
- return Status::OK();
- }
-
- StringBuilder builder;
- builder << "could not enforce write concern";
-
- for (vector<Status>::const_iterator it = failedStatuses.begin(); it != failedStatuses.end();
- ++it) {
- const Status& failedStatus = *it;
- if (it == failedStatuses.begin()) {
- builder << causedBy(failedStatus.toString());
- } else {
- builder << ":: and ::" << failedStatus.toString();
- }
- }
-
- return Status(failedStatuses.size() == 1u ? failedStatuses.front().code()
- : ErrorCodes::MultipleErrorsOccurred,
- builder.str());
-}
-
} // namespace mongo
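
The GLE parsing helpers (stripNonWCInfo and extractGLEErrors) stay behind in batch_downconvert.cpp; only the dispatch logic moved. A minimal sketch of how the new call site uses them on a raw shard reply, where gleResponse is a placeholder BSONObj:

    BSONObj wcOnly = stripNonWCInfo(gleResponse);  // drop fields unrelated to write concern
    GLEErrors errors;
    Status s = extractGLEErrors(wcOnly, &errors);
    if (s.isOK() && errors.wcError.get()) {
        // errors.wcError describes a write concern error to surface to the client.
    }
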
diff --git a/src/mongo/s/write_ops/batch_downconvert.h b/src/mongo/s/write_ops/batch_downconvert.h
index 76f87d93db1..dd33b8c1924 100644
--- a/src/mongo/s/write_ops/batch_downconvert.h
+++ b/src/mongo/s/write_ops/batch_downconvert.h
@@ -29,7 +29,6 @@
#pragma once
#include <string>
-#include <vector>
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
@@ -41,8 +40,6 @@
namespace mongo {
-class MultiCommandDispatch;
-
// Used for reporting legacy write concern responses
struct LegacyWCResponse {
std::string shardHost;
@@ -50,19 +47,6 @@ struct LegacyWCResponse {
std::string errToReport;
};
-/**
- * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
- * write concern across the previously used shards.
- *
- * Returns OK with the LegacyWCResponses containing only write concern error information
- * Returns !OK if there was an error getting a GLE response
- */
-Status enforceLegacyWriteConcern(MultiCommandDispatch* dispatcher,
- StringData dbName,
- const BSONObj& options,
- const HostOpTimeMap& hostOpTimes,
- std::vector<LegacyWCResponse>* wcResponses);
-
//
// Below exposed for testing only
//