diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-02 12:24:53 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-02 13:41:51 -0500 |
commit | 83c520fd457589b3cc28f063cb7dd7c9fdc56f29 (patch) | |
tree | 45dff5023fa720e51761438074b05ca65105217c /src/mongo/s | |
parent | cb2fdf468435d7a5c7582069d4026f1d4e935755 (diff) | |
download | mongo-83c520fd457589b3cc28f063cb7dd7c9fdc56f29.tar.gz |
SERVER-28161 make ClusterWrite::explain path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/commands/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_explain.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_explain.h | 13 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 70 |
4 files changed, 60 insertions, 37 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 7cb7a7147e3..a5e3387fe63 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -89,6 +89,7 @@ env.Library( '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/rpc/client_metadata', + '$BUILD_DIR/mongo/s/async_requests_sender', '$BUILD_DIR/mongo/s/client/parallel', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/query/cluster_query', diff --git a/src/mongo/s/commands/cluster_explain.cpp b/src/mongo/s/commands/cluster_explain.cpp index 96b78d23f0a..a4268b26f6c 100644 --- a/src/mongo/s/commands/cluster_explain.cpp +++ b/src/mongo/s/commands/cluster_explain.cpp @@ -100,6 +100,19 @@ bool appendElementsIfRoom(BSONObjBuilder* bob, const BSONObj& toAppend) { } // namespace // static +void ClusterExplain::wrapAsExplainForOP_COMMAND(const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* explainBuilder) { + explainBuilder->append("explain", cmdObj); + explainBuilder->append("verbosity", ExplainCommon::verbosityString(verbosity)); + + // Propagate readConcern + if (auto readConcern = cmdObj["readConcern"]) { + explainBuilder->append(readConcern); + } +} + +// static void ClusterExplain::wrapAsExplain(const BSONObj& cmdObj, ExplainCommon::Verbosity verbosity, const rpc::ServerSelectionMetadata& serverSelectionMetadata, diff --git a/src/mongo/s/commands/cluster_explain.h b/src/mongo/s/commands/cluster_explain.h index e353407a140..600f176337f 100644 --- a/src/mongo/s/commands/cluster_explain.h +++ b/src/mongo/s/commands/cluster_explain.h @@ -49,6 +49,19 @@ class ServerSelectionMetadata; class ClusterExplain { public: /** + * Given the BSON specification for a command, 'cmdObj', wraps the object in order to produce + * the BSON for an explain of that command, at the given verbosity level 'verbosity.' + * + * Adds the result to the BSONObjBuidler 'out'. + * + * Unlike wrapAsExplain, does not downconvert the command to OP_QUERY. Should be used for paths + * that send the command over the NetworkInterfaceASIO rather than DBClient. + */ + static void wrapAsExplainForOP_COMMAND(const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* explainBuilder); + + /** * Given the BSON specification for a command, 'cmdObj', wraps the object in order to * produce the BSON for an explain of that command, at the given verbosity level * 'verbosity' and according to the metadata in 'serverSelectionMetadata'. diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index c25c15b5545..270510163d5 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -31,19 +31,17 @@ #include "mongo/base/error_codes.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/client/remote_command_targeter.h" -#include "mongo/db/client.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/commands/write_commands/write_commands_common.h" #include "mongo/db/lasterror.h" #include "mongo/db/stats/counters.h" -#include "mongo/db/stats/counters.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/chunk_manager_targeter.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/cluster_write.h" -#include "mongo/s/commands/dbclient_multi_command.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batch_upconvert.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -113,9 +111,7 @@ public: } BSONObjBuilder explainCmdBob; - int options = 0; - ClusterExplain::wrapAsExplain( - cmdObj, verbosity, serverSelectionMetadata, &explainCmdBob, &options); + ClusterExplain::wrapAsExplainForOP_COMMAND(cmdObj, verbosity, &explainCmdBob); // We will time how long it takes to run the commands on the shards. Timer timer; @@ -258,53 +254,53 @@ private: return status; } - DBClientMultiCommand dispatcher; + auto shardRegistry = Grid::get(txn)->shardRegistry(); // Assemble requests + std::vector<AsyncRequestsSender::Request> requests; for (vector<ShardEndpoint*>::const_iterator it = endpoints.begin(); it != endpoints.end(); ++it) { const ShardEndpoint* endpoint = *it; - const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); - auto shardStatus = grid.shardRegistry()->getShard(txn, endpoint->shardName); + auto shardStatus = shardRegistry->getShard(txn, endpoint->shardName); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } - auto swHostAndPort = shardStatus.getValue()->getTargeter()->findHostNoWait(readPref); - if (!swHostAndPort.isOK()) { - return swHostAndPort.getStatus(); - } - - ConnectionString host(swHostAndPort.getValue()); - dispatcher.addCommand(host, dbName, command); + requests.emplace_back(shardStatus.getValue()->getId(), command); } - // Errors reported when recv'ing responses - dispatcher.sendAll(); - Status dispatchStatus = Status::OK(); + // Send the requests and wait to receive all the responses. + + const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet()); + AsyncRequestsSender ars(txn, + Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), + dbName, + requests, + readPref); + auto responses = ars.waitForResponses(txn); - // Recv responses - while (dispatcher.numPending() > 0) { - ConnectionString host; - RawBSONSerializable response; + // Parse the responses. - Status status = dispatcher.recvAny(&host, &response); - if (!status.isOK()) { - // We always need to recv() all the sent operations - dispatchStatus = status; - continue; + Status dispatchStatus = Status::OK(); + for (const auto& response : responses) { + if (!response.swResponse.isOK()) { + dispatchStatus = std::move(response.swResponse.getStatus()); + break; } Strategy::CommandResult result; - result.target = host; - { - auto shardStatus = grid.shardRegistry()->getShard(txn, host.toString()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - result.shardTargetId = shardStatus.getValue()->getId(); + + // If the response status was OK, the response must contain which host was targeted. + invariant(response.shardHostAndPort); + result.target = ConnectionString(std::move(*response.shardHostAndPort)); + + auto shardStatus = shardRegistry->getShard(txn, result.target.toString()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); } - result.result = response.toBSON(); + result.shardTargetId = shardStatus.getValue()->getId(); + + result.result = std::move(response.swResponse.getValue().data); results->push_back(result); } |