summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2020-11-25 10:42:39 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-25 16:52:46 +0000
commit2d295d7b205dcac66d434929553610dd48ac0815 (patch)
tree33c57ad902b32f64fb6b471e7787a972e27917cf /src/mongo/s
parenta6b7feddd72d9a9aa4daf5d1198ab051ca4afd40 (diff)
downloadmongo-2d295d7b205dcac66d434929553610dd48ac0815.tar.gz
SERVER-51245: Have resharding oplog fetching use a Fetcher.
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/client/shard.h13
-rw-r--r--src/mongo/s/client/shard_remote.cpp87
-rw-r--r--src/mongo/s/client/shard_remote.h4
3 files changed, 103 insertions, 1 deletions
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index 79f1a2b669c..82e45ef5e55 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -36,6 +36,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/remote_command_response.h"
@@ -207,6 +208,18 @@ public:
Milliseconds maxTimeMSOverride);
/**
+ * Synchronously run the aggregation request, with a best effort honoring of request
+ * options. `callback` will be called with the batch contained in each response. `callback`
+ * should return `true` to execute another getmore. Returning `false` will send a
+ * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to
+ * `callback`.
+ */
+ virtual Status runAggregation(
+ OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0;
+
+ /**
* Runs a write command against a shard. This is separate from runCommand, because write
* commands return errors in a different format than regular commands do, so checking for
* retriable errors must be done differently.
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 5254c24d37d..30ceec26868 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -281,7 +281,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
const auto& data = dataStatus.getValue();
if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
- // Sharding users of ReplSetMetadata do not require the wall clock time field to be set
+ // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
auto replParseStatus =
rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
if (!replParseStatus.isOK()) {
@@ -411,6 +411,91 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx,
.ignore();
}
+Status ShardRemote::runAggregation(
+ OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+
+ BSONObj readPrefMetadata;
+
+ ReadPreferenceSetting readPreference =
+ uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(
+ aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred));
+
+ auto swHost = _targeter->findHost(opCtx, readPreference);
+ if (!swHost.isOK()) {
+ return swHost.getStatus();
+ }
+ HostAndPort host = swHost.getValue();
+
+ BSONObjBuilder builder;
+ readPreference.toContainingBSON(&builder);
+ readPrefMetadata = builder.obj();
+
+ Status status =
+ Status(ErrorCodes::InternalError, "Internal error running cursor callback in command");
+ auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ status = dataStatus.getStatus();
+ return;
+ }
+
+ const auto& data = dataStatus.getValue();
+
+ if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
+ // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
+ auto replParseStatus =
+ rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
+ if (!replParseStatus.isOK()) {
+ status = replParseStatus.getStatus();
+ return;
+ }
+ }
+
+ if (!callback(data.documents)) {
+ *nextAction = Fetcher::NextAction::kNoAction;
+ }
+
+ status = Status::OK();
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ Milliseconds requestTimeout(-1);
+ if (aggRequest.getMaxTimeMS()) {
+ requestTimeout = Milliseconds(aggRequest.getMaxTimeMS());
+ }
+
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ Fetcher fetcher(executor.get(),
+ host,
+ aggRequest.getNamespaceString().db().toString(),
+ aggRequest.serializeToCommandObj().toBson(),
+ fetcherCallback,
+ readPrefMetadata,
+ requestTimeout, /* command network timeout */
+ requestTimeout /* getMore network timeout */);
+
+ Status scheduleStatus = fetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ return scheduleStatus;
+ }
+
+ fetcher.join();
+
+ updateReplSetMonitor(host, status);
+
+ return status;
+}
+
+
StatusWith<ShardRemote::AsyncCmdHandle> ShardRemote::_scheduleCommand(
OperationContext* opCtx,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index eb0c547c97f..e7ca9004112 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,6 +85,10 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) final;
+ Status runAggregation(OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback);
+
private:
struct AsyncCmdHandle {
HostAndPort hostTargetted;