summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2020-11-24 20:32:54 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-25 02:22:48 +0000
commitcda3a52701fe4143b06bd981b98514e69d0a93eb (patch)
tree8053df801e88ab708b64cdcac11751ffa976cad5
parent01f08b9bb953166059c48f83909f4cc39f70d947 (diff)
downloadmongo-cda3a52701fe4143b06bd981b98514e69d0a93eb.tar.gz
SERVER-51245: Have resharding oplog fetching use a Fetcher.
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_mock.cpp6
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp231
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.h81
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp417
-rw-r--r--src/mongo/db/s/shard_local.cpp6
-rw-r--r--src/mongo/db/s/shard_local.h4
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/resharding_tests.cpp507
-rw-r--r--src/mongo/s/client/shard.h13
-rw-r--r--src/mongo/s/client/shard_remote.cpp87
-rw-r--r--src/mongo/s/client/shard_remote.h4
-rw-r--r--src/mongo/unittest/unittest.h1
13 files changed, 698 insertions, 663 deletions
diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
index 00d479756a6..aae6de7764e 100644
--- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
@@ -112,7 +112,11 @@ void AuthzManagerExternalStateMock::setAuthzVersion(int version) {
std::unique_ptr<AuthzSessionExternalState>
AuthzManagerExternalStateMock::makeAuthzSessionExternalState(AuthorizationManager* authzManager) {
- return std::make_unique<AuthzSessionExternalStateMock>(authzManager);
+ auto ret = std::make_unique<AuthzSessionExternalStateMock>(authzManager);
+ // Construct a `AuthzSessionExternalStateMock` structure that represents the default no-auth
+ // state of a running mongod.
+ ret->setReturnValueForShouldIgnoreAuthChecks(true);
+ return ret;
}
Status AuthzManagerExternalStateMock::findOne(OperationContext* opCtx,
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 373be39e030..010435d8cec 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -452,6 +452,7 @@ env.CppUnitTest(
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
+ 'resharding/resharding_oplog_fetcher_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
'session_catalog_migration_destination_test.cpp',
'session_catalog_migration_source_test.cpp',
@@ -474,7 +475,9 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/document_source_mock',
+ '$BUILD_DIR/mongo/db/pipeline/expression_context',
'$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_interface_local',
'$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 2dce0fee77b..e0bd954b8c1 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -87,88 +87,71 @@ ReshardingOplogFetcher::ReshardingOplogFetcher(UUID reshardingUUID,
_donorShard(donorShard),
_recipientShard(recipientShard),
_doesDonorOwnMinKeyChunk(doesDonorOwnMinKeyChunk),
- _toWriteInto(toWriteInto),
- _client(getGlobalServiceContext()->makeClient(
- fmt::format("OplogFetcher-{}-{}", reshardingUUID.toString(), donorShard.toString()))) {}
+ _toWriteInto(toWriteInto) {}
-void ReshardingOplogFetcher::consume(DBClientBase* conn) {
- while (true) {
- auto opCtxRaii = _client->makeOperationContext();
- opCtxRaii->checkForInterrupt();
- auto expCtx = _makeExpressionContext(opCtxRaii.get());
- boost::optional<ReshardingDonorOplogId> restartAt = iterate(opCtxRaii.get(),
- conn,
- expCtx,
- _startAt,
- _collUUID,
- _recipientShard,
- _doesDonorOwnMinKeyChunk,
- _toWriteInto);
- if (!restartAt) {
- return;
- }
- _startAt = restartAt.get();
- }
-}
+Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
+ auto pf = makePromiseFuture<void>();
+ _fetchedFinishPromise = std::move(pf.promise);
+
+ _reschedule(executor);
-void ReshardingOplogFetcher::setKilled() {
- _isAlive.store(false);
- _client->setKilled();
+ return std::move(pf.future);
}
-void ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
+void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) {
executor->schedule([this, executor](Status status) {
+ ThreadClient client(
+ fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()),
+ getGlobalServiceContext());
if (!status.isOK()) {
+ LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status);
+ _fetchedFinishPromise.setError(status);
return;
}
- if (_runTask()) {
- schedule(executor);
+ try {
+ if (iterate(client.get())) {
+ _reschedule(executor);
+ } else {
+ _fetchedFinishPromise.emplaceValue();
+ }
+ } catch (...) {
+ LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus());
+ _fetchedFinishPromise.setError(exceptionToStatus());
}
});
}
-bool ReshardingOplogFetcher::_runTask() {
- auto opCtxRaii = _client->makeOperationContext();
- opCtxRaii->checkForInterrupt();
-
- const Seconds maxStaleness(10);
- ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
- StatusWith<std::shared_ptr<Shard>> swDonor =
- Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
- if (!swDonor.isOK()) {
- LOGV2_WARNING(5127203,
- "Error finding shard in registry, retrying.",
- "error"_attr = swDonor.getStatus());
- return true;
- }
-
- StatusWith<HostAndPort> swTargettedDonor =
- swDonor.getValue()->getTargeter()->findHost(opCtxRaii.get(), readPref);
- if (!swTargettedDonor.isOK()) {
- LOGV2_WARNING(5127202,
- "Error targetting donor, retrying.",
- "error"_attr = swTargettedDonor.getStatus());
- return true;
- }
+bool ReshardingOplogFetcher::iterate(Client* client) {
+ std::shared_ptr<Shard> targetShard;
+ {
+ auto opCtxRaii = client->makeOperationContext();
+ opCtxRaii->checkForInterrupt();
- DBClientConnection donorConn;
- if (auto status = donorConn.connect(swTargettedDonor.getValue(), "ReshardingOplogFetching"_sd);
- !status.isOK()) {
- LOGV2_WARNING(5127201, "Failed connecting to donor, retrying.", "error"_attr = status);
- return true;
+ const Seconds maxStaleness(10);
+ ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
+ StatusWith<std::shared_ptr<Shard>> swDonor =
+ Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
+ if (!swDonor.isOK()) {
+ LOGV2_WARNING(5127203,
+ "Error finding shard in registry, retrying.",
+ "error"_attr = swDonor.getStatus());
+ return true;
+ }
+ targetShard = swDonor.getValue();
}
- // Reset the OpCtx so consuming can manage short-lived OpCtx lifetimes with the current client.
- opCtxRaii.reset();
try {
// Consume will throw if there's oplog entries to be copied. It only returns cleanly when
// the final oplog has been seen and copied.
- consume(&donorConn);
+ consume(client, targetShard.get());
return false;
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
- _isAlive.store(false);
return false;
+ } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+ LOGV2_ERROR(
+ 5192103, "Fatal resharding error while fetching.", "error"_attr = exceptionToStatus());
+ throw;
} catch (const DBException&) {
LOGV2_WARNING(
5127200, "Error while fetching, retrying.", "error"_attr = exceptionToStatus());
@@ -176,73 +159,99 @@ bool ReshardingOplogFetcher::_runTask() {
}
}
-boost::optional<ReshardingDonorOplogId> ReshardingOplogFetcher::iterate(
- OperationContext* opCtx,
- DBClientBase* conn,
- boost::intrusive_ptr<ExpressionContext> expCtx,
- const ReshardingDonorOplogId startAfter,
- const UUID collUUID,
- const ShardId& recipientShard,
- const bool doesDonorOwnMinKeyChunk,
- const NamespaceString toWriteToNss) {
- // This method will use the input opCtx to perform writes into `toWriteToNss`.
+void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceString nss) {
+ auto opCtxRaii = client->makeOperationContext();
+ auto opCtx = opCtxRaii.get();
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
// Create the destination collection if necessary.
- writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] {
- const CollectionPtr toWriteTo =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, toWriteToNss);
- if (toWriteTo) {
+ writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] {
+ const CollectionPtr coll =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+ if (coll) {
return;
}
WriteUnitOfWork wuow(opCtx);
- AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX);
- Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX);
- db.getDb()->createCollection(opCtx, toWriteToNss);
+ AutoGetOrCreateDb db(opCtx, nss.db(), LockMode::MODE_IX);
+ Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
+ db.getDb()->createCollection(opCtx, nss);
wuow.commit();
});
+}
- std::vector<BSONObj> serializedPipeline =
- createOplogFetchingPipelineForResharding(
- expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk)
- ->serializeToBson();
+std::vector<BSONObj> ReshardingOplogFetcher::_makePipeline(Client* client) {
+ auto opCtxRaii = client->makeOperationContext();
+ auto opCtx = opCtxRaii.get();
+ auto expCtx = _makeExpressionContext(opCtx);
+
+ return createOplogFetchingPipelineForResharding(
+ expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk)
+ ->serializeToBson();
+}
+
+void ReshardingOplogFetcher::consume(Client* client, Shard* shard) {
+ _ensureCollection(client, _toWriteInto);
+ std::vector<BSONObj> serializedPipeline = _makePipeline(client);
AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline);
- auto readConcernArgs = repl::ReadConcernArgs(
- boost::optional<LogicalTime>(startAfter.getTs()),
- boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
- aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+ if (_useReadConcern) {
+ auto readConcernArgs = repl::ReadConcernArgs(
+ boost::optional<LogicalTime>(_startAt.getTs()),
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+ }
+
aggRequest.setHint(BSON("$natural" << 1));
+ aggRequest.setRequestReshardingResumeToken(true);
- const bool secondaryOk = true;
- const bool useExhaust = true;
- std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
- conn, std::move(aggRequest), secondaryOk, useExhaust));
-
- // Noting some possible optimizations:
- //
- // * Batch more inserts into larger storage transactions.
- // * Parallize writing documents across multiple threads.
- // * Doing either of the above while still using the underlying message buffer of bson objects.
- AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX);
- ReshardingDonorOplogId lastSeen = startAfter;
- while (cursor->more() && _isAlive.load()) {
- WriteUnitOfWork wuow(opCtx);
- BSONObj obj = cursor->next();
- auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(obj));
+ if (_initialBatchSize) {
+ aggRequest.setBatchSize(_initialBatchSize);
+ }
- lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"},
- nextOplog.get_id()->getDocument().toBson());
- uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr));
- wuow.commit();
- ++_numOplogEntriesCopied;
+ auto opCtxRaii = client->makeOperationContext();
+ int batchesProcessed = 0;
+ auto svcCtx = client->getServiceContext();
+ uassertStatusOK(shard->runAggregation(
+ opCtxRaii.get(),
+ aggRequest,
+ [this, svcCtx, &batchesProcessed](const std::vector<BSONObj>& batch) {
+ ThreadClient client(fmt::format("ReshardingFetcher-{}-{}",
+ _reshardingUUID.toString(),
+ _donorShard.toString()),
+ svcCtx,
+ nullptr);
+ auto opCtxRaii = cc().makeOperationContext();
+ auto opCtx = opCtxRaii.get();
- if (isFinalOplog(nextOplog, _reshardingUUID)) {
- return boost::none;
- }
- }
+ // Noting some possible optimizations:
+ //
+ // * Batch more inserts into larger storage transactions.
+ // * Parallize writing documents across multiple threads.
+ // * Doing either of the above while still using the underlying message buffer of bson
+ // objects.
+ AutoGetCollection toWriteTo(opCtx, _toWriteInto, LockMode::MODE_IX);
+ for (const BSONObj& doc : batch) {
+ WriteUnitOfWork wuow(opCtx);
+ auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(doc));
+
+ _startAt = ReshardingDonorOplogId::parse(
+ {"OplogFetcherParsing"}, nextOplog.get_id()->getDocument().toBson());
+ uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr));
+ wuow.commit();
+ ++_numOplogEntriesCopied;
- return lastSeen;
+ if (isFinalOplog(nextOplog, _reshardingUUID)) {
+ return false;
+ }
+ }
+
+ if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) {
+ return false;
+ }
+
+ return true;
+ }));
}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index aeb198dfabf..a5c06225b07 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -37,6 +37,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/background.h"
#include "mongo/util/uuid.h"
@@ -59,61 +60,50 @@ public:
* - Send an aggregation request + getMores until either:
* -- The "final resharding" oplog entry is found.
* -- An interruption occurs.
+ * -- The fetcher concludes it's fallen off the oplog.
* -- A different error occurs.
*
- * In the first two circumstances, the task will return. In the last circumstance, the task will
- * be rescheduled in a way that resumes where it had left off from.
+ * In the first two circumstances, the task will terminate. If the fetcher has fallen off the
+ * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task
+ * will be rescheduled in a way that resumes where it had left off from.
*/
- void schedule(executor::TaskExecutor* exector);
+ Future<void> schedule(executor::TaskExecutor* executor);
/**
- * Given a connection, fetches and copies oplog entries until reaching an error, or coming
+ * Given a shard, fetches and copies oplog entries until reaching an error, or coming
* across a sentinel finish oplog entry. Throws if there's more oplog entries to be copied.
*/
- void consume(DBClientBase* conn);
+ void consume(Client* client, Shard* shard);
- /**
- * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when
- * the output is no longer necessary. The underlying `toWriteInto` collection is left intact,
- * though likely incomplete.
- */
- void setKilled();
+ bool iterate(Client* client);
- /**
- * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the
- * ReshardingDonorOplogId to resume querying from.
- *
- * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries
- * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation
- * cursor is exhausted.
- *
- * Returns an identifier for the last oplog-ish document written to `toWriteInto`.
- *
- * This method throws.
- *
- * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things
- * like perform aggregate commands nor does it expose a cursor/stream interface. However, using
- * a `Shard` object will provide critical behavior such as advancing logical clock values on a
- * response and targetting a node to send the aggregation command to.
- */
- boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx,
- DBClientBase* conn,
- boost::intrusive_ptr<ExpressionContext> expCtx,
- ReshardingDonorOplogId startAfter,
- UUID collUUID,
- const ShardId& recipientShard,
- bool doesDonorOwnMinKeyChunk,
- NamespaceString toWriteInto);
-
- int getNumOplogEntriesCopied() {
+ int getNumOplogEntriesCopied() const {
return _numOplogEntriesCopied;
}
+ ReshardingDonorOplogId getLastSeenTimestamp() const {
+ return _startAt;
+ }
+
+ void setInitialBatchSizeForTest(int size) {
+ _initialBatchSize = size;
+ }
+
+ void useReadConcernForTest(bool use) {
+ _useReadConcern = use;
+ }
+
+ void setMaxBatchesForTest(int maxBatches) {
+ _maxBatches = maxBatches;
+ }
+
private:
/**
* Returns true if there's more work to do and the task should be rescheduled.
*/
- bool _runTask();
+ void _ensureCollection(Client* client, const NamespaceString nss);
+ std::vector<BSONObj> _makePipeline(Client* client);
+ void _reschedule(executor::TaskExecutor* executor);
const UUID _reshardingUUID;
const UUID _collUUID;
@@ -123,9 +113,16 @@ private:
const bool _doesDonorOwnMinKeyChunk;
const NamespaceString _toWriteInto;
- ServiceContext::UniqueClient _client;
- AtomicWord<bool> _isAlive{true};
-
+ Promise<void> _fetchedFinishPromise;
int _numOplogEntriesCopied = 0;
+
+ // For testing to control behavior.
+
+ // The aggregation batch size. This only affects the original call and not `getmore`s.
+ int _initialBatchSize = 0;
+ // Setting to false will omit the `afterClusterTime` and `majority` read concern.
+ bool _useReadConcern = true;
+ // Dictates how many batches get processed before returning control from a call to `consume`.
+ int _maxBatches = -1;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
new file mode 100644
index 00000000000..f91a8a5996c
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -0,0 +1,417 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using namespace unittest;
+
+/**
+ * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
+ */
+class OneOffRead {
+public:
+ OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ if (ts.isNull()) {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ } else {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
+ }
+ }
+
+ ~OneOffRead() {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ }
+
+private:
+ OperationContext* _opCtx;
+};
+
+class ReshardingOplogFetcherTest : public ShardServerTestFixture {
+public:
+ OperationContext* _opCtx;
+ ServiceContext* _svcCtx;
+ UUID _reshardingUUID = UUID::gen();
+ Timestamp _fetchTimestamp;
+ ShardId _donorShard;
+ ShardId _destinationShard;
+
+ void setUp() {
+ ShardServerTestFixture::setUp();
+ _opCtx = operationContext();
+ _svcCtx = _opCtx->getServiceContext();
+
+ for (const auto& shardId : kTwoShardIdList) {
+ auto shardTargeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
+ ->getTargeter());
+ shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
+ }
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ // onStepUp() relies on the storage interface to create the config.transactions table.
+ repl::StorageInterface::set(getServiceContext(),
+ std::make_unique<repl::StorageInterfaceImpl>());
+ MongoDSessionCatalog::onStepUp(operationContext());
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+ _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
+
+ _donorShard = kTwoShardIdList[0];
+ _destinationShard = kTwoShardIdList[1];
+ }
+
+ void tearDown() {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+ ShardServerTestFixture::tearDown();
+ }
+
+ /**
+ * Override the CatalogClient to make CatalogClient::getAllShards automatically return the
+ * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
+ * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
+ * DBClientMock analogous to the NetworkInterfaceMock.
+ */
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) {
+
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient(std::vector<ShardId> shardIds)
+ : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ std::vector<ShardType> shardTypes;
+ for (const auto& shardId : _shardIds) {
+ const ConnectionString cs = ConnectionString::forReplicaSet(
+ shardId.toString(), {makeHostAndPort(shardId)});
+ ShardType sType;
+ sType.setName(cs.getSetName());
+ sType.setHost(cs.toString());
+ shardTypes.push_back(std::move(sType));
+ };
+ return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+ }
+
+ private:
+ const std::vector<ShardId> _shardIds;
+ };
+
+ return std::make_unique<StaticCatalogClient>(kTwoShardIdList);
+ }
+
+ void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
+ // Insert some documents.
+ OpDebug* const nullOpDebug = nullptr;
+ const bool fromMigrate = false;
+ ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
+ }
+
+ BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
+ BSONObj ret;
+ ASSERT_TRUE(Helpers::findOne(
+ _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
+ << "Query: " << query;
+ return ret;
+ }
+
+ BSONObj queryOplog(const BSONObj& query) {
+ OneOffRead oor(_opCtx, Timestamp::min());
+ return queryCollection(NamespaceString::kRsOplogNamespace, query);
+ }
+
+ repl::OpTime getLastApplied() {
+ return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
+ }
+
+ boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
+ NamespaceString slimNss =
+ NamespaceString("local.system.resharding.slimOplogForGraphLookup");
+
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(
+ new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
+ expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
+ {NamespaceString::kRsOplogNamespace, {}});
+ expCtx->setResolvedNamespace(slimNss,
+ {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
+ return expCtx;
+ }
+
+ int itcount(NamespaceString nss) {
+ OneOffRead oof(_opCtx, Timestamp::min());
+ AutoGetCollectionForRead autoColl(_opCtx, nss);
+ auto cursor = autoColl.getCollection()->getCursor(_opCtx);
+
+ int ret = 0;
+ while (auto rec = cursor->next()) {
+ ++ret;
+ }
+
+ return ret;
+ }
+
+ void create(NamespaceString nss) {
+ writeConflictRetry(_opCtx, "create", nss.ns(), [&] {
+ AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
+ WriteUnitOfWork wunit(_opCtx);
+ if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+ }
+ invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
+ wunit.commit();
+ });
+ }
+
+ template <typename T>
+ T requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle<T>& future,
+ int maxBatches = -1) {
+
+ int maxNumRequests = 1000; // No unittests would request more than this?
+ if (maxBatches > -1) {
+ // The fetcher will send a `killCursors` after the last `getMore`.
+ maxNumRequests = maxBatches + 1;
+ }
+
+ bool hasMore = true;
+ for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) {
+ onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ DBDirectClient client(cc().getOperationContext());
+ BSONObj result;
+ bool res = client.runCommand(request.dbname, request.cmdObj, result);
+ if (res == false || result.hasField("cursorsKilled") ||
+ result["cursor"]["id"].Long() == 0) {
+ hasMore = false;
+ }
+
+ return result;
+ });
+ }
+
+ return future.timed_get(Seconds(5));
+ }
+
+ // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
+ // followed by the final no-op oplog entry that signals the last oplog entry needed to be
+ // applied for resharding to move to the next stage.
+ void setupBasic(NamespaceString outputCollectionNss,
+ NamespaceString dataCollectionNss,
+ ShardId destinedRecipient) {
+ create(outputCollectionNss);
+ create(dataCollectionNss);
+
+ _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("destinedRecipient" << destinedRecipient.toString())));
+
+ // Insert five documents. Advance the majority point.
+ const std::int32_t docsToInsert = 5;
+ {
+ for (std::int32_t num = 0; num < docsToInsert; ++num) {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ wuow.commit();
+ }
+ }
+
+ // Write an entry saying that fetching is complete.
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ _opCtx,
+ dataColl.getCollection()->ns(),
+ dataColl.getCollection()->uuid(),
+ BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
+ dataColl.getCollection()->ns().toString())),
+ BSON("type"
+ << "reshardFinalOp"
+ << "reshardingUUID" << _reshardingUUID),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ }
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+
+ // Disable the failpoint.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "off"));
+ }
+
+protected:
+ const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
+
+private:
+ static HostAndPort makeHostAndPort(const ShardId& shardId) {
+ return HostAndPort(str::stream() << shardId << ":123");
+ }
+};
+
+TEST_F(ReshardingOplogFetcherTest, TestBasic) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ auto fetcherJob = launchAsync([&, this] {
+ Client::initThread("RefetchRunner", _svcCtx, nullptr);
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {_fetchTimestamp, _fetchTimestamp},
+ _donorShard,
+ _destinationShard,
+ true,
+ outputCollectionNss);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+
+ fetcher.iterate(&cc());
+ });
+
+ requestPassthroughHandler(fetcherJob);
+
+ // Five oplog entries for resharding + the sentinel final oplog entry.
+ ASSERT_EQ(6, itcount(outputCollectionNss));
+}
+
+TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ const int maxBatches = 1;
+ auto fetcherJob = launchAsync([&, this] {
+ Client::initThread("RefetcherRunner", _svcCtx, nullptr);
+
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {_fetchTimestamp, _fetchTimestamp},
+ _donorShard,
+ _destinationShard,
+ true,
+ outputCollectionNss);
+ fetcher.useReadConcernForTest(false);
+ fetcher.setInitialBatchSizeForTest(2);
+ fetcher.setMaxBatchesForTest(maxBatches);
+
+ fetcher.iterate(&cc());
+ return fetcher.getLastSeenTimestamp();
+ });
+
+ ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches);
+
+ // Two oplog entries due to the batch size.
+ ASSERT_EQ(2, itcount(outputCollectionNss));
+ // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`.
+ ASSERT_GT(lastSeen.getTs(), _fetchTimestamp);
+}
+
+TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ auto fetcherJob = launchAsync([&, this] {
+ Client::initThread("RefetcherRunner", _svcCtx, nullptr);
+
+ const Timestamp doesNotExist(1, 1);
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {doesNotExist, doesNotExist},
+ _donorShard,
+ _destinationShard,
+ true,
+ outputCollectionNss);
+ fetcher.useReadConcernForTest(false);
+
+ try {
+ fetcher.iterate(&cc());
+ // Test failure case.
+ return Status::OK();
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ });
+
+ auto fetcherStatus = requestPassthroughHandler(fetcherJob);
+
+ // Two oplog entries due to the batch size.
+ ASSERT_EQ(0, itcount(outputCollectionNss));
+ ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp
index 0f27d3a05d5..d1d66fddb53 100644
--- a/src/mongo/db/s/shard_local.cpp
+++ b/src/mongo/db/s/shard_local.cpp
@@ -201,4 +201,10 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
+Status ShardLocal::runAggregation(OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h
index df625905d7c..696bd665c3d 100644
--- a/src/mongo/db/s/shard_local.h
+++ b/src/mongo/db/s/shard_local.h
@@ -70,6 +70,10 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) override;
+ Status runAggregation(OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback);
+
private:
StatusWith<Shard::CommandResponse> _runCommand(OperationContext* opCtx,
const ReadPreferenceSetting& unused,
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index c03839ddbed..09d63233bdf 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -123,7 +123,6 @@ if not has_option('noshell') and usemozjs:
'querytests.cpp',
'replica_set_tests.cpp',
'repltests.cpp',
- 'resharding_tests.cpp',
'rollbacktests.cpp',
'socktests.cpp',
'storage_timestamp_tests.cpp',
diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp
deleted file mode 100644
index 996371a0cf4..00000000000
--- a/src/mongo/dbtests/resharding_tests.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/global_settings.h"
-#include "mongo/db/op_observer_impl.h"
-#include "mongo/db/op_observer_registry.h"
-#include "mongo/db/pipeline/expression_context_for_test.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_consistency_markers_mock.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
-#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
-#include "mongo/db/s/resharding_util.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/snapshot_manager.h"
-#include "mongo/db/storage/storage_engine.h"
-#include "mongo/db/storage/write_unit_of_work.h"
-#include "mongo/db/vector_clock_mutable.h"
-#include "mongo/dbtests/dbtests.h"
-#include "mongo/logv2/log.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-/**
- * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
- */
-class OneOffRead {
-public:
- OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
- _opCtx->recoveryUnit()->abandonSnapshot();
- if (ts.isNull()) {
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
- } else {
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
- }
- }
-
- ~OneOffRead() {
- _opCtx->recoveryUnit()->abandonSnapshot();
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
- }
-
-private:
- OperationContext* _opCtx;
-};
-
-/**
- * Observed problems using ShardingMongodTestFixture:
- *
- * - Does not mix with dbtest. Both will initialize a ServiceContext.
- * - By default uses ephemeralForTest. These tests require a storage engine that supports majority
- * reads.
- * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test
- * that is run. WT specifically installs a ServerStatusSection. The server status code asserts
- * that a section is never added after a `serverStatus` command is run. Tests defined in
- * `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a
- * serverStatus triggerring this assertion.
- */
-class ReshardingTest {
-public:
- ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
- OperationContext* _opCtx = _opCtxRaii.get();
- ServiceContext* _svcCtx = _opCtx->getServiceContext();
- VectorClockMutable* _clock = VectorClockMutable::get(_opCtx);
- // A convenience UUID.
- UUID _reshardingUUID = UUID::gen();
- // Timestamp of the first oplog entry which the fixture will set up.
- Timestamp _fetchTimestamp;
-
- ReshardingTest() {
- repl::ReplSettings replSettings;
- replSettings.setOplogSizeBytes(100 * 1024 * 1024);
- replSettings.setReplSetString("rs0");
- setGlobalReplSettings(replSettings);
-
- auto replCoordinatorMock =
- std::make_unique<repl::ReplicationCoordinatorMock>(_svcCtx, replSettings);
- replCoordinatorMock->alwaysAllowWrites(true);
- repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock));
- repl::StorageInterface::set(_svcCtx, std::make_unique<repl::StorageInterfaceImpl>());
- repl::ReplicationProcess::set(
- _svcCtx,
- std::make_unique<repl::ReplicationProcess>(
- repl::StorageInterface::get(_svcCtx),
- std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
- std::make_unique<repl::ReplicationRecoveryMock>()));
-
- // Since the Client object persists across tests, even though the global
- // ReplicationCoordinator does not, we need to clear the last op associated with the client
- // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward.
- repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest();
-
- auto opObsRegistry = std::make_unique<OpObserverRegistry>();
- opObsRegistry->addObserver(std::make_unique<OpObserverImpl>());
- _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry));
-
- // Clean out the oplog and write one no-op entry. The timestamp of this first oplog entry
- // will serve as resharding's `fetchTimestamp`.
- repl::createOplog(_opCtx);
- reset(NamespaceString::kRsOplogNamespace);
- {
- WriteUnitOfWork wuow(_opCtx);
- Lock::GlobalLock lk(_opCtx, LockMode::MODE_IX);
- _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
- _opCtx,
- // Choose a random, irrelevant replicated namespace.
- NamespaceString::kSystemKeysNamespace,
- UUID::gen(),
- BSON("msg"
- << "Dummy op."),
- boost::none,
- boost::none,
- boost::none,
- boost::none,
- boost::none);
- wuow.commit();
- repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
- }
- _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
- std::cout << " Fetch timestamp: " << _fetchTimestamp.toString() << std::endl;
-
- _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0)));
- }
-
- ~ReshardingTest() {
- try {
- reset(NamespaceString("local.oplog.rs"));
- } catch (...) {
- FAIL("Exception while cleaning up test");
- }
- }
-
-
- /**
- * Walking on ice: resetting the ReplicationCoordinator destroys the underlying
- * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection
- * without actually dropping it.
- */
- void reset(NamespaceString nss) const {
- ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] {
- // Do not write DDL operations to the oplog. This keeps the initial oplog state for each
- // test predictable.
- repl::UnreplicatedWritesBlock uwb(_opCtx);
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
- AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X);
-
- if (collRaii) {
- WriteUnitOfWork wunit(_opCtx);
- invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK());
- if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
- ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
- }
- collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false);
- wunit.commit();
- return;
- }
-
- AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
- WriteUnitOfWork wunit(_opCtx);
- if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
- ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
- }
- invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
- wunit.commit();
- });
- }
-
- void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
- // Insert some documents.
- OpDebug* const nullOpDebug = nullptr;
- const bool fromMigrate = false;
- ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
- }
-
- BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
- BSONObj ret;
- ASSERT_TRUE(Helpers::findOne(
- _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
- << "Query: " << query;
- return ret;
- }
-
- BSONObj queryOplog(const BSONObj& query) {
- OneOffRead oor(_opCtx, Timestamp::min());
- return queryCollection(NamespaceString::kRsOplogNamespace, query);
- }
-
- repl::OpTime getLastApplied() {
- return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
- }
-
- boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
- NamespaceString slimNss =
- NamespaceString("local.system.resharding.slimOplogForGraphLookup");
-
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(
- new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
- expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
- {NamespaceString::kRsOplogNamespace, {}});
- expCtx->setResolvedNamespace(slimNss,
- {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
- return expCtx;
- }
-
- int itcount(NamespaceString nss) {
- OneOffRead oof(_opCtx, Timestamp::min());
- AutoGetCollectionForRead autoColl(_opCtx, nss);
- auto cursor = autoColl.getCollection()->getCursor(_opCtx);
-
- int ret = 0;
- while (auto rec = cursor->next()) {
- ++ret;
- }
-
- return ret;
- }
-
- // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
- // followed by the final no-op oplog entry that signals the last oplog entry needed to be
- // applied for resharding to move to the next stage.
- void setupBasic(NamespaceString outputCollectionNss,
- NamespaceString dataCollectionNss,
- ShardId destinedRecipient) {
- reset(outputCollectionNss);
- reset(dataCollectionNss);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
- // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "alwaysOn"
- << "data"
- << BSON("destinedRecipient" << destinedRecipient.toString())));
-
- // Insert five documents. Advance the majority point.
- const std::int32_t docsToInsert = 5;
- {
- for (std::int32_t num = 0; num < docsToInsert; ++num) {
- WriteUnitOfWork wuow(_opCtx);
- insertDocument(dataColl.getCollection(),
- InsertStatement(BSON("_id" << num << "a" << num)));
- wuow.commit();
- }
- }
-
- // Write an entry saying that fetching is complete.
- {
- WriteUnitOfWork wuow(_opCtx);
- _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
- _opCtx,
- dataColl.getCollection()->ns(),
- dataColl.getCollection()->uuid(),
- BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
- dataColl.getCollection()->ns().toString())),
- BSON("type"
- << "reshardFinalOp"
- << "reshardingUUID" << _reshardingUUID),
- boost::none,
- boost::none,
- boost::none,
- boost::none);
- wuow.commit();
- }
-
- repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
- _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
- getLastApplied().getTimestamp());
-
- // Disable the failpoint.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "off"));
- }
-};
-
-class RunFetchIteration : public ReshardingTest {
-public:
- void run() {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- reset(outputCollectionNss);
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
- reset(dataCollectionNss);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
- // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "alwaysOn"
- << "data"
- << BSON("destinedRecipient"
- << "shard1")));
-
- // Insert five documents. Advance the majority point. Insert five more documents.
- const std::int32_t docsToInsert = 5;
- {
- for (std::int32_t num = 0; num < docsToInsert; ++num) {
- WriteUnitOfWork wuow(_opCtx);
- insertDocument(dataColl.getCollection(),
- InsertStatement(BSON("_id" << num << "a" << num)));
- wuow.commit();
- }
- }
-
- repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
- const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp();
- _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
- firstFiveLastApplied);
- {
- for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) {
- WriteUnitOfWork wuow(_opCtx);
- insertDocument(dataColl.getCollection(),
- InsertStatement(BSON("_id" << num << "a" << num)));
- wuow.commit();
- }
- }
-
- // Disable the failpoint.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "off"));
-
- repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
- const Timestamp latestLastApplied = getLastApplied().getTimestamp();
-
- // The first call to `iterate` should return the first five inserts and return a
- // `ReshardingDonorOplogId` matching the last applied of those five inserts.
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {_fetchTimestamp, _fetchTimestamp},
- ShardId("fakeDonorShard"),
- ShardId("shard1"),
- true,
- outputCollectionNss);
- DBDirectClient client(_opCtx);
- boost::optional<ReshardingDonorOplogId> donorOplogId =
- fetcher.iterate(_opCtx,
- &client,
- createExpressionContext(),
- {_fetchTimestamp, _fetchTimestamp},
- dataColl->uuid(),
- {"shard1"},
- true,
- outputCollectionNss);
-
- ASSERT(donorOplogId != boost::none);
- ASSERT_EQ(docsToInsert, itcount(outputCollectionNss));
- ASSERT_EQ(firstFiveLastApplied, donorOplogId->getClusterTime());
- ASSERT_EQ(firstFiveLastApplied, donorOplogId->getTs());
-
- // Advance the committed snapshot. A second `iterate` should return the second batch of five
- // inserts.
- _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
- getLastApplied().getTimestamp());
-
- donorOplogId = fetcher.iterate(_opCtx,
- &client,
- createExpressionContext(),
- {firstFiveLastApplied, firstFiveLastApplied},
- dataColl->uuid(),
- {"shard1"},
- true,
- outputCollectionNss);
-
- ASSERT(donorOplogId != boost::none);
- // Two batches of five inserts entry for the create collection oplog entry.
- ASSERT_EQ((2 * docsToInsert), itcount(outputCollectionNss));
- ASSERT_EQ(latestLastApplied, donorOplogId->getClusterTime());
- ASSERT_EQ(latestLastApplied, donorOplogId->getTs());
- }
-};
-
-class RunConsume : public ReshardingTest {
-public:
- void run() {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
- const ShardId destinationShard("shard1");
- setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {_fetchTimestamp, _fetchTimestamp},
- ShardId("fakeDonorShard"),
- destinationShard,
- true,
- outputCollectionNss);
- DBDirectClient client(_opCtx);
- fetcher.consume(&client);
-
- // Six oplog entries should be copied. Five inserts and the final no-op oplog entry.
- ASSERT_EQ(6, fetcher.getNumOplogEntriesCopied());
- }
-};
-
-class InterruptConsume : public ReshardingTest {
-public:
- void run() {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
- const ShardId destinationShard("shard1");
- setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {_fetchTimestamp, _fetchTimestamp},
- ShardId("fakeDonorShard"),
- destinationShard,
- true,
- outputCollectionNss);
-
- // Interrupt the fetcher. A fetcher object owns its own client, but interruption does not
- // require the background job to be started.
- fetcher.setKilled();
-
- DBDirectClient client(_opCtx);
- ASSERT_THROWS(fetcher.consume(&client), ExceptionForCat<ErrorCategory::Interruption>);
- }
-};
-
-class AllReshardingTests : public unittest::OldStyleSuiteSpecification {
-public:
- AllReshardingTests() : unittest::OldStyleSuiteSpecification("ReshardingTests") {}
-
- // Must be evaluated at test run() time, not static-init time.
- static bool shouldSkip() {
- // Only run on storage engines that support snapshot reads.
- auto storageEngine = cc().getServiceContext()->getStorageEngine();
- if (!storageEngine->supportsReadConcernSnapshot() ||
- !mongo::serverGlobalParams.enableMajorityReadConcern) {
- LOGV2(5123009,
- "Skipping this test because the configuration does not support majority reads.",
- "storageEngine"_attr = storageGlobalParams.engine,
- "enableMajorityReadConcern"_attr =
- mongo::serverGlobalParams.enableMajorityReadConcern);
- return true;
- }
- return false;
- }
-
- template <typename T>
- void addIf() {
- addNameCallback(nameForTestClass<T>(), [] {
- if (!shouldSkip())
- T().run();
- });
- }
-
- void setupTests() {
- addIf<RunFetchIteration>();
- addIf<RunConsume>();
- addIf<InterruptConsume>();
- }
-};
-
-unittest::OldStyleSuiteInitializer<AllReshardingTests> allReshardingTests;
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index 79f1a2b669c..82e45ef5e55 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -36,6 +36,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/remote_command_response.h"
@@ -207,6 +208,18 @@ public:
Milliseconds maxTimeMSOverride);
/**
+ * Synchronously run the aggregation request, with a best effort honoring of request
+ * options. `callback` will be called with the batch contained in each response. `callback`
+ * should return `true` to execute another getmore. Returning `false` will send a
+ * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to
+ * `callback`.
+ */
+ virtual Status runAggregation(
+ OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0;
+
+ /**
* Runs a write command against a shard. This is separate from runCommand, because write
* commands return errors in a different format than regular commands do, so checking for
* retriable errors must be done differently.
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 5254c24d37d..30ceec26868 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -281,7 +281,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
const auto& data = dataStatus.getValue();
if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
- // Sharding users of ReplSetMetadata do not require the wall clock time field to be set
+ // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
auto replParseStatus =
rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
if (!replParseStatus.isOK()) {
@@ -411,6 +411,91 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx,
.ignore();
}
+Status ShardRemote::runAggregation(
+ OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+
+ BSONObj readPrefMetadata;
+
+ ReadPreferenceSetting readPreference =
+ uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(
+ aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred));
+
+ auto swHost = _targeter->findHost(opCtx, readPreference);
+ if (!swHost.isOK()) {
+ return swHost.getStatus();
+ }
+ HostAndPort host = swHost.getValue();
+
+ BSONObjBuilder builder;
+ readPreference.toContainingBSON(&builder);
+ readPrefMetadata = builder.obj();
+
+ Status status =
+ Status(ErrorCodes::InternalError, "Internal error running cursor callback in command");
+ auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ status = dataStatus.getStatus();
+ return;
+ }
+
+ const auto& data = dataStatus.getValue();
+
+ if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
+ // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
+ auto replParseStatus =
+ rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
+ if (!replParseStatus.isOK()) {
+ status = replParseStatus.getStatus();
+ return;
+ }
+ }
+
+ if (!callback(data.documents)) {
+ *nextAction = Fetcher::NextAction::kNoAction;
+ }
+
+ status = Status::OK();
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ Milliseconds requestTimeout(-1);
+ if (aggRequest.getMaxTimeMS()) {
+ requestTimeout = Milliseconds(aggRequest.getMaxTimeMS());
+ }
+
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ Fetcher fetcher(executor.get(),
+ host,
+ aggRequest.getNamespaceString().db().toString(),
+ aggRequest.serializeToCommandObj().toBson(),
+ fetcherCallback,
+ readPrefMetadata,
+ requestTimeout, /* command network timeout */
+ requestTimeout /* getMore network timeout */);
+
+ Status scheduleStatus = fetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ return scheduleStatus;
+ }
+
+ fetcher.join();
+
+ updateReplSetMonitor(host, status);
+
+ return status;
+}
+
+
StatusWith<ShardRemote::AsyncCmdHandle> ShardRemote::_scheduleCommand(
OperationContext* opCtx,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index eb0c547c97f..e7ca9004112 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,6 +85,10 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) final;
+ Status runAggregation(OperationContext* opCtx,
+ const AggregationRequest& aggRequest,
+ std::function<bool(const std::vector<BSONObj>& batch)> callback);
+
private:
struct AsyncCmdHandle {
HostAndPort hostTargetted;
diff --git a/src/mongo/unittest/unittest.h b/src/mongo/unittest/unittest.h
index 2ef1d7cd77e..76563290fff 100644
--- a/src/mongo/unittest/unittest.h
+++ b/src/mongo/unittest/unittest.h
@@ -49,6 +49,7 @@
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
+#include "mongo/logv2/log_debug.h"
#include "mongo/logv2/log_detail.h"
#include "mongo/unittest/bson_test_util.h"
#include "mongo/unittest/unittest_helpers.h"