summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2017-03-02 12:24:53 -0500
committerEsha Maharishi <esha.maharishi@mongodb.com>2017-03-02 13:41:51 -0500
commit83c520fd457589b3cc28f063cb7dd7c9fdc56f29 (patch)
tree45dff5023fa720e51761438074b05ca65105217c /src/mongo/s
parentcb2fdf468435d7a5c7582069d4026f1d4e935755 (diff)
downloadmongo-83c520fd457589b3cc28f063cb7dd7c9fdc56f29.tar.gz
SERVER-28161 make ClusterWrite::explain path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/commands/SConscript1
-rw-r--r--src/mongo/s/commands/cluster_explain.cpp13
-rw-r--r--src/mongo/s/commands/cluster_explain.h13
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp70
4 files changed, 60 insertions, 37 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 7cb7a7147e3..a5e3387fe63 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -89,6 +89,7 @@ env.Library(
'$BUILD_DIR/mongo/db/pipeline/aggregation',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/rpc/client_metadata',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/client/parallel',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/query/cluster_query',
diff --git a/src/mongo/s/commands/cluster_explain.cpp b/src/mongo/s/commands/cluster_explain.cpp
index 96b78d23f0a..a4268b26f6c 100644
--- a/src/mongo/s/commands/cluster_explain.cpp
+++ b/src/mongo/s/commands/cluster_explain.cpp
@@ -100,6 +100,19 @@ bool appendElementsIfRoom(BSONObjBuilder* bob, const BSONObj& toAppend) {
} // namespace
// static
+void ClusterExplain::wrapAsExplainForOP_COMMAND(const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ BSONObjBuilder* explainBuilder) {
+ explainBuilder->append("explain", cmdObj);
+ explainBuilder->append("verbosity", ExplainCommon::verbosityString(verbosity));
+
+ // Propagate readConcern
+ if (auto readConcern = cmdObj["readConcern"]) {
+ explainBuilder->append(readConcern);
+ }
+}
+
+// static
void ClusterExplain::wrapAsExplain(const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
const rpc::ServerSelectionMetadata& serverSelectionMetadata,
diff --git a/src/mongo/s/commands/cluster_explain.h b/src/mongo/s/commands/cluster_explain.h
index e353407a140..600f176337f 100644
--- a/src/mongo/s/commands/cluster_explain.h
+++ b/src/mongo/s/commands/cluster_explain.h
@@ -49,6 +49,19 @@ class ServerSelectionMetadata;
class ClusterExplain {
public:
/**
+ * Given the BSON specification for a command, 'cmdObj', wraps the object in order to produce
+ * the BSON for an explain of that command, at the given verbosity level 'verbosity.'
+ *
+ * Adds the result to the BSONObjBuidler 'out'.
+ *
+ * Unlike wrapAsExplain, does not downconvert the command to OP_QUERY. Should be used for paths
+ * that send the command over the NetworkInterfaceASIO rather than DBClient.
+ */
+ static void wrapAsExplainForOP_COMMAND(const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ BSONObjBuilder* explainBuilder);
+
+ /**
* Given the BSON specification for a command, 'cmdObj', wraps the object in order to
* produce the BSON for an explain of that command, at the given verbosity level
* 'verbosity' and according to the metadata in 'serverSelectionMetadata'.
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index c25c15b5545..270510163d5 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -31,19 +31,17 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/owned_pointer_vector.h"
#include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/client.h"
-#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/write_commands/write_commands_common.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/stats/counters.h"
-#include "mongo/db/stats/counters.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/chunk_manager_targeter.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/commands/cluster_write.h"
-#include "mongo/s/commands/dbclient_multi_command.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_upconvert.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -113,9 +111,7 @@ public:
}
BSONObjBuilder explainCmdBob;
- int options = 0;
- ClusterExplain::wrapAsExplain(
- cmdObj, verbosity, serverSelectionMetadata, &explainCmdBob, &options);
+ ClusterExplain::wrapAsExplainForOP_COMMAND(cmdObj, verbosity, &explainCmdBob);
// We will time how long it takes to run the commands on the shards.
Timer timer;
@@ -258,53 +254,53 @@ private:
return status;
}
- DBClientMultiCommand dispatcher;
+ auto shardRegistry = Grid::get(txn)->shardRegistry();
// Assemble requests
+ std::vector<AsyncRequestsSender::Request> requests;
for (vector<ShardEndpoint*>::const_iterator it = endpoints.begin(); it != endpoints.end();
++it) {
const ShardEndpoint* endpoint = *it;
- const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
- auto shardStatus = grid.shardRegistry()->getShard(txn, endpoint->shardName);
+ auto shardStatus = shardRegistry->getShard(txn, endpoint->shardName);
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
- auto swHostAndPort = shardStatus.getValue()->getTargeter()->findHostNoWait(readPref);
- if (!swHostAndPort.isOK()) {
- return swHostAndPort.getStatus();
- }
-
- ConnectionString host(swHostAndPort.getValue());
- dispatcher.addCommand(host, dbName, command);
+ requests.emplace_back(shardStatus.getValue()->getId(), command);
}
- // Errors reported when recv'ing responses
- dispatcher.sendAll();
- Status dispatchStatus = Status::OK();
+ // Send the requests and wait to receive all the responses.
+
+ const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
+ AsyncRequestsSender ars(txn,
+ Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(),
+ dbName,
+ requests,
+ readPref);
+ auto responses = ars.waitForResponses(txn);
- // Recv responses
- while (dispatcher.numPending() > 0) {
- ConnectionString host;
- RawBSONSerializable response;
+ // Parse the responses.
- Status status = dispatcher.recvAny(&host, &response);
- if (!status.isOK()) {
- // We always need to recv() all the sent operations
- dispatchStatus = status;
- continue;
+ Status dispatchStatus = Status::OK();
+ for (const auto& response : responses) {
+ if (!response.swResponse.isOK()) {
+ dispatchStatus = std::move(response.swResponse.getStatus());
+ break;
}
Strategy::CommandResult result;
- result.target = host;
- {
- auto shardStatus = grid.shardRegistry()->getShard(txn, host.toString());
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
- result.shardTargetId = shardStatus.getValue()->getId();
+
+ // If the response status was OK, the response must contain which host was targeted.
+ invariant(response.shardHostAndPort);
+ result.target = ConnectionString(std::move(*response.shardHostAndPort));
+
+ auto shardStatus = shardRegistry->getShard(txn, result.target.toString());
+ if (!shardStatus.isOK()) {
+ return shardStatus.getStatus();
}
- result.result = response.toBSON();
+ result.shardTargetId = shardStatus.getValue()->getId();
+
+ result.result = std::move(response.swResponse.getValue().data);
results->push_back(result);
}