diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher.h')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher.h | 81 |
1 files changed, 39 insertions, 42 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index aeb198dfabf..a5c06225b07 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -37,6 +37,7 @@ #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/db/service_context.h" +#include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" #include "mongo/util/background.h" #include "mongo/util/uuid.h" @@ -59,61 +60,50 @@ public: * - Send an aggregation request + getMores until either: * -- The "final resharding" oplog entry is found. * -- An interruption occurs. + * -- The fetcher concludes it's fallen off the oplog. * -- A different error occurs. * - * In the first two circumstances, the task will return. In the last circumstance, the task will - * be rescheduled in a way that resumes where it had left off from. + * In the first two circumstances, the task will terminate. If the fetcher has fallen off the + * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task + * will be rescheduled in a way that resumes where it had left off from. */ - void schedule(executor::TaskExecutor* exector); + Future<void> schedule(executor::TaskExecutor* executor); /** - * Given a connection, fetches and copies oplog entries until reaching an error, or coming + * Given a shard, fetches and copies oplog entries until reaching an error, or coming * across a sentinel finish oplog entry. Throws if there's more oplog entries to be copied. */ - void consume(DBClientBase* conn); + void consume(Client* client, Shard* shard); - /** - * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when - * the output is no longer necessary. The underlying `toWriteInto` collection is left intact, - * though likely incomplete. - */ - void setKilled(); + bool iterate(Client* client); - /** - * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the - * ReshardingDonorOplogId to resume querying from. - * - * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries - * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation - * cursor is exhausted. - * - * Returns an identifier for the last oplog-ish document written to `toWriteInto`. - * - * This method throws. - * - * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things - * like perform aggregate commands nor does it expose a cursor/stream interface. However, using - * a `Shard` object will provide critical behavior such as advancing logical clock values on a - * response and targetting a node to send the aggregation command to. - */ - boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx, - DBClientBase* conn, - boost::intrusive_ptr<ExpressionContext> expCtx, - ReshardingDonorOplogId startAfter, - UUID collUUID, - const ShardId& recipientShard, - bool doesDonorOwnMinKeyChunk, - NamespaceString toWriteInto); - - int getNumOplogEntriesCopied() { + int getNumOplogEntriesCopied() const { return _numOplogEntriesCopied; } + ReshardingDonorOplogId getLastSeenTimestamp() const { + return _startAt; + } + + void setInitialBatchSizeForTest(int size) { + _initialBatchSize = size; + } + + void useReadConcernForTest(bool use) { + _useReadConcern = use; + } + + void setMaxBatchesForTest(int maxBatches) { + _maxBatches = maxBatches; + } + private: /** * Returns true if there's more work to do and the task should be rescheduled. */ - bool _runTask(); + void _ensureCollection(Client* client, const NamespaceString nss); + std::vector<BSONObj> _makePipeline(Client* client); + void _reschedule(executor::TaskExecutor* executor); const UUID _reshardingUUID; const UUID _collUUID; @@ -123,9 +113,16 @@ private: const bool _doesDonorOwnMinKeyChunk; const NamespaceString _toWriteInto; - ServiceContext::UniqueClient _client; - AtomicWord<bool> _isAlive{true}; - + Promise<void> _fetchedFinishPromise; int _numOplogEntriesCopied = 0; + + // For testing to control behavior. + + // The aggregation batch size. This only affects the original call and not `getmore`s. + int _initialBatchSize = 0; + // Setting to false will omit the `afterClusterTime` and `majority` read concern. + bool _useReadConcern = true; + // Dictates how many batches get processed before returning control from a call to `consume`. + int _maxBatches = -1; }; } // namespace mongo |