author     Uladzimir Makouski <uladzimir.makouski@mongodb.com>  2020-11-25 07:45:36 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>     2020-11-25 09:53:58 +0000
commit     7b92716c97e05c09f624513b03bd3eb7663aa537 (patch)
tree       8691a9414e1be23f8187263bfbca191d05766b9e
parent     1c5ab129a97535162db665db97effcc1ead0ad22 (diff)
Revert "SERVER-51245: Have resharding oplog fetching use a Fetcher."
This reverts commit cda3a52701fe4143b06bd981b98514e69d0a93eb.
-rw-r--r--  src/mongo/db/auth/authz_manager_external_state_mock.cpp       6
-rw-r--r--  src/mongo/db/s/SConscript                                      3
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp       231
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.h          81
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp  417
-rw-r--r--  src/mongo/db/s/shard_local.cpp                                 6
-rw-r--r--  src/mongo/db/s/shard_local.h                                   4
-rw-r--r--  src/mongo/dbtests/SConscript                                   1
-rw-r--r--  src/mongo/dbtests/resharding_tests.cpp                       507
-rw-r--r--  src/mongo/s/client/shard.h                                    13
-rw-r--r--  src/mongo/s/client/shard_remote.cpp                           87
-rw-r--r--  src/mongo/s/client/shard_remote.h                              4
-rw-r--r--  src/mongo/unittest/unittest.h                                  1
13 files changed, 663 insertions, 698 deletions
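
For orientation before the per-file hunks: the substance of this revert is dropping the Fetcher/Shard::runAggregation plumbing and restoring a direct exhaust cursor over a DBClientConnection, driven by a background task that owns its own Client. A minimal sketch of the restored consumption pattern, condensed from the hunks below (the helper name is hypothetical, not part of this commit):

#include <functional>
#include <memory>

#include "mongo/bson/bsonobj.h"
#include "mongo/client/dbclient_base.h"
#include "mongo/client/dbclient_cursor.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Hypothetical helper mirroring what the restored ReshardingOplogFetcher::iterate
// does with its donor connection: run the pipeline as an exhaust aggregation and
// hand every streamed document to a callback.
void drainExhaustAggregation(DBClientBase* conn,
                             AggregationRequest aggRequest,
                             std::function<void(const BSONObj&)> onDocument) {
    const bool secondaryOk = true;
    const bool useExhaust = true;  // The server streams batches without per-batch getMores.
    std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
        conn, std::move(aggRequest), secondaryOk, useExhaust));
    while (cursor->more()) {
        onDocument(cursor->next());
    }
}

}  // namespace mongo
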
diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
index aae6de7764e..00d479756a6 100644
--- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
@@ -112,11 +112,7 @@ void AuthzManagerExternalStateMock::setAuthzVersion(int version) {
std::unique_ptr<AuthzSessionExternalState>
AuthzManagerExternalStateMock::makeAuthzSessionExternalState(AuthorizationManager* authzManager) {
- auto ret = std::make_unique<AuthzSessionExternalStateMock>(authzManager);
- // Construct a `AuthzSessionExternalStateMock` structure that represents the default no-auth
- // state of a running mongod.
- ret->setReturnValueForShouldIgnoreAuthChecks(true);
- return ret;
+ return std::make_unique<AuthzSessionExternalStateMock>(authzManager);
}
Status AuthzManagerExternalStateMock::findOne(OperationContext* opCtx,
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 010435d8cec..373be39e030 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -452,7 +452,6 @@ env.CppUnitTest(
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
- 'resharding/resharding_oplog_fetcher_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
'session_catalog_migration_destination_test.cpp',
'session_catalog_migration_source_test.cpp',
@@ -475,9 +474,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/document_source_mock',
- '$BUILD_DIR/mongo/db/pipeline/expression_context',
'$BUILD_DIR/mongo/db/query/query_request',
- '$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_interface_local',
'$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index e0bd954b8c1..2dce0fee77b 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -87,71 +87,88 @@ ReshardingOplogFetcher::ReshardingOplogFetcher(UUID reshardingUUID,
_donorShard(donorShard),
_recipientShard(recipientShard),
_doesDonorOwnMinKeyChunk(doesDonorOwnMinKeyChunk),
- _toWriteInto(toWriteInto) {}
+ _toWriteInto(toWriteInto),
+ _client(getGlobalServiceContext()->makeClient(
+ fmt::format("OplogFetcher-{}-{}", reshardingUUID.toString(), donorShard.toString()))) {}
-Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
- auto pf = makePromiseFuture<void>();
- _fetchedFinishPromise = std::move(pf.promise);
-
- _reschedule(executor);
+void ReshardingOplogFetcher::consume(DBClientBase* conn) {
+ while (true) {
+ auto opCtxRaii = _client->makeOperationContext();
+ opCtxRaii->checkForInterrupt();
+ auto expCtx = _makeExpressionContext(opCtxRaii.get());
+ boost::optional<ReshardingDonorOplogId> restartAt = iterate(opCtxRaii.get(),
+ conn,
+ expCtx,
+ _startAt,
+ _collUUID,
+ _recipientShard,
+ _doesDonorOwnMinKeyChunk,
+ _toWriteInto);
+ if (!restartAt) {
+ return;
+ }
+ _startAt = restartAt.get();
+ }
+}
- return std::move(pf.future);
+void ReshardingOplogFetcher::setKilled() {
+ _isAlive.store(false);
+ _client->setKilled();
}
-void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) {
+void ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
executor->schedule([this, executor](Status status) {
- ThreadClient client(
- fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()),
- getGlobalServiceContext());
if (!status.isOK()) {
- LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status);
- _fetchedFinishPromise.setError(status);
return;
}
- try {
- if (iterate(client.get())) {
- _reschedule(executor);
- } else {
- _fetchedFinishPromise.emplaceValue();
- }
- } catch (...) {
- LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus());
- _fetchedFinishPromise.setError(exceptionToStatus());
+ if (_runTask()) {
+ schedule(executor);
}
});
}
-bool ReshardingOplogFetcher::iterate(Client* client) {
- std::shared_ptr<Shard> targetShard;
- {
- auto opCtxRaii = client->makeOperationContext();
- opCtxRaii->checkForInterrupt();
+bool ReshardingOplogFetcher::_runTask() {
+ auto opCtxRaii = _client->makeOperationContext();
+ opCtxRaii->checkForInterrupt();
+
+ const Seconds maxStaleness(10);
+ ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
+ StatusWith<std::shared_ptr<Shard>> swDonor =
+ Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
+ if (!swDonor.isOK()) {
+ LOGV2_WARNING(5127203,
+ "Error finding shard in registry, retrying.",
+ "error"_attr = swDonor.getStatus());
+ return true;
+ }
- const Seconds maxStaleness(10);
- ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
- StatusWith<std::shared_ptr<Shard>> swDonor =
- Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
- if (!swDonor.isOK()) {
- LOGV2_WARNING(5127203,
- "Error finding shard in registry, retrying.",
- "error"_attr = swDonor.getStatus());
- return true;
- }
- targetShard = swDonor.getValue();
+ StatusWith<HostAndPort> swTargettedDonor =
+ swDonor.getValue()->getTargeter()->findHost(opCtxRaii.get(), readPref);
+ if (!swTargettedDonor.isOK()) {
+ LOGV2_WARNING(5127202,
+ "Error targetting donor, retrying.",
+ "error"_attr = swTargettedDonor.getStatus());
+ return true;
}
+ DBClientConnection donorConn;
+ if (auto status = donorConn.connect(swTargettedDonor.getValue(), "ReshardingOplogFetching"_sd);
+ !status.isOK()) {
+ LOGV2_WARNING(5127201, "Failed connecting to donor, retrying.", "error"_attr = status);
+ return true;
+ }
+ // Reset the OpCtx so consuming can manage short-lived OpCtx lifetimes with the current client.
+ opCtxRaii.reset();
+
try {
// Consume will throw if there's oplog entries to be copied. It only returns cleanly when
// the final oplog has been seen and copied.
- consume(client, targetShard.get());
+ consume(&donorConn);
return false;
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+ _isAlive.store(false);
return false;
- } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
- LOGV2_ERROR(
- 5192103, "Fatal resharding error while fetching.", "error"_attr = exceptionToStatus());
- throw;
} catch (const DBException&) {
LOGV2_WARNING(
5127200, "Error while fetching, retrying.", "error"_attr = exceptionToStatus());
@@ -159,99 +176,73 @@ bool ReshardingOplogFetcher::iterate(Client* client) {
}
}
-void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceString nss) {
- auto opCtxRaii = client->makeOperationContext();
- auto opCtx = opCtxRaii.get();
+boost::optional<ReshardingDonorOplogId> ReshardingOplogFetcher::iterate(
+ OperationContext* opCtx,
+ DBClientBase* conn,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ const ReshardingDonorOplogId startAfter,
+ const UUID collUUID,
+ const ShardId& recipientShard,
+ const bool doesDonorOwnMinKeyChunk,
+ const NamespaceString toWriteToNss) {
+ // This method will use the input opCtx to perform writes into `toWriteToNss`.
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
// Create the destination collection if necessary.
- writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] {
- const CollectionPtr coll =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
- if (coll) {
+ writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] {
+ const CollectionPtr toWriteTo =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, toWriteToNss);
+ if (toWriteTo) {
return;
}
WriteUnitOfWork wuow(opCtx);
- AutoGetOrCreateDb db(opCtx, nss.db(), LockMode::MODE_IX);
- Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
- db.getDb()->createCollection(opCtx, nss);
+ AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX);
+ Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX);
+ db.getDb()->createCollection(opCtx, toWriteToNss);
wuow.commit();
});
-}
-std::vector<BSONObj> ReshardingOplogFetcher::_makePipeline(Client* client) {
- auto opCtxRaii = client->makeOperationContext();
- auto opCtx = opCtxRaii.get();
- auto expCtx = _makeExpressionContext(opCtx);
-
- return createOplogFetchingPipelineForResharding(
- expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk)
- ->serializeToBson();
-}
-
-void ReshardingOplogFetcher::consume(Client* client, Shard* shard) {
- _ensureCollection(client, _toWriteInto);
- std::vector<BSONObj> serializedPipeline = _makePipeline(client);
+ std::vector<BSONObj> serializedPipeline =
+ createOplogFetchingPipelineForResharding(
+ expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk)
+ ->serializeToBson();
AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline);
- if (_useReadConcern) {
- auto readConcernArgs = repl::ReadConcernArgs(
- boost::optional<LogicalTime>(_startAt.getTs()),
- boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
- aggRequest.setReadConcern(readConcernArgs.toBSONInner());
- }
-
+ auto readConcernArgs = repl::ReadConcernArgs(
+ boost::optional<LogicalTime>(startAfter.getTs()),
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ aggRequest.setReadConcern(readConcernArgs.toBSONInner());
aggRequest.setHint(BSON("$natural" << 1));
- aggRequest.setRequestReshardingResumeToken(true);
-
- if (_initialBatchSize) {
- aggRequest.setBatchSize(_initialBatchSize);
- }
- auto opCtxRaii = client->makeOperationContext();
- int batchesProcessed = 0;
- auto svcCtx = client->getServiceContext();
- uassertStatusOK(shard->runAggregation(
- opCtxRaii.get(),
- aggRequest,
- [this, svcCtx, &batchesProcessed](const std::vector<BSONObj>& batch) {
- ThreadClient client(fmt::format("ReshardingFetcher-{}-{}",
- _reshardingUUID.toString(),
- _donorShard.toString()),
- svcCtx,
- nullptr);
- auto opCtxRaii = cc().makeOperationContext();
- auto opCtx = opCtxRaii.get();
-
- // Noting some possible optimizations:
- //
- // * Batch more inserts into larger storage transactions.
- // * Parallelize writing documents across multiple threads.
- // * Doing either of the above while still using the underlying message buffer of bson
- // objects.
- AutoGetCollection toWriteTo(opCtx, _toWriteInto, LockMode::MODE_IX);
- for (const BSONObj& doc : batch) {
- WriteUnitOfWork wuow(opCtx);
- auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(doc));
-
- _startAt = ReshardingDonorOplogId::parse(
- {"OplogFetcherParsing"}, nextOplog.get_id()->getDocument().toBson());
- uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr));
- wuow.commit();
- ++_numOplogEntriesCopied;
+ const bool secondaryOk = true;
+ const bool useExhaust = true;
+ std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
+ conn, std::move(aggRequest), secondaryOk, useExhaust));
+
+ // Noting some possible optimizations:
+ //
+ // * Batch more inserts into larger storage transactions.
+ // * Parallelize writing documents across multiple threads.
+ // * Doing either of the above while still using the underlying message buffer of bson objects.
+ AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX);
+ ReshardingDonorOplogId lastSeen = startAfter;
+ while (cursor->more() && _isAlive.load()) {
+ WriteUnitOfWork wuow(opCtx);
+ BSONObj obj = cursor->next();
+ auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(obj));
- if (isFinalOplog(nextOplog, _reshardingUUID)) {
- return false;
- }
- }
+ lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"},
+ nextOplog.get_id()->getDocument().toBson());
+ uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr));
+ wuow.commit();
+ ++_numOplogEntriesCopied;
- if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) {
- return false;
- }
+ if (isFinalOplog(nextOplog, _reshardingUUID)) {
+ return boost::none;
+ }
+ }
- return true;
- }));
+ return lastSeen;
}
-
} // namespace mongo
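
A note on the interruption path restored above: setKilled() flips _isAlive and kills the fetcher-owned Client, so the next makeOperationContext()/checkForInterrupt() pair in consume() throws an Interruption-category error. A minimal sketch of that mechanism under an assumed ServiceContext (illustrative only, not code from this commit):

#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"

namespace mongo {

// Illustrative only: killing a Client marks every OperationContext subsequently
// created on it, so checkForInterrupt() throws, which consume() surfaces as an
// ExceptionForCat<ErrorCategory::Interruption> and _runTask() treats as a clean stop.
void demoClientKill(ServiceContext* svcCtx) {
    ServiceContext::UniqueClient client = svcCtx->makeClient("demo");
    client->setKilled();
    auto opCtx = client->makeOperationContext();
    opCtx->checkForInterrupt();  // Throws; the fetcher catches this and stops.
}

}  // namespace mongo
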
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index a5c06225b07..aeb198dfabf 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -37,7 +37,6 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/db/service_context.h"
-#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/background.h"
#include "mongo/util/uuid.h"
@@ -60,50 +59,61 @@ public:
* - Send an aggregation request + getMores until either:
* -- The "final resharding" oplog entry is found.
* -- An interruption occurs.
- * -- The fetcher concludes it's fallen off the oplog.
* -- A different error occurs.
*
- * In the first two circumstances, the task will terminate. If the fetcher has fallen off the
- * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task
- * will be rescheduled in a way that resumes where it left off.
+ * In the first two circumstances, the task will return. In the last circumstance, the task will
+ * be rescheduled in a way that resumes where it left off.
*/
- Future<void> schedule(executor::TaskExecutor* executor);
+ void schedule(executor::TaskExecutor* executor);
/**
- * Given a shard, fetches and copies oplog entries until reaching an error, or coming
+ * Given a connection, fetches and copies oplog entries until reaching an error, or coming
* across a sentinel finish oplog entry. Throws if there are more oplog entries to be copied.
*/
- void consume(Client* client, Shard* shard);
+ void consume(DBClientBase* conn);
- bool iterate(Client* client);
+ /**
+ * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when
+ * the output is no longer necessary. The underlying `toWriteInto` collection is left intact,
+ * though likely incomplete.
+ */
+ void setKilled();
- int getNumOplogEntriesCopied() const {
+ /**
+ * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the
+ * ReshardingDonorOplogId to resume querying from.
+ *
+ * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries
+ * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation
+ * cursor is exhausted.
+ *
+ * Returns an identifier for the last oplog-ish document written to `toWriteInto`.
+ *
+ * This method throws.
+ *
+ * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things
+ * like perform aggregate commands nor does it expose a cursor/stream interface. However, using
+ * a `Shard` object will provide critical behavior such as advancing logical clock values on a
+ * response and targeting a node to send the aggregation command to.
+ */
+ boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx,
+ DBClientBase* conn,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ ReshardingDonorOplogId startAfter,
+ UUID collUUID,
+ const ShardId& recipientShard,
+ bool doesDonorOwnMinKeyChunk,
+ NamespaceString toWriteInto);
+
+ int getNumOplogEntriesCopied() {
return _numOplogEntriesCopied;
}
- ReshardingDonorOplogId getLastSeenTimestamp() const {
- return _startAt;
- }
-
- void setInitialBatchSizeForTest(int size) {
- _initialBatchSize = size;
- }
-
- void useReadConcernForTest(bool use) {
- _useReadConcern = use;
- }
-
- void setMaxBatchesForTest(int maxBatches) {
- _maxBatches = maxBatches;
- }
-
private:
/**
* Returns true if there's more work to do and the task should be rescheduled.
*/
- void _ensureCollection(Client* client, const NamespaceString nss);
- std::vector<BSONObj> _makePipeline(Client* client);
- void _reschedule(executor::TaskExecutor* executor);
+ bool _runTask();
const UUID _reshardingUUID;
const UUID _collUUID;
@@ -113,16 +123,9 @@ private:
const bool _doesDonorOwnMinKeyChunk;
const NamespaceString _toWriteInto;
- Promise<void> _fetchedFinishPromise;
- int _numOplogEntriesCopied = 0;
-
- // For testing to control behavior.
+ ServiceContext::UniqueClient _client;
+ AtomicWord<bool> _isAlive{true};
- // The aggregation batch size. This only affects the original call and not `getmore`s.
- int _initialBatchSize = 0;
- // Setting to false will omit the `afterClusterTime` and `majority` read concern.
- bool _useReadConcern = true;
- // Dictates how many batches get processed before returning control from a call to `consume`.
- int _maxBatches = -1;
+ int _numOplogEntriesCopied = 0;
};
} // namespace mongo
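
Since schedule() no longer returns a Future<void>, a caller of the restored interface observes neither completion nor failure directly; it starts the fetcher and later tears it down. A hypothetical driver (the function and its ownership scheme are illustrative, not from this commit):

#include <memory>

#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"

namespace mongo {

// Hypothetical caller: the fetcher must outlive its scheduled task, so ownership
// is handed back to the caller, who invokes setKilled() once the output
// collection is no longer needed.
std::unique_ptr<ReshardingOplogFetcher> startOplogFetching(executor::TaskExecutor* executor,
                                                           UUID reshardingUUID,
                                                           UUID collUUID,
                                                           ReshardingDonorOplogId startAt,
                                                           ShardId donorShard,
                                                           ShardId recipientShard,
                                                           NamespaceString outputNss) {
    auto fetcher = std::make_unique<ReshardingOplogFetcher>(reshardingUUID,
                                                            collUUID,
                                                            startAt,
                                                            donorShard,
                                                            recipientShard,
                                                            /*doesDonorOwnMinKeyChunk=*/true,
                                                            outputNss);
    fetcher->schedule(executor);  // Background task; reschedules itself on retryable errors.
    return fetcher;
}

}  // namespace mongo
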
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
deleted file mode 100644
index f91a8a5996c..00000000000
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/platform/basic.h"
-
-#include <vector>
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/logical_session_cache_noop.h"
-#include "mongo/db/op_observer_impl.h"
-#include "mongo/db/pipeline/document_source_mock.h"
-#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/repl/wait_for_majority_service.h"
-#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
-#include "mongo/db/s/resharding_util.h"
-#include "mongo/db/s/shard_server_test_fixture.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/db/storage/write_unit_of_work.h"
-#include "mongo/s/catalog/sharding_catalog_client_mock.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using namespace unittest;
-
-/**
- * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
- */
-class OneOffRead {
-public:
- OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
- _opCtx->recoveryUnit()->abandonSnapshot();
- if (ts.isNull()) {
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
- } else {
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
- }
- }
-
- ~OneOffRead() {
- _opCtx->recoveryUnit()->abandonSnapshot();
- _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
- }
-
-private:
- OperationContext* _opCtx;
-};
-
-class ReshardingOplogFetcherTest : public ShardServerTestFixture {
-public:
- OperationContext* _opCtx;
- ServiceContext* _svcCtx;
- UUID _reshardingUUID = UUID::gen();
- Timestamp _fetchTimestamp;
- ShardId _donorShard;
- ShardId _destinationShard;
-
- void setUp() {
- ShardServerTestFixture::setUp();
- _opCtx = operationContext();
- _svcCtx = _opCtx->getServiceContext();
-
- for (const auto& shardId : kTwoShardIdList) {
- auto shardTargeter = RemoteCommandTargeterMock::get(
- uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
- ->getTargeter());
- shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
- }
-
- WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
-
- // onStepUp() relies on the storage interface to create the config.transactions table.
- repl::StorageInterface::set(getServiceContext(),
- std::make_unique<repl::StorageInterfaceImpl>());
- MongoDSessionCatalog::onStepUp(operationContext());
- LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
- _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
-
- _donorShard = kTwoShardIdList[0];
- _destinationShard = kTwoShardIdList[1];
- }
-
- void tearDown() {
- WaitForMajorityService::get(getServiceContext()).shutDown();
- ShardServerTestFixture::tearDown();
- }
-
- /**
- * Override the CatalogClient to make CatalogClient::getAllShards automatically return the
- * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
- * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
- * DBClientMock analogous to the NetworkInterfaceMock.
- */
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
- std::unique_ptr<DistLockManager> distLockManager) {
-
- class StaticCatalogClient final : public ShardingCatalogClientMock {
- public:
- StaticCatalogClient(std::vector<ShardId> shardIds)
- : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {}
-
- StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
- std::vector<ShardType> shardTypes;
- for (const auto& shardId : _shardIds) {
- const ConnectionString cs = ConnectionString::forReplicaSet(
- shardId.toString(), {makeHostAndPort(shardId)});
- ShardType sType;
- sType.setName(cs.getSetName());
- sType.setHost(cs.toString());
- shardTypes.push_back(std::move(sType));
- };
- return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
- }
-
- private:
- const std::vector<ShardId> _shardIds;
- };
-
- return std::make_unique<StaticCatalogClient>(kTwoShardIdList);
- }
-
- void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
- // Insert some documents.
- OpDebug* const nullOpDebug = nullptr;
- const bool fromMigrate = false;
- ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
- }
-
- BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
- BSONObj ret;
- ASSERT_TRUE(Helpers::findOne(
- _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
- << "Query: " << query;
- return ret;
- }
-
- BSONObj queryOplog(const BSONObj& query) {
- OneOffRead oor(_opCtx, Timestamp::min());
- return queryCollection(NamespaceString::kRsOplogNamespace, query);
- }
-
- repl::OpTime getLastApplied() {
- return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
- }
-
- boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
- NamespaceString slimNss =
- NamespaceString("local.system.resharding.slimOplogForGraphLookup");
-
- boost::intrusive_ptr<ExpressionContextForTest> expCtx(
- new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
- expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
- {NamespaceString::kRsOplogNamespace, {}});
- expCtx->setResolvedNamespace(slimNss,
- {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
- return expCtx;
- }
-
- int itcount(NamespaceString nss) {
- OneOffRead oof(_opCtx, Timestamp::min());
- AutoGetCollectionForRead autoColl(_opCtx, nss);
- auto cursor = autoColl.getCollection()->getCursor(_opCtx);
-
- int ret = 0;
- while (auto rec = cursor->next()) {
- ++ret;
- }
-
- return ret;
- }
-
- void create(NamespaceString nss) {
- writeConflictRetry(_opCtx, "create", nss.ns(), [&] {
- AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
- WriteUnitOfWork wunit(_opCtx);
- if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
- ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
- }
- invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
- wunit.commit();
- });
- }
-
- template <typename T>
- T requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle<T>& future,
- int maxBatches = -1) {
-
- int maxNumRequests = 1000; // No unittests would request more than this?
- if (maxBatches > -1) {
- // The fetcher will send a `killCursors` after the last `getMore`.
- maxNumRequests = maxBatches + 1;
- }
-
- bool hasMore = true;
- for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) {
- onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
- DBDirectClient client(cc().getOperationContext());
- BSONObj result;
- bool res = client.runCommand(request.dbname, request.cmdObj, result);
- if (res == false || result.hasField("cursorsKilled") ||
- result["cursor"]["id"].Long() == 0) {
- hasMore = false;
- }
-
- return result;
- });
- }
-
- return future.timed_get(Seconds(5));
- }
-
- // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
- // followed by the final no-op oplog entry that signals the last oplog entry needed to be
- // applied for resharding to move to the next stage.
- void setupBasic(NamespaceString outputCollectionNss,
- NamespaceString dataCollectionNss,
- ShardId destinedRecipient) {
- create(outputCollectionNss);
- create(dataCollectionNss);
-
- _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
- // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "alwaysOn"
- << "data"
- << BSON("destinedRecipient" << destinedRecipient.toString())));
-
- // Insert five documents. Advance the majority point.
- const std::int32_t docsToInsert = 5;
- {
- for (std::int32_t num = 0; num < docsToInsert; ++num) {
- WriteUnitOfWork wuow(_opCtx);
- insertDocument(dataColl.getCollection(),
- InsertStatement(BSON("_id" << num << "a" << num)));
- wuow.commit();
- }
- }
-
- // Write an entry saying that fetching is complete.
- {
- WriteUnitOfWork wuow(_opCtx);
- _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
- _opCtx,
- dataColl.getCollection()->ns(),
- dataColl.getCollection()->uuid(),
- BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
- dataColl.getCollection()->ns().toString())),
- BSON("type"
- << "reshardFinalOp"
- << "reshardingUUID" << _reshardingUUID),
- boost::none,
- boost::none,
- boost::none,
- boost::none);
- wuow.commit();
- }
-
- repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
-
- // Disable the failpoint.
- setGlobalFailPoint("addDestinedRecipient",
- BSON("mode"
- << "off"));
- }
-
-protected:
- const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
-
-private:
- static HostAndPort makeHostAndPort(const ShardId& shardId) {
- return HostAndPort(str::stream() << shardId << ":123");
- }
-};
-
-TEST_F(ReshardingOplogFetcherTest, TestBasic) {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
- setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
- auto fetcherJob = launchAsync([&, this] {
- Client::initThread("RefetchRunner", _svcCtx, nullptr);
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {_fetchTimestamp, _fetchTimestamp},
- _donorShard,
- _destinationShard,
- true,
- outputCollectionNss);
- fetcher.useReadConcernForTest(false);
- fetcher.setInitialBatchSizeForTest(2);
-
- fetcher.iterate(&cc());
- });
-
- requestPassthroughHandler(fetcherJob);
-
- // Five oplog entries for resharding + the sentinel final oplog entry.
- ASSERT_EQ(6, itcount(outputCollectionNss));
-}
-
-TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
- setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
- const int maxBatches = 1;
- auto fetcherJob = launchAsync([&, this] {
- Client::initThread("RefetcherRunner", _svcCtx, nullptr);
-
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {_fetchTimestamp, _fetchTimestamp},
- _donorShard,
- _destinationShard,
- true,
- outputCollectionNss);
- fetcher.useReadConcernForTest(false);
- fetcher.setInitialBatchSizeForTest(2);
- fetcher.setMaxBatchesForTest(maxBatches);
-
- fetcher.iterate(&cc());
- return fetcher.getLastSeenTimestamp();
- });
-
- ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches);
-
- // Two oplog entries due to the batch size.
- ASSERT_EQ(2, itcount(outputCollectionNss));
- // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`.
- ASSERT_GT(lastSeen.getTs(), _fetchTimestamp);
-}
-
-TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) {
- const NamespaceString outputCollectionNss("dbtests.outputCollection");
- const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
- setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
-
- AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
- auto fetcherJob = launchAsync([&, this] {
- Client::initThread("RefetcherRunner", _svcCtx, nullptr);
-
- const Timestamp doesNotExist(1, 1);
- ReshardingOplogFetcher fetcher(_reshardingUUID,
- dataColl->uuid(),
- {doesNotExist, doesNotExist},
- _donorShard,
- _destinationShard,
- true,
- outputCollectionNss);
- fetcher.useReadConcernForTest(false);
-
- try {
- fetcher.iterate(&cc());
- // Test failure case.
- return Status::OK();
- } catch (...) {
- return exceptionToStatus();
- }
- });
-
- auto fetcherStatus = requestPassthroughHandler(fetcherJob);
-
- // No oplog entries should have been copied.
- ASSERT_EQ(0, itcount(outputCollectionNss));
- ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code());
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp
index d1d66fddb53..0f27d3a05d5 100644
--- a/src/mongo/db/s/shard_local.cpp
+++ b/src/mongo/db/s/shard_local.cpp
@@ -201,10 +201,4 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-Status ShardLocal::runAggregation(OperationContext* opCtx,
- const AggregationRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) {
- MONGO_UNREACHABLE;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h
index 696bd665c3d..df625905d7c 100644
--- a/src/mongo/db/s/shard_local.h
+++ b/src/mongo/db/s/shard_local.h
@@ -70,10 +70,6 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) override;
- Status runAggregation(OperationContext* opCtx,
- const AggregationRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback);
-
private:
StatusWith<Shard::CommandResponse> _runCommand(OperationContext* opCtx,
const ReadPreferenceSetting& unused,
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 09d63233bdf..c03839ddbed 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -123,6 +123,7 @@ if not has_option('noshell') and usemozjs:
'querytests.cpp',
'replica_set_tests.cpp',
'repltests.cpp',
+ 'resharding_tests.cpp',
'rollbacktests.cpp',
'socktests.cpp',
'storage_timestamp_tests.cpp',
diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp
new file mode 100644
index 00000000000..996371a0cf4
--- /dev/null
+++ b/src/mongo/dbtests/resharding_tests.cpp
@@ -0,0 +1,507 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/global_settings.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/snapshot_manager.h"
+#include "mongo/db/storage/storage_engine.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+/**
+ * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
+ */
+class OneOffRead {
+public:
+ OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ if (ts.isNull()) {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ } else {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
+ }
+ }
+
+ ~OneOffRead() {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ }
+
+private:
+ OperationContext* _opCtx;
+};
+
+/**
+ * Observed problems using ShardingMongodTestFixture:
+ *
+ * - Does not mix with dbtest. Both will initialize a ServiceContext.
+ * - By default uses ephemeralForTest. These tests require a storage engine that supports majority
+ * reads.
+ * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test
+ * that is run. WT specifically installs a ServerStatusSection. The server status code asserts
+ * that a section is never added after a `serverStatus` command is run. Tests defined in
+ * `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a
+ * serverStatus, triggering this assertion.
+ */
+class ReshardingTest {
+public:
+ ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
+ OperationContext* _opCtx = _opCtxRaii.get();
+ ServiceContext* _svcCtx = _opCtx->getServiceContext();
+ VectorClockMutable* _clock = VectorClockMutable::get(_opCtx);
+ // A convenience UUID.
+ UUID _reshardingUUID = UUID::gen();
+ // Timestamp of the first oplog entry which the fixture will set up.
+ Timestamp _fetchTimestamp;
+
+ ReshardingTest() {
+ repl::ReplSettings replSettings;
+ replSettings.setOplogSizeBytes(100 * 1024 * 1024);
+ replSettings.setReplSetString("rs0");
+ setGlobalReplSettings(replSettings);
+
+ auto replCoordinatorMock =
+ std::make_unique<repl::ReplicationCoordinatorMock>(_svcCtx, replSettings);
+ replCoordinatorMock->alwaysAllowWrites(true);
+ repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock));
+ repl::StorageInterface::set(_svcCtx, std::make_unique<repl::StorageInterfaceImpl>());
+ repl::ReplicationProcess::set(
+ _svcCtx,
+ std::make_unique<repl::ReplicationProcess>(
+ repl::StorageInterface::get(_svcCtx),
+ std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ std::make_unique<repl::ReplicationRecoveryMock>()));
+
+ // Since the Client object persists across tests, even though the global
+ // ReplicationCoordinator does not, we need to clear the last op associated with the client
+ // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward.
+ repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest();
+
+ auto opObsRegistry = std::make_unique<OpObserverRegistry>();
+ opObsRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry));
+
+ // Clean out the oplog and write one no-op entry. The timestamp of this first oplog entry
+ // will serve as resharding's `fetchTimestamp`.
+ repl::createOplog(_opCtx);
+ reset(NamespaceString::kRsOplogNamespace);
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ Lock::GlobalLock lk(_opCtx, LockMode::MODE_IX);
+ _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ _opCtx,
+ // Choose a random, irrelevant replicated namespace.
+ NamespaceString::kSystemKeysNamespace,
+ UUID::gen(),
+ BSON("msg"
+ << "Dummy op."),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ }
+ _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
+ std::cout << " Fetch timestamp: " << _fetchTimestamp.toString() << std::endl;
+
+ _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0)));
+ }
+
+ ~ReshardingTest() {
+ try {
+ reset(NamespaceString("local.oplog.rs"));
+ } catch (...) {
+ FAIL("Exception while cleaning up test");
+ }
+ }
+
+
+ /**
+ * Walking on ice: resetting the ReplicationCoordinator destroys the underlying
+ * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection
+ * without actually dropping it.
+ */
+ void reset(NamespaceString nss) const {
+ ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] {
+ // Do not write DDL operations to the oplog. This keeps the initial oplog state for each
+ // test predictable.
+ repl::UnreplicatedWritesBlock uwb(_opCtx);
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X);
+
+ if (collRaii) {
+ WriteUnitOfWork wunit(_opCtx);
+ invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK());
+ if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+ }
+ collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false);
+ wunit.commit();
+ return;
+ }
+
+ AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
+ WriteUnitOfWork wunit(_opCtx);
+ if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+ }
+ invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
+ wunit.commit();
+ });
+ }
+
+ void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
+ // Insert some documents.
+ OpDebug* const nullOpDebug = nullptr;
+ const bool fromMigrate = false;
+ ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
+ }
+
+ BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
+ BSONObj ret;
+ ASSERT_TRUE(Helpers::findOne(
+ _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
+ << "Query: " << query;
+ return ret;
+ }
+
+ BSONObj queryOplog(const BSONObj& query) {
+ OneOffRead oor(_opCtx, Timestamp::min());
+ return queryCollection(NamespaceString::kRsOplogNamespace, query);
+ }
+
+ repl::OpTime getLastApplied() {
+ return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
+ }
+
+ boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
+ NamespaceString slimNss =
+ NamespaceString("local.system.resharding.slimOplogForGraphLookup");
+
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(
+ new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
+ expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
+ {NamespaceString::kRsOplogNamespace, {}});
+ expCtx->setResolvedNamespace(slimNss,
+ {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
+ return expCtx;
+ }
+
+ int itcount(NamespaceString nss) {
+ OneOffRead oof(_opCtx, Timestamp::min());
+ AutoGetCollectionForRead autoColl(_opCtx, nss);
+ auto cursor = autoColl.getCollection()->getCursor(_opCtx);
+
+ int ret = 0;
+ while (auto rec = cursor->next()) {
+ ++ret;
+ }
+
+ return ret;
+ }
+
+ // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
+ // followed by the final no-op oplog entry that signals the last oplog entry needed to be
+ // applied for resharding to move to the next stage.
+ void setupBasic(NamespaceString outputCollectionNss,
+ NamespaceString dataCollectionNss,
+ ShardId destinedRecipient) {
+ reset(outputCollectionNss);
+ reset(dataCollectionNss);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("destinedRecipient" << destinedRecipient.toString())));
+
+ // Insert five documents. Advance the majority point.
+ const std::int32_t docsToInsert = 5;
+ {
+ for (std::int32_t num = 0; num < docsToInsert; ++num) {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ wuow.commit();
+ }
+ }
+
+ // Write an entry saying that fetching is complete.
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ _opCtx,
+ dataColl.getCollection()->ns(),
+ dataColl.getCollection()->uuid(),
+ BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
+ dataColl.getCollection()->ns().toString())),
+ BSON("type"
+ << "reshardFinalOp"
+ << "reshardingUUID" << _reshardingUUID),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ }
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
+ getLastApplied().getTimestamp());
+
+ // Disable the failpoint.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "off"));
+ }
+};
+
+class RunFetchIteration : public ReshardingTest {
+public:
+ void run() {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ reset(outputCollectionNss);
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+ reset(dataCollectionNss);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("destinedRecipient"
+ << "shard1")));
+
+ // Insert five documents. Advance the majority point. Insert five more documents.
+ const std::int32_t docsToInsert = 5;
+ {
+ for (std::int32_t num = 0; num < docsToInsert; ++num) {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ wuow.commit();
+ }
+ }
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp();
+ _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
+ firstFiveLastApplied);
+ {
+ for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ wuow.commit();
+ }
+ }
+
+ // Disable the failpoint.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "off"));
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ const Timestamp latestLastApplied = getLastApplied().getTimestamp();
+
+ // The first call to `iterate` should return the first five inserts and return a
+ // `ReshardingDonorOplogId` matching the last applied of those five inserts.
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {_fetchTimestamp, _fetchTimestamp},
+ ShardId("fakeDonorShard"),
+ ShardId("shard1"),
+ true,
+ outputCollectionNss);
+ DBDirectClient client(_opCtx);
+ boost::optional<ReshardingDonorOplogId> donorOplogId =
+ fetcher.iterate(_opCtx,
+ &client,
+ createExpressionContext(),
+ {_fetchTimestamp, _fetchTimestamp},
+ dataColl->uuid(),
+ {"shard1"},
+ true,
+ outputCollectionNss);
+
+ ASSERT(donorOplogId != boost::none);
+ ASSERT_EQ(docsToInsert, itcount(outputCollectionNss));
+ ASSERT_EQ(firstFiveLastApplied, donorOplogId->getClusterTime());
+ ASSERT_EQ(firstFiveLastApplied, donorOplogId->getTs());
+
+ // Advance the committed snapshot. A second `iterate` should return the second batch of five
+ // inserts.
+ _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
+ getLastApplied().getTimestamp());
+
+ donorOplogId = fetcher.iterate(_opCtx,
+ &client,
+ createExpressionContext(),
+ {firstFiveLastApplied, firstFiveLastApplied},
+ dataColl->uuid(),
+ {"shard1"},
+ true,
+ outputCollectionNss);
+
+ ASSERT(donorOplogId != boost::none);
+ // Two batches of five inserts.
+ ASSERT_EQ((2 * docsToInsert), itcount(outputCollectionNss));
+ ASSERT_EQ(latestLastApplied, donorOplogId->getClusterTime());
+ ASSERT_EQ(latestLastApplied, donorOplogId->getTs());
+ }
+};
+
+class RunConsume : public ReshardingTest {
+public:
+ void run() {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ const ShardId destinationShard("shard1");
+ setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {_fetchTimestamp, _fetchTimestamp},
+ ShardId("fakeDonorShard"),
+ destinationShard,
+ true,
+ outputCollectionNss);
+ DBDirectClient client(_opCtx);
+ fetcher.consume(&client);
+
+ // Six oplog entries should be copied. Five inserts and the final no-op oplog entry.
+ ASSERT_EQ(6, fetcher.getNumOplogEntriesCopied());
+ }
+};
+
+class InterruptConsume : public ReshardingTest {
+public:
+ void run() {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ const ShardId destinationShard("shard1");
+ setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
+ ReshardingOplogFetcher fetcher(_reshardingUUID,
+ dataColl->uuid(),
+ {_fetchTimestamp, _fetchTimestamp},
+ ShardId("fakeDonorShard"),
+ destinationShard,
+ true,
+ outputCollectionNss);
+
+ // Interrupt the fetcher. A fetcher object owns its own client, but interruption does not
+ // require the background job to be started.
+ fetcher.setKilled();
+
+ DBDirectClient client(_opCtx);
+ ASSERT_THROWS(fetcher.consume(&client), ExceptionForCat<ErrorCategory::Interruption>);
+ }
+};
+
+class AllReshardingTests : public unittest::OldStyleSuiteSpecification {
+public:
+ AllReshardingTests() : unittest::OldStyleSuiteSpecification("ReshardingTests") {}
+
+ // Must be evaluated at test run() time, not static-init time.
+ static bool shouldSkip() {
+ // Only run on storage engines that support snapshot reads.
+ auto storageEngine = cc().getServiceContext()->getStorageEngine();
+ if (!storageEngine->supportsReadConcernSnapshot() ||
+ !mongo::serverGlobalParams.enableMajorityReadConcern) {
+ LOGV2(5123009,
+ "Skipping this test because the configuration does not support majority reads.",
+ "storageEngine"_attr = storageGlobalParams.engine,
+ "enableMajorityReadConcern"_attr =
+ mongo::serverGlobalParams.enableMajorityReadConcern);
+ return true;
+ }
+ return false;
+ }
+
+ template <typename T>
+ void addIf() {
+ addNameCallback(nameForTestClass<T>(), [] {
+ if (!shouldSkip())
+ T().run();
+ });
+ }
+
+ void setupTests() {
+ addIf<RunFetchIteration>();
+ addIf<RunConsume>();
+ addIf<InterruptConsume>();
+ }
+};
+
+unittest::OldStyleSuiteInitializer<AllReshardingTests> allReshardingTests;
+
+} // namespace
+} // namespace mongo
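
One structural detail worth noting in the new file: dbtests suites register through OldStyleSuiteSpecification rather than TEST_F, and the storage-engine check deliberately runs at test time because no ServiceContext (and hence no storage engine) exists at static-init time. A stripped-down sketch of the same registration pattern (test names are placeholders):

#include "mongo/unittest/unittest.h"

namespace mongo {
namespace {

class ExampleCase {
public:
    void run() {
        // Assertions would go here.
    }
};

class ExampleSuite : public unittest::OldStyleSuiteSpecification {
public:
    ExampleSuite() : unittest::OldStyleSuiteSpecification("ExampleSuite") {}

    void setupTests() {
        // Each test is a name plus a callback; the callback runs at suite
        // execution time, after global initializers have built the ServiceContext.
        addNameCallback(nameForTestClass<ExampleCase>(), [] { ExampleCase().run(); });
    }
};

unittest::OldStyleSuiteInitializer<ExampleSuite> exampleSuite;

}  // namespace
}  // namespace mongo
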
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index 82e45ef5e55..79f1a2b669c 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -36,7 +36,6 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/remote_command_response.h"
@@ -208,18 +207,6 @@ public:
Milliseconds maxTimeMSOverride);
/**
- * Synchronously run the aggregation request, with a best effort honoring of request
- * options. `callback` will be called with the batch contained in each response. `callback`
- * should return `true` to execute another getmore. Returning `false` will send a
- * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to
- * `callback`.
- */
- virtual Status runAggregation(
- OperationContext* opCtx,
- const AggregationRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0;
-
- /**
* Runs a write command against a shard. This is separate from runCommand, because write
* commands return errors in a different format than regular commands do, so checking for
* retriable errors must be done differently.
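
For the record, this is the consumer-side contract that disappears with runAggregation; a hypothetical caller of the removed interface (shown only to document what the revert takes away, not code from this commit):

#include <vector>

#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/s/client/shard.h"

namespace mongo {

// Hypothetical caller of the removed Shard::runAggregation: the callback sees each
// batch and returns true to request another getMore, or false to stop early
// (which made the underlying Fetcher send a killCursors).
Status collectAggregation(OperationContext* opCtx,
                          Shard* shard,
                          const AggregationRequest& aggRequest,
                          std::vector<BSONObj>* out) {
    return shard->runAggregation(opCtx, aggRequest, [out](const std::vector<BSONObj>& batch) {
        out->insert(out->end(), batch.begin(), batch.end());
        return true;  // Keep iterating until the cursor is exhausted.
    });
}

}  // namespace mongo
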
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 30ceec26868..5254c24d37d 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -281,7 +281,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
const auto& data = dataStatus.getValue();
if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
- // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
+ // Sharding users of ReplSetMetadata do not require the wall clock time field to be set
auto replParseStatus =
rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
if (!replParseStatus.isOK()) {
@@ -411,91 +411,6 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx,
.ignore();
}
-Status ShardRemote::runAggregation(
- OperationContext* opCtx,
- const AggregationRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback) {
-
- BSONObj readPrefMetadata;
-
- ReadPreferenceSetting readPreference =
- uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(
- aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred));
-
- auto swHost = _targeter->findHost(opCtx, readPreference);
- if (!swHost.isOK()) {
- return swHost.getStatus();
- }
- HostAndPort host = swHost.getValue();
-
- BSONObjBuilder builder;
- readPreference.toContainingBSON(&builder);
- readPrefMetadata = builder.obj();
-
- Status status =
- Status(ErrorCodes::InternalError, "Internal error running cursor callback in command");
- auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- // Throw out any accumulated results on error
- if (!dataStatus.isOK()) {
- status = dataStatus.getStatus();
- return;
- }
-
- const auto& data = dataStatus.getValue();
-
- if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
- // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
- auto replParseStatus =
- rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
- if (!replParseStatus.isOK()) {
- status = replParseStatus.getStatus();
- return;
- }
- }
-
- if (!callback(data.documents)) {
- *nextAction = Fetcher::NextAction::kNoAction;
- }
-
- status = Status::OK();
-
- if (!getMoreBob) {
- return;
- }
- getMoreBob->append("getMore", data.cursorId);
- getMoreBob->append("collection", data.nss.coll());
- };
-
- Milliseconds requestTimeout(-1);
- if (aggRequest.getMaxTimeMS()) {
- requestTimeout = Milliseconds(aggRequest.getMaxTimeMS());
- }
-
- auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- Fetcher fetcher(executor.get(),
- host,
- aggRequest.getNamespaceString().db().toString(),
- aggRequest.serializeToCommandObj().toBson(),
- fetcherCallback,
- readPrefMetadata,
- requestTimeout, /* command network timeout */
- requestTimeout /* getMore network timeout */);
-
- Status scheduleStatus = fetcher.schedule();
- if (!scheduleStatus.isOK()) {
- return scheduleStatus;
- }
-
- fetcher.join();
-
- updateReplSetMonitor(host, status);
-
- return status;
-}
-
-
StatusWith<ShardRemote::AsyncCmdHandle> ShardRemote::_scheduleCommand(
OperationContext* opCtx,
const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index e7ca9004112..eb0c547c97f 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,10 +85,6 @@ public:
const std::string& dbName,
const BSONObj& cmdObj) final;
- Status runAggregation(OperationContext* opCtx,
- const AggregationRequest& aggRequest,
- std::function<bool(const std::vector<BSONObj>& batch)> callback);
-
private:
struct AsyncCmdHandle {
HostAndPort hostTargetted;
diff --git a/src/mongo/unittest/unittest.h b/src/mongo/unittest/unittest.h
index 76563290fff..2ef1d7cd77e 100644
--- a/src/mongo/unittest/unittest.h
+++ b/src/mongo/unittest/unittest.h
@@ -49,7 +49,6 @@
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
-#include "mongo/logv2/log_debug.h"
#include "mongo/logv2/log_detail.h"
#include "mongo/unittest/bson_test_util.h"
#include "mongo/unittest/unittest_helpers.h"