-rw-r--r--  src/mongo/s/async_requests_sender.cpp               |  20
-rw-r--r--  src/mongo/s/async_requests_sender.h                 |  12
-rw-r--r--  src/mongo/s/commands/cluster_get_last_error_cmd.cpp | 169
-rw-r--r--  src/mongo/s/write_ops/SConscript                    |   1
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.cpp         | 112
-rw-r--r--  src/mongo/s/write_ops/batch_downconvert.h           |  16
6 files changed, 160 insertions, 170 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index a3163050208..3c3cd194f78 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -53,7 +53,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
 
 AsyncRequestsSender::AsyncRequestsSender(OperationContext* txn,
                                          executor::TaskExecutor* executor,
-                                         std::string db,
+                                         StringData db,
                                          const std::vector<AsyncRequestsSender::Request>& requests,
                                          const ReadPreferenceSetting& readPreference,
                                          bool allowPartialResults)
@@ -82,6 +82,8 @@ AsyncRequestsSender::~AsyncRequestsSender() {
 
 std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses(
     OperationContext* txn) {
+    invariant(!_remotes.empty());
+
     // Until all remotes have received a response or error, keep scheduling retries and waiting on
     // outstanding requests.
     while (!_done()) {
@@ -98,11 +100,16 @@ std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses
         invariant(remote.swResponse);
         if (remote.swResponse->isOK()) {
             invariant(remote.shardHostAndPort);
-            responses.emplace_back(remote.swResponse->getValue(), *remote.shardHostAndPort);
+            responses.emplace_back(std::move(remote.swResponse->getValue()),
+                                   std::move(*remote.shardHostAndPort));
         } else {
-            responses.emplace_back(remote.swResponse->getStatus());
+            responses.emplace_back(std::move(remote.swResponse->getStatus()),
+                                   std::move(remote.shardHostAndPort));
         }
     }
+
+    _remotes.clear();
+
     return responses;
 }
 
@@ -196,7 +203,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* txn, size_
     }
 
     executor::RemoteCommandRequest request(
-        remote.getTargetHost(), _db, remote.cmdObj, _metadataObj, txn);
+        remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, txn);
 
     auto callbackStatus = _executor->scheduleRemoteCommand(
         request,
@@ -298,9 +305,10 @@ AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
     : shardId(shardId), cmdObj(cmdObj) {}
 
 AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp)
-    : swResponse(response), shardHostAndPort(hp) {}
+    : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {}
 
-AsyncRequestsSender::Response::Response(Status status) : swResponse(status) {}
+AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp)
+    : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
 
 AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
     : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 6bbf1f9d0f3..daf48c558aa 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -89,13 +89,13 @@ public:
         Response(executor::RemoteCommandResponse response, HostAndPort hp);
 
         // Constructor that specifies the reason the response was not successfully received.
-        Response(Status status);
+        Response(Status status, boost::optional<HostAndPort> hp);
 
         // The response or error from the remote.
         StatusWith<executor::RemoteCommandResponse> swResponse;
 
-        // The exact host on which the remote command was run. Is unset if swResponse has a non-OK
-        // status.
+        // The exact host on which the remote command was run. Is unset if the shard could not be
+        // found or no shard hosts matching the readPreference could be found.
        boost::optional<HostAndPort> shardHostAndPort;
     };
 
@@ -105,7 +105,7 @@ public:
     */
    AsyncRequestsSender(OperationContext* txn,
                        executor::TaskExecutor* executor,
-                       std::string db,
+                       StringData db,
                        const std::vector<AsyncRequestsSender::Request>& requests,
                        const ReadPreferenceSetting& readPreference,
                        bool allowPartialResults = false);
@@ -119,6 +119,8 @@ public:
     * If we were killed, returns immediately.
     * If we were interrupted, returns when any outstanding requests have completed.
     * Otherwise, returns when each remote has received a response or error.
+    *
+    * Must only be called once.
     */
    std::vector<Response> waitForResponses(OperationContext* txn);
 
@@ -259,7 +261,7 @@ private:
    BSONObj _metadataObj;
 
    // The database against which the commands are run.
-    const std::string _db;
+    const StringData _db;
 
    // The readPreference to use for all requests.
    ReadPreferenceSetting _readPreference;
diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
index d1f01d4ebc0..2b4e386b312 100644
--- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
@@ -26,6 +26,8 @@
  * it in the license file.
  */
 
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
 #include "mongo/platform/basic.h"
 
 #include <vector>
@@ -34,15 +36,149 @@
 #include "mongo/db/client.h"
 #include "mongo/db/commands.h"
 #include "mongo/db/lasterror.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/async_requests_sender.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/cluster_last_error_info.h"
-#include "mongo/s/commands/dbclient_multi_command.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/write_ops/batch_downconvert.h"
+#include "mongo/util/log.h"
 
 namespace mongo {
 namespace {
 
+using std::vector;
+
+// Adds a wOpTime and a wElectionId field to a set of gle options
+BSONObj buildGLECmdWithOpTime(const BSONObj& gleOptions,
+                              const repl::OpTime& opTime,
+                              const OID& electionId) {
+    BSONObjBuilder builder;
+    BSONObjIterator it(gleOptions);
+
+    for (int i = 0; it.more(); ++i) {
+        BSONElement el = it.next();
+
+        // Make sure first element is getLastError : 1
+        if (i == 0) {
+            StringData elName(el.fieldName());
+            if (!elName.equalCaseInsensitive("getLastError")) {
+                builder.append("getLastError", 1);
+            }
+        }
+
+        builder.append(el);
+    }
+    opTime.append(&builder, "wOpTime");
+    builder.appendOID("wElectionId", const_cast<OID*>(&electionId));
+    return builder.obj();
+}
+
+/**
+ * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
+ * write concern across the previously used shards.
+ *
+ * Returns OK with the LegacyWCResponses containing only write concern error information
+ * Returns !OK if there was an error getting a GLE response
+ */
+Status enforceLegacyWriteConcern(OperationContext* txn,
+                                 StringData dbName,
+                                 const BSONObj& options,
+                                 const HostOpTimeMap& hostOpTimes,
+                                 std::vector<LegacyWCResponse>* legacyWCResponses) {
+    if (hostOpTimes.empty()) {
+        return Status::OK();
+    }
+
+    // Assemble requests
+    std::vector<AsyncRequestsSender::Request> requests;
+    for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); ++it) {
+        const ConnectionString& shardConnStr = it->first;
+        const auto& hot = it->second;
+        const repl::OpTime& opTime = hot.opTime;
+        const OID& electionId = hot.electionId;
+
+        auto swShard = Grid::get(txn)->shardRegistry()->getShard(txn, shardConnStr.toString());
+        if (!swShard.isOK()) {
+            return swShard.getStatus();
+        }
+
+        LOG(3) << "enforcing write concern " << options << " on " << shardConnStr.toString()
+               << " at opTime " << opTime.getTimestamp().toStringPretty() << " with electionID "
+               << electionId;
+
+        BSONObj gleCmd = buildGLECmdWithOpTime(options, opTime, electionId);
+        requests.emplace_back(swShard.getValue()->getId(), gleCmd);
+    }
+
+    // Send the requests and wait to receive all the responses.
+
+    const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
+    AsyncRequestsSender ars(
+        txn, Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), dbName, requests, readPref);
+    auto responses = ars.waitForResponses(txn);
+
+    // Parse the responses.
+
+    vector<Status> failedStatuses;
+    for (const auto& response : responses) {
+        // Return immediately if we failed to contact a shard.
+        if (!response.shardHostAndPort) {
+            invariant(!response.swResponse.isOK());
+            return response.swResponse.getStatus();
+        }
+
+        // We successfully contacted the shard, but it returned some error.
+        if (!response.swResponse.isOK()) {
+            failedStatuses.push_back(std::move(response.swResponse.getStatus()));
+            continue;
+        }
+
+        BSONObj gleResponse = stripNonWCInfo(response.swResponse.getValue().data);
+
+        // Use the downconversion tools to determine if this GLE response is ok, a
+        // write concern error, or an unknown error we should immediately abort for.
+        GLEErrors errors;
+        Status extractStatus = extractGLEErrors(gleResponse, &errors);
+        if (!extractStatus.isOK()) {
+            failedStatuses.push_back(extractStatus);
+            continue;
+        }
+
+        LegacyWCResponse wcResponse;
+        invariant(response.shardHostAndPort);
+        wcResponse.shardHost = response.shardHostAndPort->toString();
+        wcResponse.gleResponse = gleResponse;
+        if (errors.wcError.get()) {
+            wcResponse.errToReport = errors.wcError->getErrMessage();
+        }
+
+        legacyWCResponses->push_back(wcResponse);
+    }
+
+    if (failedStatuses.empty()) {
+        return Status::OK();
+    }
+
+    StringBuilder builder;
+    builder << "could not enforce write concern";
+
+    for (vector<Status>::const_iterator it = failedStatuses.begin(); it != failedStatuses.end();
+         ++it) {
+        const Status& failedStatus = *it;
+        if (it == failedStatuses.begin()) {
+            builder << causedBy(failedStatus.toString());
+        } else {
+            builder << ":: and ::" << failedStatus.toString();
+        }
+    }
+
+    return Status(failedStatuses.size() == 1u ? failedStatuses.front().code()
+                                              : ErrorCodes::MultipleErrorsOccurred,
+                  builder.str());
+}
+
+
 class GetLastErrorCmd : public Command {
 public:
     GetLastErrorCmd() : Command("getLastError", false, "getlasterror") {}
@@ -101,38 +237,9 @@ public:
         // For compatibility with 2.4 sharded GLE, we always enforce the write concern
         // across all shards.
         const HostOpTimeMap hostOpTimes(ClusterLastErrorInfo::get(cc()).getPrevHostOpTimes());
-        HostOpTimeMap resolvedHostOpTimes;
-
-        Status status(Status::OK());
-        for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
-             ++it) {
-            const ConnectionString& shardEndpoint = it->first;
-            const HostOpTime& hot = it->second;
-
-            const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
-            auto shardStatus = grid.shardRegistry()->getShard(txn, shardEndpoint.toString());
-            if (!shardStatus.isOK()) {
-                status = shardStatus.getStatus();
-                break;
-            }
-            auto shard = shardStatus.getValue();
-            auto swHostAndPort = shard->getTargeter()->findHostNoWait(readPref);
-            if (!swHostAndPort.isOK()) {
-                status = swHostAndPort.getStatus();
-                break;
-            }
-
-            ConnectionString resolvedHost(swHostAndPort.getValue());
-            resolvedHostOpTimes[resolvedHost] = hot;
-        }
-
-        DBClientMultiCommand dispatcher;
         std::vector<LegacyWCResponse> wcResponses;
-        if (status.isOK()) {
-            status = enforceLegacyWriteConcern(
-                &dispatcher, dbname, cmdObj, resolvedHostOpTimes, &wcResponses);
-        }
+        auto status = enforceLegacyWriteConcern(txn, dbname, cmdObj, hostOpTimes, &wcResponses);
 
         // Don't forget about our last hosts, reset the client info
         ClusterLastErrorInfo::get(cc()).disableForCommand();
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index d424182714b..b9f701f7b77 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -35,6 +35,7 @@ env.Library(
     LIBDEPS=[
         'batch_write_types',
         '$BUILD_DIR/mongo/client/connection_string',
+        '$BUILD_DIR/mongo/s/async_requests_sender',
         '$BUILD_DIR/mongo/s/client/sharding_client',
         '$BUILD_DIR/mongo/s/coreshard',
     ],
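
For orientation, a minimal sketch of how a caller drives the reworked AsyncRequestsSender API (the shard id, command, and database name below are hypothetical; the executor wiring and response handling follow the new enforceLegacyWriteConcern above):

    // Sketch only: assumes a valid OperationContext* txn and an initialized Grid,
    // with the includes added in cluster_get_last_error_cmd.cpp above.
    std::vector<AsyncRequestsSender::Request> requests;
    requests.emplace_back(ShardId("shard0000"), BSON("ping" << 1));  // hypothetical shard/command

    const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
    AsyncRequestsSender ars(txn,
                            Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(),
                            "admin",  // db name, now taken as StringData
                            requests,
                            readPref);

    // waitForResponses() must only be called once: it moves each response out
    // and clears the internal remotes.
    auto responses = ars.waitForResponses(txn);

    for (const auto& response : responses) {
        if (!response.shardHostAndPort) {
            // No host was ever targeted: the shard could not be found, or no
            // host matched the read preference. swResponse holds the reason.
            invariant(!response.swResponse.isOK());
            continue;
        }
        if (!response.swResponse.isOK()) {
            continue;  // a specific host was contacted but returned an error
        }
        BSONObj reply = response.swResponse.getValue().data;  // the remote's reply
    }

Note the ordering this imposes on callers: check shardHostAndPort before swResponse, since a missing host is what distinguishes a targeting failure (which enforceLegacyWriteConcern treats as fatal) from a per-host error (which it accumulates).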
diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp
index 2d792351b60..325e1f90b92 100644
--- a/src/mongo/s/write_ops/batch_downconvert.cpp
+++ b/src/mongo/s/write_ops/batch_downconvert.cpp
@@ -26,23 +26,17 @@
  * it in the license file.
  */
 
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
 #include "mongo/platform/basic.h"
 
 #include "mongo/s/write_ops/batch_downconvert.h"
 
 #include "mongo/bson/util/builder.h"
 #include "mongo/db/write_concern_options.h"
-#include "mongo/s/client/multi_command_dispatch.h"
 #include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
 
 namespace mongo {
 
-using std::endl;
 using std::string;
-using std::vector;
 
 Status extractGLEErrors(const BSONObj& gleResponse, GLEErrors* errors) {
     // DRAGONS
@@ -184,110 +178,4 @@ BSONObj stripNonWCInfo(const BSONObj& gleResponse) {
     return builder.obj();
 }
 
-// Adds a wOpTime and a wElectionId field to a set of gle options
-static BSONObj buildGLECmdWithOpTime(const BSONObj& gleOptions,
-                                     const repl::OpTime& opTime,
-                                     const OID& electionId) {
-    BSONObjBuilder builder;
-    BSONObjIterator it(gleOptions);
-
-    for (int i = 0; it.more(); ++i) {
-        BSONElement el = it.next();
-
-        // Make sure first element is getLastError : 1
-        if (i == 0) {
-            StringData elName(el.fieldName());
-            if (!elName.equalCaseInsensitive("getLastError")) {
-                builder.append("getLastError", 1);
-            }
-        }
-
-        builder.append(el);
-    }
-    opTime.append(&builder, "wOpTime");
-    builder.appendOID("wElectionId", const_cast<OID*>(&electionId));
-    return builder.obj();
-}
-
-Status enforceLegacyWriteConcern(MultiCommandDispatch* dispatcher,
-                                 StringData dbName,
-                                 const BSONObj& options,
-                                 const HostOpTimeMap& hostOpTimes,
-                                 vector<LegacyWCResponse>* legacyWCResponses) {
-    if (hostOpTimes.empty()) {
-        return Status::OK();
-    }
-
-    for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); ++it) {
-        const ConnectionString& shardEndpoint = it->first;
-        const HostOpTime hot = it->second;
-        const repl::OpTime& opTime = hot.opTime;
-        const OID& electionId = hot.electionId;
-
-        LOG(3) << "enforcing write concern " << options << " on " << shardEndpoint.toString()
-               << " at opTime " << opTime.getTimestamp().toStringPretty() << " with electionID "
-               << electionId;
-
-        BSONObj gleCmd = buildGLECmdWithOpTime(options, opTime, electionId);
-        dispatcher->addCommand(shardEndpoint, dbName, gleCmd);
-    }
-
-    dispatcher->sendAll();
-
-    vector<Status> failedStatuses;
-
-    while (dispatcher->numPending() > 0) {
-        ConnectionString shardEndpoint;
-        RawBSONSerializable gleResponseSerial;
-
-        Status dispatchStatus = dispatcher->recvAny(&shardEndpoint, &gleResponseSerial);
-        if (!dispatchStatus.isOK()) {
-            // We need to get all responses before returning
-            failedStatuses.push_back(dispatchStatus);
-            continue;
-        }
-
-        BSONObj gleResponse = stripNonWCInfo(gleResponseSerial.toBSON());
-
-        // Use the downconversion tools to determine if this GLE response is ok, a
-        // write concern error, or an unknown error we should immediately abort for.
-        GLEErrors errors;
-        Status extractStatus = extractGLEErrors(gleResponse, &errors);
-        if (!extractStatus.isOK()) {
-            failedStatuses.push_back(extractStatus);
-            continue;
-        }
-
-        LegacyWCResponse wcResponse;
-        wcResponse.shardHost = shardEndpoint.toString();
-        wcResponse.gleResponse = gleResponse;
-        if (errors.wcError.get()) {
-            wcResponse.errToReport = errors.wcError->getErrMessage();
-        }
-
-        legacyWCResponses->push_back(wcResponse);
-    }
-
-    if (failedStatuses.empty()) {
-        return Status::OK();
-    }
-
-    StringBuilder builder;
-    builder << "could not enforce write concern";
-
-    for (vector<Status>::const_iterator it = failedStatuses.begin(); it != failedStatuses.end();
-         ++it) {
-        const Status& failedStatus = *it;
-        if (it == failedStatuses.begin()) {
-            builder << causedBy(failedStatus.toString());
-        } else {
-            builder << ":: and ::" << failedStatus.toString();
-        }
-    }
-
-    return Status(failedStatuses.size() == 1u ? failedStatuses.front().code()
-                                              : ErrorCodes::MultipleErrorsOccurred,
-                  builder.str());
-}
-
 }  // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_downconvert.h b/src/mongo/s/write_ops/batch_downconvert.h
index 76f87d93db1..dd33b8c1924 100644
--- a/src/mongo/s/write_ops/batch_downconvert.h
+++ b/src/mongo/s/write_ops/batch_downconvert.h
@@ -29,7 +29,6 @@
 #pragma once
 
 #include <string>
-#include <vector>
 
 #include "mongo/base/string_data.h"
 #include "mongo/bson/bsonobj.h"
@@ -41,8 +40,6 @@
 
 namespace mongo {
 
-class MultiCommandDispatch;
-
 // Used for reporting legacy write concern responses
 struct LegacyWCResponse {
     std::string shardHost;
@@ -50,19 +47,6 @@ struct LegacyWCResponse {
     std::string errToReport;
 };
 
-/**
- * Uses GLE and the shard hosts and opTimes last written by write commands to enforce a
- * write concern across the previously used shards.
- *
- * Returns OK with the LegacyWCResponses containing only write concern error information
- * Returns !OK if there was an error getting a GLE response
- */
-Status enforceLegacyWriteConcern(MultiCommandDispatch* dispatcher,
-                                 StringData dbName,
-                                 const BSONObj& options,
-                                 const HostOpTimeMap& hostOpTimes,
-                                 std::vector<LegacyWCResponse>* wcResponses);
-
 //
 // Below exposed for testing only
 //
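
To make the preserved GLE downconversion concrete, here is a hedged sketch of the command buildGLECmdWithOpTime assembles and of how extractGLEErrors (still exposed for testing, per batch_downconvert.h) classifies a replication timeout; the BSON values are illustrative only and the assertions assume the existing unit-test harness:

    // Client sends:   { getLastError: 1, w: 2, wtimeout: 5000 }
    // Shard receives: { getLastError: 1, w: 2, wtimeout: 5000,
    //                   wOpTime: <last opTime written on that shard>,
    //                   wElectionId: <primary's election id> }
    // If the options do not begin with a getLastError field, "getLastError: 1"
    // is prepended first.

    // A GLE reply that timed out waiting for replication should surface as a
    // write concern error rather than a command failure (illustrative reply):
    BSONObj gle = fromjson("{ok: 1.0, err: 'timeout', wtimeout: true, waited: 2500}");
    GLEErrors errors;
    ASSERT_OK(extractGLEErrors(gle, &errors));
    ASSERT(errors.wcError.get());  // reported via LegacyWCResponse::errToReport

This also shows why enforceLegacyWriteConcern runs stripNonWCInfo before classification: fields unrelated to write concern are dropped, so the stored gleResponse matches the function's contract of returning LegacyWCResponses that carry only write concern error information.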