diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2020-11-25 10:42:39 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-25 16:52:46 +0000 |
commit | 2d295d7b205dcac66d434929553610dd48ac0815 (patch) | |
tree | 33c57ad902b32f64fb6b471e7787a972e27917cf /src/mongo/s/client | |
parent | a6b7feddd72d9a9aa4daf5d1198ab051ca4afd40 (diff) | |
download | mongo-2d295d7b205dcac66d434929553610dd48ac0815.tar.gz |
SERVER-51245: Have resharding oplog fetching use a Fetcher.
Diffstat (limited to 'src/mongo/s/client')
-rw-r--r-- | src/mongo/s/client/shard.h | 13 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 87 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.h | 4 |
3 files changed, 103 insertions, 1 deletions
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index 79f1a2b669c..82e45ef5e55 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -36,6 +36,7 @@ #include "mongo/client/read_preference.h" #include "mongo/db/logical_time.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/remote_command_response.h" @@ -207,6 +208,18 @@ public: Milliseconds maxTimeMSOverride); /** + * Synchronously run the aggregation request, with a best effort honoring of request + * options. `callback` will be called with the batch contained in each response. `callback` + * should return `true` to execute another getmore. Returning `false` will send a + * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to + * `callback`. + */ + virtual Status runAggregation( + OperationContext* opCtx, + const AggregationRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0; + + /** * Runs a write command against a shard. This is separate from runCommand, because write * commands return errors in a different format than regular commands do, so checking for * retriable errors must be done differently. diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 5254c24d37d..30ceec26868 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -281,7 +281,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand( const auto& data = dataStatus.getValue(); if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { - // Sharding users of ReplSetMetadata do not require the wall clock time field to be set + // Sharding users of ReplSetMetadata do not require the wall clock time field to be set. auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); if (!replParseStatus.isOK()) { @@ -411,6 +411,91 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx, .ignore(); } +Status ShardRemote::runAggregation( + OperationContext* opCtx, + const AggregationRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch)> callback) { + + BSONObj readPrefMetadata; + + ReadPreferenceSetting readPreference = + uassertStatusOK(ReadPreferenceSetting::fromContainingBSON( + aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred)); + + auto swHost = _targeter->findHost(opCtx, readPreference); + if (!swHost.isOK()) { + return swHost.getStatus(); + } + HostAndPort host = swHost.getValue(); + + BSONObjBuilder builder; + readPreference.toContainingBSON(&builder); + readPrefMetadata = builder.obj(); + + Status status = + Status(ErrorCodes::InternalError, "Internal error running cursor callback in command"); + auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + status = dataStatus.getStatus(); + return; + } + + const auto& data = dataStatus.getValue(); + + if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { + // Sharding users of ReplSetMetadata do not require the wall clock time field to be set. + auto replParseStatus = + rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); + if (!replParseStatus.isOK()) { + status = replParseStatus.getStatus(); + return; + } + } + + if (!callback(data.documents)) { + *nextAction = Fetcher::NextAction::kNoAction; + } + + status = Status::OK(); + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + Milliseconds requestTimeout(-1); + if (aggRequest.getMaxTimeMS()) { + requestTimeout = Milliseconds(aggRequest.getMaxTimeMS()); + } + + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + Fetcher fetcher(executor.get(), + host, + aggRequest.getNamespaceString().db().toString(), + aggRequest.serializeToCommandObj().toBson(), + fetcherCallback, + readPrefMetadata, + requestTimeout, /* command network timeout */ + requestTimeout /* getMore network timeout */); + + Status scheduleStatus = fetcher.schedule(); + if (!scheduleStatus.isOK()) { + return scheduleStatus; + } + + fetcher.join(); + + updateReplSetMonitor(host, status); + + return status; +} + + StatusWith<ShardRemote::AsyncCmdHandle> ShardRemote::_scheduleCommand( OperationContext* opCtx, const ReadPreferenceSetting& readPref, diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h index eb0c547c97f..e7ca9004112 100644 --- a/src/mongo/s/client/shard_remote.h +++ b/src/mongo/s/client/shard_remote.h @@ -85,6 +85,10 @@ public: const std::string& dbName, const BSONObj& cmdObj) final; + Status runAggregation(OperationContext* opCtx, + const AggregationRequest& aggRequest, + std::function<bool(const std::vector<BSONObj>& batch)> callback); + private: struct AsyncCmdHandle { HostAndPort hostTargetted; |