Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher.h')
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.h | 81
1 file changed, 39 insertions(+), 42 deletions(-)
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index aeb198dfabf..a5c06225b07 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/background.h"
#include "mongo/util/uuid.h"
@@ -59,61 +60,50 @@ public:
* - Send an aggregation request + getMores until either:
* -- The "final resharding" oplog entry is found.
* -- An interruption occurs.
+ * -- The fetcher concludes it's fallen off the oplog.
* -- A different error occurs.
*
- * In the first two circumstances, the task will return. In the last circumstance, the task will
- * be rescheduled in a way that resumes where it had left off from.
+ * In the first two circumstances, the task will terminate. If the fetcher has fallen off the
+ * oplog, a fatal resharding exception is thrown. In the last circumstance, the task will be
+ * rescheduled so that it resumes where it left off.
*/
- void schedule(executor::TaskExecutor* exector);
+ Future<void> schedule(executor::TaskExecutor* executor);
/**
- * Given a connection, fetches and copies oplog entries until reaching an error, or coming
+ * Given a shard, fetches and copies oplog entries until reaching an error, or coming
* across a sentinel finish oplog entry. Throws if there are more oplog entries to be copied.
*/
- void consume(DBClientBase* conn);
+ void consume(Client* client, Shard* shard);
- /**
- * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when
- * the output is no longer necessary. The underlying `toWriteInto` collection is left intact,
- * though likely incomplete.
- */
- void setKilled();
+ bool iterate(Client* client);
- /**
- * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the
- * ReshardingDonorOplogId to resume querying from.
- *
- * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries
- * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation
- * cursor is exhausted.
- *
- * Returns an identifier for the last oplog-ish document written to `toWriteInto`.
- *
- * This method throws.
- *
- * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things
- * like perform aggregate commands nor does it expose a cursor/stream interface. However, using
- * a `Shard` object will provide critical behavior such as advancing logical clock values on a
- * response and targetting a node to send the aggregation command to.
- */
- boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx,
- DBClientBase* conn,
- boost::intrusive_ptr<ExpressionContext> expCtx,
- ReshardingDonorOplogId startAfter,
- UUID collUUID,
- const ShardId& recipientShard,
- bool doesDonorOwnMinKeyChunk,
- NamespaceString toWriteInto);
-
- int getNumOplogEntriesCopied() {
+ int getNumOplogEntriesCopied() const {
return _numOplogEntriesCopied;
}
+ ReshardingDonorOplogId getLastSeenTimestamp() const {
+ return _startAt;
+ }
+
+ void setInitialBatchSizeForTest(int size) {
+ _initialBatchSize = size;
+ }
+
+ void useReadConcernForTest(bool use) {
+ _useReadConcern = use;
+ }
+
+ void setMaxBatchesForTest(int maxBatches) {
+ _maxBatches = maxBatches;
+ }
+
private:
/**
* Returns true if there's more work to do and the task should be rescheduled.
*/
- bool _runTask();
+ void _ensureCollection(Client* client, const NamespaceString nss);
+ std::vector<BSONObj> _makePipeline(Client* client);
+ void _reschedule(executor::TaskExecutor* executor);
const UUID _reshardingUUID;
const UUID _collUUID;
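Because schedule() now returns Future<void> rather than void, callers can attach a continuation instead of polling the fetcher. A minimal caller-side sketch, using only declarations visible in this header (the class name ReshardingOplogFetcher is taken from the filename; the error-handling comments are illustrative):

#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
#include "mongo/executor/task_executor.h"

namespace mongo {
void startFetcher(ReshardingOplogFetcher* fetcher, executor::TaskExecutor* executor) {
    // schedule() runs the fetch loop on `executor` and fulfills the Future on completion.
    fetcher->schedule(executor).getAsync([](Status status) {
        if (status.isOK()) {
            // The "final resharding" sentinel oplog entry was copied; fetching is done.
        } else {
            // Interrupted, fell off the oplog (fatal), or failed after rescheduling;
            // which error codes map to which case is an assumption here.
        }
    });
}
}  // namespace mongo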
@@ -123,9 +113,16 @@ private:
const bool _doesDonorOwnMinKeyChunk;
const NamespaceString _toWriteInto;
- ServiceContext::UniqueClient _client;
- AtomicWord<bool> _isAlive{true};
-
+ Promise<void> _fetchedFinishPromise;
int _numOplogEntriesCopied = 0;
+
+ // Knobs that let tests control behavior.
+
+ // The aggregation batch size. This only affects the original call and not `getmore`s.
+ int _initialBatchSize = 0;
+ // Setting to false will omit the `afterClusterTime` and `majority` read concern.
+ bool _useReadConcern = true;
+ // Dictates how many batches get processed before returning control from a call to `consume`.
+ int _maxBatches = -1;
};
} // namespace mongo
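Taken together, the *ForTest setters and the private knobs they drive suggest test usage along these lines. This is a hypothetical fragment: the fixture, makeFetcher(), and client() helpers are invented for illustration; only the setter and getter names come from this header.

TEST_F(ReshardingOplogFetcherTest, RespectsTestKnobs) {
    auto fetcher = makeFetcher();            // Hypothetical fixture helper.
    fetcher->setInitialBatchSizeForTest(2);  // Cap the first aggregate batch at 2 documents.
    fetcher->useReadConcernForTest(false);   // Omit afterClusterTime/majority read concern.
    fetcher->setMaxBatchesForTest(1);        // Have consume() return after a single batch.

    fetcher->iterate(client());              // client() is a hypothetical Client* accessor.
    ASSERT_GTE(fetcher->getNumOplogEntriesCopied(), 0);
}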