From 7b92716c97e05c09f624513b03bd3eb7663aa537 Mon Sep 17 00:00:00 2001 From: Uladzimir Makouski Date: Wed, 25 Nov 2020 07:45:36 +0000 Subject: Revert "SERVER-51245: Have resharding oplog fetching use a Fetcher." This reverts commit cda3a52701fe4143b06bd981b98514e69d0a93eb. --- .../db/auth/authz_manager_external_state_mock.cpp | 6 +- src/mongo/db/s/SConscript | 3 - .../db/s/resharding/resharding_oplog_fetcher.cpp | 231 +++++----- .../db/s/resharding/resharding_oplog_fetcher.h | 81 ++-- .../s/resharding/resharding_oplog_fetcher_test.cpp | 417 ----------------- src/mongo/db/s/shard_local.cpp | 6 - src/mongo/db/s/shard_local.h | 4 - src/mongo/dbtests/SConscript | 1 + src/mongo/dbtests/resharding_tests.cpp | 507 +++++++++++++++++++++ src/mongo/s/client/shard.h | 13 - src/mongo/s/client/shard_remote.cpp | 87 +--- src/mongo/s/client/shard_remote.h | 4 - src/mongo/unittest/unittest.h | 1 - 13 files changed, 663 insertions(+), 698 deletions(-) delete mode 100644 src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp create mode 100644 src/mongo/dbtests/resharding_tests.cpp diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp index aae6de7764e..00d479756a6 100644 --- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp @@ -112,11 +112,7 @@ void AuthzManagerExternalStateMock::setAuthzVersion(int version) { std::unique_ptr AuthzManagerExternalStateMock::makeAuthzSessionExternalState(AuthorizationManager* authzManager) { - auto ret = std::make_unique(authzManager); - // Construct a `AuthzSessionExternalStateMock` structure that represents the default no-auth - // state of a running mongod. - ret->setReturnValueForShouldIgnoreAuthChecks(true); - return ret; + return std::make_unique(authzManager); } Status AuthzManagerExternalStateMock::findOne(OperationContext* opCtx, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 010435d8cec..373be39e030 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -452,7 +452,6 @@ env.CppUnitTest( 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', - 'resharding/resharding_oplog_fetcher_test.cpp', 'resharding/resharding_recipient_service_test.cpp', 'session_catalog_migration_destination_test.cpp', 'session_catalog_migration_source_test.cpp', @@ -475,9 +474,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/pipeline/document_source_mock', - '$BUILD_DIR/mongo/db/pipeline/expression_context', '$BUILD_DIR/mongo/db/query/query_request', - '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/oplog_interface_local', '$BUILD_DIR/mongo/db/repl/replmocks', diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index e0bd954b8c1..2dce0fee77b 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -87,71 +87,88 @@ ReshardingOplogFetcher::ReshardingOplogFetcher(UUID reshardingUUID, _donorShard(donorShard), _recipientShard(recipientShard), _doesDonorOwnMinKeyChunk(doesDonorOwnMinKeyChunk), - _toWriteInto(toWriteInto) {} + _toWriteInto(toWriteInto), + _client(getGlobalServiceContext()->makeClient( + fmt::format("OplogFetcher-{}-{}", reshardingUUID.toString(), donorShard.toString()))) {} -Future ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) { - auto pf = makePromiseFuture(); - _fetchedFinishPromise = std::move(pf.promise); - - _reschedule(executor); +void ReshardingOplogFetcher::consume(DBClientBase* conn) { + while (true) { + auto opCtxRaii = _client->makeOperationContext(); + opCtxRaii->checkForInterrupt(); + auto expCtx = _makeExpressionContext(opCtxRaii.get()); + boost::optional restartAt = iterate(opCtxRaii.get(), + conn, + expCtx, + _startAt, + _collUUID, + _recipientShard, + _doesDonorOwnMinKeyChunk, + _toWriteInto); + if (!restartAt) { + return; + } + _startAt = restartAt.get(); + } +} - return std::move(pf.future); +void ReshardingOplogFetcher::setKilled() { + _isAlive.store(false); + _client->setKilled(); } -void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) { +void ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) { executor->schedule([this, executor](Status status) { - ThreadClient client( - fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()), - getGlobalServiceContext()); if (!status.isOK()) { - LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status); - _fetchedFinishPromise.setError(status); return; } - try { - if (iterate(client.get())) { - _reschedule(executor); - } else { - _fetchedFinishPromise.emplaceValue(); - } - } catch (...) { - LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus()); - _fetchedFinishPromise.setError(exceptionToStatus()); + if (_runTask()) { + schedule(executor); } }); } -bool ReshardingOplogFetcher::iterate(Client* client) { - std::shared_ptr targetShard; - { - auto opCtxRaii = client->makeOperationContext(); - opCtxRaii->checkForInterrupt(); +bool ReshardingOplogFetcher::_runTask() { + auto opCtxRaii = _client->makeOperationContext(); + opCtxRaii->checkForInterrupt(); + + const Seconds maxStaleness(10); + ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness); + StatusWith> swDonor = + Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard); + if (!swDonor.isOK()) { + LOGV2_WARNING(5127203, + "Error finding shard in registry, retrying.", + "error"_attr = swDonor.getStatus()); + return true; + } - const Seconds maxStaleness(10); - ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness); - StatusWith> swDonor = - Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard); - if (!swDonor.isOK()) { - LOGV2_WARNING(5127203, - "Error finding shard in registry, retrying.", - "error"_attr = swDonor.getStatus()); - return true; - } - targetShard = swDonor.getValue(); + StatusWith swTargettedDonor = + swDonor.getValue()->getTargeter()->findHost(opCtxRaii.get(), readPref); + if (!swTargettedDonor.isOK()) { + LOGV2_WARNING(5127202, + "Error targetting donor, retrying.", + "error"_attr = swTargettedDonor.getStatus()); + return true; } + DBClientConnection donorConn; + if (auto status = donorConn.connect(swTargettedDonor.getValue(), "ReshardingOplogFetching"_sd); + !status.isOK()) { + LOGV2_WARNING(5127201, "Failed connecting to donor, retrying.", "error"_attr = status); + return true; + } + // Reset the OpCtx so consuming can manage short-lived OpCtx lifetimes with the current client. + opCtxRaii.reset(); + try { // Consume will throw if there's oplog entries to be copied. It only returns cleanly when // the final oplog has been seen and copied. - consume(client, targetShard.get()); + consume(&donorConn); return false; } catch (const ExceptionForCat&) { + _isAlive.store(false); return false; - } catch (const ExceptionFor&) { - LOGV2_ERROR( - 5192103, "Fatal resharding error while fetching.", "error"_attr = exceptionToStatus()); - throw; } catch (const DBException&) { LOGV2_WARNING( 5127200, "Error while fetching, retrying.", "error"_attr = exceptionToStatus()); @@ -159,99 +176,73 @@ bool ReshardingOplogFetcher::iterate(Client* client) { } } -void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceString nss) { - auto opCtxRaii = client->makeOperationContext(); - auto opCtx = opCtxRaii.get(); +boost::optional ReshardingOplogFetcher::iterate( + OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr expCtx, + const ReshardingDonorOplogId startAfter, + const UUID collUUID, + const ShardId& recipientShard, + const bool doesDonorOwnMinKeyChunk, + const NamespaceString toWriteToNss) { + // This method will use the input opCtx to perform writes into `toWriteToNss`. invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Create the destination collection if necessary. - writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] { - const CollectionPtr coll = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); - if (coll) { + writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] { + const CollectionPtr toWriteTo = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, toWriteToNss); + if (toWriteTo) { return; } WriteUnitOfWork wuow(opCtx); - AutoGetOrCreateDb db(opCtx, nss.db(), LockMode::MODE_IX); - Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - db.getDb()->createCollection(opCtx, nss); + AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX); + Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX); + db.getDb()->createCollection(opCtx, toWriteToNss); wuow.commit(); }); -} -std::vector ReshardingOplogFetcher::_makePipeline(Client* client) { - auto opCtxRaii = client->makeOperationContext(); - auto opCtx = opCtxRaii.get(); - auto expCtx = _makeExpressionContext(opCtx); - - return createOplogFetchingPipelineForResharding( - expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk) - ->serializeToBson(); -} - -void ReshardingOplogFetcher::consume(Client* client, Shard* shard) { - _ensureCollection(client, _toWriteInto); - std::vector serializedPipeline = _makePipeline(client); + std::vector serializedPipeline = + createOplogFetchingPipelineForResharding( + expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk) + ->serializeToBson(); AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline); - if (_useReadConcern) { - auto readConcernArgs = repl::ReadConcernArgs( - boost::optional(_startAt.getTs()), - boost::optional(repl::ReadConcernLevel::kMajorityReadConcern)); - aggRequest.setReadConcern(readConcernArgs.toBSONInner()); - } - + auto readConcernArgs = repl::ReadConcernArgs( + boost::optional(startAfter.getTs()), + boost::optional(repl::ReadConcernLevel::kMajorityReadConcern)); + aggRequest.setReadConcern(readConcernArgs.toBSONInner()); aggRequest.setHint(BSON("$natural" << 1)); - aggRequest.setRequestReshardingResumeToken(true); - - if (_initialBatchSize) { - aggRequest.setBatchSize(_initialBatchSize); - } - auto opCtxRaii = client->makeOperationContext(); - int batchesProcessed = 0; - auto svcCtx = client->getServiceContext(); - uassertStatusOK(shard->runAggregation( - opCtxRaii.get(), - aggRequest, - [this, svcCtx, &batchesProcessed](const std::vector& batch) { - ThreadClient client(fmt::format("ReshardingFetcher-{}-{}", - _reshardingUUID.toString(), - _donorShard.toString()), - svcCtx, - nullptr); - auto opCtxRaii = cc().makeOperationContext(); - auto opCtx = opCtxRaii.get(); - - // Noting some possible optimizations: - // - // * Batch more inserts into larger storage transactions. - // * Parallize writing documents across multiple threads. - // * Doing either of the above while still using the underlying message buffer of bson - // objects. - AutoGetCollection toWriteTo(opCtx, _toWriteInto, LockMode::MODE_IX); - for (const BSONObj& doc : batch) { - WriteUnitOfWork wuow(opCtx); - auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(doc)); - - _startAt = ReshardingDonorOplogId::parse( - {"OplogFetcherParsing"}, nextOplog.get_id()->getDocument().toBson()); - uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr)); - wuow.commit(); - ++_numOplogEntriesCopied; + const bool secondaryOk = true; + const bool useExhaust = true; + std::unique_ptr cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + conn, std::move(aggRequest), secondaryOk, useExhaust)); + + // Noting some possible optimizations: + // + // * Batch more inserts into larger storage transactions. + // * Parallize writing documents across multiple threads. + // * Doing either of the above while still using the underlying message buffer of bson objects. + AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX); + ReshardingDonorOplogId lastSeen = startAfter; + while (cursor->more() && _isAlive.load()) { + WriteUnitOfWork wuow(opCtx); + BSONObj obj = cursor->next(); + auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(obj)); - if (isFinalOplog(nextOplog, _reshardingUUID)) { - return false; - } - } + lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"}, + nextOplog.get_id()->getDocument().toBson()); + uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr)); + wuow.commit(); + ++_numOplogEntriesCopied; - if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) { - return false; - } + if (isFinalOplog(nextOplog, _reshardingUUID)) { + return boost::none; + } + } - return true; - })); + return lastSeen; } - } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index a5c06225b07..aeb198dfabf 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -37,7 +37,6 @@ #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/db/service_context.h" -#include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" #include "mongo/util/background.h" #include "mongo/util/uuid.h" @@ -60,50 +59,61 @@ public: * - Send an aggregation request + getMores until either: * -- The "final resharding" oplog entry is found. * -- An interruption occurs. - * -- The fetcher concludes it's fallen off the oplog. * -- A different error occurs. * - * In the first two circumstances, the task will terminate. If the fetcher has fallen off the - * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task - * will be rescheduled in a way that resumes where it had left off from. + * In the first two circumstances, the task will return. In the last circumstance, the task will + * be rescheduled in a way that resumes where it had left off from. */ - Future schedule(executor::TaskExecutor* executor); + void schedule(executor::TaskExecutor* exector); /** - * Given a shard, fetches and copies oplog entries until reaching an error, or coming + * Given a connection, fetches and copies oplog entries until reaching an error, or coming * across a sentinel finish oplog entry. Throws if there's more oplog entries to be copied. */ - void consume(Client* client, Shard* shard); + void consume(DBClientBase* conn); - bool iterate(Client* client); + /** + * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when + * the output is no longer necessary. The underlying `toWriteInto` collection is left intact, + * though likely incomplete. + */ + void setKilled(); - int getNumOplogEntriesCopied() const { + /** + * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the + * ReshardingDonorOplogId to resume querying from. + * + * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries + * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation + * cursor is exhausted. + * + * Returns an identifier for the last oplog-ish document written to `toWriteInto`. + * + * This method throws. + * + * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things + * like perform aggregate commands nor does it expose a cursor/stream interface. However, using + * a `Shard` object will provide critical behavior such as advancing logical clock values on a + * response and targetting a node to send the aggregation command to. + */ + boost::optional iterate(OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr expCtx, + ReshardingDonorOplogId startAfter, + UUID collUUID, + const ShardId& recipientShard, + bool doesDonorOwnMinKeyChunk, + NamespaceString toWriteInto); + + int getNumOplogEntriesCopied() { return _numOplogEntriesCopied; } - ReshardingDonorOplogId getLastSeenTimestamp() const { - return _startAt; - } - - void setInitialBatchSizeForTest(int size) { - _initialBatchSize = size; - } - - void useReadConcernForTest(bool use) { - _useReadConcern = use; - } - - void setMaxBatchesForTest(int maxBatches) { - _maxBatches = maxBatches; - } - private: /** * Returns true if there's more work to do and the task should be rescheduled. */ - void _ensureCollection(Client* client, const NamespaceString nss); - std::vector _makePipeline(Client* client); - void _reschedule(executor::TaskExecutor* executor); + bool _runTask(); const UUID _reshardingUUID; const UUID _collUUID; @@ -113,16 +123,9 @@ private: const bool _doesDonorOwnMinKeyChunk; const NamespaceString _toWriteInto; - Promise _fetchedFinishPromise; - int _numOplogEntriesCopied = 0; - - // For testing to control behavior. + ServiceContext::UniqueClient _client; + AtomicWord _isAlive{true}; - // The aggregation batch size. This only affects the original call and not `getmore`s. - int _initialBatchSize = 0; - // Setting to false will omit the `afterClusterTime` and `majority` read concern. - bool _useReadConcern = true; - // Dictates how many batches get processed before returning control from a call to `consume`. - int _maxBatches = -1; + int _numOplogEntriesCopied = 0; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp deleted file mode 100644 index f91a8a5996c..00000000000 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ /dev/null @@ -1,417 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include - -#include "mongo/bson/bsonobj.h" -#include "mongo/db/client.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/logical_session_cache_noop.h" -#include "mongo/db/op_observer_impl.h" -#include "mongo/db/pipeline/document_source_mock.h" -#include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" -#include "mongo/db/s/resharding_util.h" -#include "mongo/db/s/shard_server_test_fixture.h" -#include "mongo/db/service_context.h" -#include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/storage/write_unit_of_work.h" -#include "mongo/s/catalog/sharding_catalog_client_mock.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -using namespace unittest; - -/** - * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. - */ -class OneOffRead { -public: - OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { - _opCtx->recoveryUnit()->abandonSnapshot(); - if (ts.isNull()) { - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - } else { - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); - } - } - - ~OneOffRead() { - _opCtx->recoveryUnit()->abandonSnapshot(); - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - } - -private: - OperationContext* _opCtx; -}; - -class ReshardingOplogFetcherTest : public ShardServerTestFixture { -public: - OperationContext* _opCtx; - ServiceContext* _svcCtx; - UUID _reshardingUUID = UUID::gen(); - Timestamp _fetchTimestamp; - ShardId _donorShard; - ShardId _destinationShard; - - void setUp() { - ShardServerTestFixture::setUp(); - _opCtx = operationContext(); - _svcCtx = _opCtx->getServiceContext(); - - for (const auto& shardId : kTwoShardIdList) { - auto shardTargeter = RemoteCommandTargeterMock::get( - uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) - ->getTargeter()); - shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); - } - - WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); - - // onStepUp() relies on the storage interface to create the config.transactions table. - repl::StorageInterface::set(getServiceContext(), - std::make_unique()); - MongoDSessionCatalog::onStepUp(operationContext()); - LogicalSessionCache::set(getServiceContext(), std::make_unique()); - _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp(); - - _donorShard = kTwoShardIdList[0]; - _destinationShard = kTwoShardIdList[1]; - } - - void tearDown() { - WaitForMajorityService::get(getServiceContext()).shutDown(); - ShardServerTestFixture::tearDown(); - } - - /** - * Override the CatalogClient to make CatalogClient::getAllShards automatically return the - * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the - * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no - * DBClientMock analogous to the NetworkInterfaceMock. - */ - std::unique_ptr makeShardingCatalogClient( - std::unique_ptr distLockManager) { - - class StaticCatalogClient final : public ShardingCatalogClientMock { - public: - StaticCatalogClient(std::vector shardIds) - : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {} - - StatusWith>> getAllShards( - OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { - std::vector shardTypes; - for (const auto& shardId : _shardIds) { - const ConnectionString cs = ConnectionString::forReplicaSet( - shardId.toString(), {makeHostAndPort(shardId)}); - ShardType sType; - sType.setName(cs.getSetName()); - sType.setHost(cs.toString()); - shardTypes.push_back(std::move(sType)); - }; - return repl::OpTimeWith>(shardTypes); - } - - private: - const std::vector _shardIds; - }; - - return std::make_unique(kTwoShardIdList); - } - - void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) { - // Insert some documents. - OpDebug* const nullOpDebug = nullptr; - const bool fromMigrate = false; - ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate)); - } - - BSONObj queryCollection(NamespaceString nss, const BSONObj& query) { - BSONObj ret; - ASSERT_TRUE(Helpers::findOne( - _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret)) - << "Query: " << query; - return ret; - } - - BSONObj queryOplog(const BSONObj& query) { - OneOffRead oor(_opCtx, Timestamp::min()); - return queryCollection(NamespaceString::kRsOplogNamespace, query); - } - - repl::OpTime getLastApplied() { - return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime(); - } - - boost::intrusive_ptr createExpressionContext() { - NamespaceString slimNss = - NamespaceString("local.system.resharding.slimOplogForGraphLookup"); - - boost::intrusive_ptr expCtx( - new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace)); - expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace, - {NamespaceString::kRsOplogNamespace, {}}); - expCtx->setResolvedNamespace(slimNss, - {slimNss, std::vector{getSlimOplogPipeline()}}); - return expCtx; - } - - int itcount(NamespaceString nss) { - OneOffRead oof(_opCtx, Timestamp::min()); - AutoGetCollectionForRead autoColl(_opCtx, nss); - auto cursor = autoColl.getCollection()->getCursor(_opCtx); - - int ret = 0; - while (auto rec = cursor->next()) { - ++ret; - } - - return ret; - } - - void create(NamespaceString nss) { - writeConflictRetry(_opCtx, "create", nss.ns(), [&] { - AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X); - WriteUnitOfWork wunit(_opCtx); - if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { - ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); - } - invariant(dbRaii.getDb()->createCollection(_opCtx, nss)); - wunit.commit(); - }); - } - - template - T requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle& future, - int maxBatches = -1) { - - int maxNumRequests = 1000; // No unittests would request more than this? - if (maxBatches > -1) { - // The fetcher will send a `killCursors` after the last `getMore`. - maxNumRequests = maxBatches + 1; - } - - bool hasMore = true; - for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) { - onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith { - DBDirectClient client(cc().getOperationContext()); - BSONObj result; - bool res = client.runCommand(request.dbname, request.cmdObj, result); - if (res == false || result.hasField("cursorsKilled") || - result["cursor"]["id"].Long() == 0) { - hasMore = false; - } - - return result; - }); - } - - return future.timed_get(Seconds(5)); - } - - // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient` - // followed by the final no-op oplog entry that signals the last oplog entry needed to be - // applied for resharding to move to the next stage. - void setupBasic(NamespaceString outputCollectionNss, - NamespaceString dataCollectionNss, - ShardId destinedRecipient) { - create(outputCollectionNss); - create(dataCollectionNss); - - _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - // Set a failpoint to tack a `destinedRecipient` onto oplog entries. - setGlobalFailPoint("addDestinedRecipient", - BSON("mode" - << "alwaysOn" - << "data" - << BSON("destinedRecipient" << destinedRecipient.toString()))); - - // Insert five documents. Advance the majority point. - const std::int32_t docsToInsert = 5; - { - for (std::int32_t num = 0; num < docsToInsert; ++num) { - WriteUnitOfWork wuow(_opCtx); - insertDocument(dataColl.getCollection(), - InsertStatement(BSON("_id" << num << "a" << num))); - wuow.commit(); - } - } - - // Write an entry saying that fetching is complete. - { - WriteUnitOfWork wuow(_opCtx); - _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage( - _opCtx, - dataColl.getCollection()->ns(), - dataColl.getCollection()->uuid(), - BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", - dataColl.getCollection()->ns().toString())), - BSON("type" - << "reshardFinalOp" - << "reshardingUUID" << _reshardingUUID), - boost::none, - boost::none, - boost::none, - boost::none); - wuow.commit(); - } - - repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); - - // Disable the failpoint. - setGlobalFailPoint("addDestinedRecipient", - BSON("mode" - << "off")); - } - -protected: - const std::vector kTwoShardIdList{{"s1"}, {"s2"}}; - -private: - static HostAndPort makeHostAndPort(const ShardId& shardId) { - return HostAndPort(str::stream() << shardId << ":123"); - } -}; - -TEST_F(ReshardingOplogFetcherTest, TestBasic) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetchRunner", _svcCtx, nullptr); - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {_fetchTimestamp, _fetchTimestamp}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - fetcher.setInitialBatchSizeForTest(2); - - fetcher.iterate(&cc()); - }); - - requestPassthroughHandler(fetcherJob); - - // Five oplog entries for resharding + the sentinel final oplog entry. - ASSERT_EQ(6, itcount(outputCollectionNss)); -} - -TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - const int maxBatches = 1; - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetcherRunner", _svcCtx, nullptr); - - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {_fetchTimestamp, _fetchTimestamp}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - fetcher.setInitialBatchSizeForTest(2); - fetcher.setMaxBatchesForTest(maxBatches); - - fetcher.iterate(&cc()); - return fetcher.getLastSeenTimestamp(); - }); - - ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches); - - // Two oplog entries due to the batch size. - ASSERT_EQ(2, itcount(outputCollectionNss)); - // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`. - ASSERT_GT(lastSeen.getTs(), _fetchTimestamp); -} - -TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetcherRunner", _svcCtx, nullptr); - - const Timestamp doesNotExist(1, 1); - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {doesNotExist, doesNotExist}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - - try { - fetcher.iterate(&cc()); - // Test failure case. - return Status::OK(); - } catch (...) { - return exceptionToStatus(); - } - }); - - auto fetcherStatus = requestPassthroughHandler(fetcherJob); - - // Two oplog entries due to the batch size. - ASSERT_EQ(0, itcount(outputCollectionNss)); - ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code()); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp index d1d66fddb53..0f27d3a05d5 100644 --- a/src/mongo/db/s/shard_local.cpp +++ b/src/mongo/db/s/shard_local.cpp @@ -201,10 +201,4 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx, MONGO_UNREACHABLE; } -Status ShardLocal::runAggregation(OperationContext* opCtx, - const AggregationRequest& aggRequest, - std::function& batch)> callback) { - MONGO_UNREACHABLE; -} - } // namespace mongo diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h index 696bd665c3d..df625905d7c 100644 --- a/src/mongo/db/s/shard_local.h +++ b/src/mongo/db/s/shard_local.h @@ -70,10 +70,6 @@ public: const std::string& dbName, const BSONObj& cmdObj) override; - Status runAggregation(OperationContext* opCtx, - const AggregationRequest& aggRequest, - std::function& batch)> callback); - private: StatusWith _runCommand(OperationContext* opCtx, const ReadPreferenceSetting& unused, diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 09d63233bdf..c03839ddbed 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -123,6 +123,7 @@ if not has_option('noshell') and usemozjs: 'querytests.cpp', 'replica_set_tests.cpp', 'repltests.cpp', + 'resharding_tests.cpp', 'rollbacktests.cpp', 'socktests.cpp', 'storage_timestamp_tests.cpp', diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp new file mode 100644 index 00000000000..996371a0cf4 --- /dev/null +++ b/src/mongo/dbtests/resharding_tests.cpp @@ -0,0 +1,507 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include + +#include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/global_settings.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/pipeline/expression_context_for_test.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_consistency_markers_mock.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/snapshot_manager.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/db/vector_clock_mutable.h" +#include "mongo/dbtests/dbtests.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { +/** + * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. + */ +class OneOffRead { +public: + OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + _opCtx->recoveryUnit()->abandonSnapshot(); + if (ts.isNull()) { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } else { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); + } + } + + ~OneOffRead() { + _opCtx->recoveryUnit()->abandonSnapshot(); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } + +private: + OperationContext* _opCtx; +}; + +/** + * Observed problems using ShardingMongodTestFixture: + * + * - Does not mix with dbtest. Both will initialize a ServiceContext. + * - By default uses ephemeralForTest. These tests require a storage engine that supports majority + * reads. + * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test + * that is run. WT specifically installs a ServerStatusSection. The server status code asserts + * that a section is never added after a `serverStatus` command is run. Tests defined in + * `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a + * serverStatus triggerring this assertion. + */ +class ReshardingTest { +public: + ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext(); + OperationContext* _opCtx = _opCtxRaii.get(); + ServiceContext* _svcCtx = _opCtx->getServiceContext(); + VectorClockMutable* _clock = VectorClockMutable::get(_opCtx); + // A convenience UUID. + UUID _reshardingUUID = UUID::gen(); + // Timestamp of the first oplog entry which the fixture will set up. + Timestamp _fetchTimestamp; + + ReshardingTest() { + repl::ReplSettings replSettings; + replSettings.setOplogSizeBytes(100 * 1024 * 1024); + replSettings.setReplSetString("rs0"); + setGlobalReplSettings(replSettings); + + auto replCoordinatorMock = + std::make_unique(_svcCtx, replSettings); + replCoordinatorMock->alwaysAllowWrites(true); + repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock)); + repl::StorageInterface::set(_svcCtx, std::make_unique()); + repl::ReplicationProcess::set( + _svcCtx, + std::make_unique( + repl::StorageInterface::get(_svcCtx), + std::make_unique(), + std::make_unique())); + + // Since the Client object persists across tests, even though the global + // ReplicationCoordinator does not, we need to clear the last op associated with the client + // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward. + repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest(); + + auto opObsRegistry = std::make_unique(); + opObsRegistry->addObserver(std::make_unique()); + _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry)); + + // Clean out the oplog and write one no-op entry. The timestamp of this first oplog entry + // will serve as resharding's `fetchTimestamp`. + repl::createOplog(_opCtx); + reset(NamespaceString::kRsOplogNamespace); + { + WriteUnitOfWork wuow(_opCtx); + Lock::GlobalLock lk(_opCtx, LockMode::MODE_IX); + _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage( + _opCtx, + // Choose a random, irrelevant replicated namespace. + NamespaceString::kSystemKeysNamespace, + UUID::gen(), + BSON("msg" + << "Dummy op."), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + } + _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp(); + std::cout << " Fetch timestamp: " << _fetchTimestamp.toString() << std::endl; + + _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0))); + } + + ~ReshardingTest() { + try { + reset(NamespaceString("local.oplog.rs")); + } catch (...) { + FAIL("Exception while cleaning up test"); + } + } + + + /** + * Walking on ice: resetting the ReplicationCoordinator destroys the underlying + * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection + * without actually dropping it. + */ + void reset(NamespaceString nss) const { + ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] { + // Do not write DDL operations to the oplog. This keeps the initial oplog state for each + // test predictable. + repl::UnreplicatedWritesBlock uwb(_opCtx); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X); + + if (collRaii) { + WriteUnitOfWork wunit(_opCtx); + invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK()); + if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); + } + collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false); + wunit.commit(); + return; + } + + AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X); + WriteUnitOfWork wunit(_opCtx); + if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); + } + invariant(dbRaii.getDb()->createCollection(_opCtx, nss)); + wunit.commit(); + }); + } + + void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) { + // Insert some documents. + OpDebug* const nullOpDebug = nullptr; + const bool fromMigrate = false; + ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate)); + } + + BSONObj queryCollection(NamespaceString nss, const BSONObj& query) { + BSONObj ret; + ASSERT_TRUE(Helpers::findOne( + _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret)) + << "Query: " << query; + return ret; + } + + BSONObj queryOplog(const BSONObj& query) { + OneOffRead oor(_opCtx, Timestamp::min()); + return queryCollection(NamespaceString::kRsOplogNamespace, query); + } + + repl::OpTime getLastApplied() { + return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime(); + } + + boost::intrusive_ptr createExpressionContext() { + NamespaceString slimNss = + NamespaceString("local.system.resharding.slimOplogForGraphLookup"); + + boost::intrusive_ptr expCtx( + new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace)); + expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace, + {NamespaceString::kRsOplogNamespace, {}}); + expCtx->setResolvedNamespace(slimNss, + {slimNss, std::vector{getSlimOplogPipeline()}}); + return expCtx; + } + + int itcount(NamespaceString nss) { + OneOffRead oof(_opCtx, Timestamp::min()); + AutoGetCollectionForRead autoColl(_opCtx, nss); + auto cursor = autoColl.getCollection()->getCursor(_opCtx); + + int ret = 0; + while (auto rec = cursor->next()) { + ++ret; + } + + return ret; + } + + // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient` + // followed by the final no-op oplog entry that signals the last oplog entry needed to be + // applied for resharding to move to the next stage. + void setupBasic(NamespaceString outputCollectionNss, + NamespaceString dataCollectionNss, + ShardId destinedRecipient) { + reset(outputCollectionNss); + reset(dataCollectionNss); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + // Set a failpoint to tack a `destinedRecipient` onto oplog entries. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("destinedRecipient" << destinedRecipient.toString()))); + + // Insert five documents. Advance the majority point. + const std::int32_t docsToInsert = 5; + { + for (std::int32_t num = 0; num < docsToInsert; ++num) { + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + wuow.commit(); + } + } + + // Write an entry saying that fetching is complete. + { + WriteUnitOfWork wuow(_opCtx); + _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage( + _opCtx, + dataColl.getCollection()->ns(), + dataColl.getCollection()->uuid(), + BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", + dataColl.getCollection()->ns().toString())), + BSON("type" + << "reshardFinalOp" + << "reshardingUUID" << _reshardingUUID), + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + } + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot( + getLastApplied().getTimestamp()); + + // Disable the failpoint. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "off")); + } +}; + +class RunFetchIteration : public ReshardingTest { +public: + void run() { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + reset(outputCollectionNss); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + reset(dataCollectionNss); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + // Set a failpoint to tack a `destinedRecipient` onto oplog entries. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("destinedRecipient" + << "shard1"))); + + // Insert five documents. Advance the majority point. Insert five more documents. + const std::int32_t docsToInsert = 5; + { + for (std::int32_t num = 0; num < docsToInsert; ++num) { + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + wuow.commit(); + } + } + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp(); + _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot( + firstFiveLastApplied); + { + for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) { + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + wuow.commit(); + } + } + + // Disable the failpoint. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "off")); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + const Timestamp latestLastApplied = getLastApplied().getTimestamp(); + + // The first call to `iterate` should return the first five inserts and return a + // `ReshardingDonorOplogId` matching the last applied of those five inserts. + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {_fetchTimestamp, _fetchTimestamp}, + ShardId("fakeDonorShard"), + ShardId("shard1"), + true, + outputCollectionNss); + DBDirectClient client(_opCtx); + boost::optional donorOplogId = + fetcher.iterate(_opCtx, + &client, + createExpressionContext(), + {_fetchTimestamp, _fetchTimestamp}, + dataColl->uuid(), + {"shard1"}, + true, + outputCollectionNss); + + ASSERT(donorOplogId != boost::none); + ASSERT_EQ(docsToInsert, itcount(outputCollectionNss)); + ASSERT_EQ(firstFiveLastApplied, donorOplogId->getClusterTime()); + ASSERT_EQ(firstFiveLastApplied, donorOplogId->getTs()); + + // Advance the committed snapshot. A second `iterate` should return the second batch of five + // inserts. + _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot( + getLastApplied().getTimestamp()); + + donorOplogId = fetcher.iterate(_opCtx, + &client, + createExpressionContext(), + {firstFiveLastApplied, firstFiveLastApplied}, + dataColl->uuid(), + {"shard1"}, + true, + outputCollectionNss); + + ASSERT(donorOplogId != boost::none); + // Two batches of five inserts entry for the create collection oplog entry. + ASSERT_EQ((2 * docsToInsert), itcount(outputCollectionNss)); + ASSERT_EQ(latestLastApplied, donorOplogId->getClusterTime()); + ASSERT_EQ(latestLastApplied, donorOplogId->getTs()); + } +}; + +class RunConsume : public ReshardingTest { +public: + void run() { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + const ShardId destinationShard("shard1"); + setupBasic(outputCollectionNss, dataCollectionNss, destinationShard); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS); + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {_fetchTimestamp, _fetchTimestamp}, + ShardId("fakeDonorShard"), + destinationShard, + true, + outputCollectionNss); + DBDirectClient client(_opCtx); + fetcher.consume(&client); + + // Six oplog entries should be copied. Five inserts and the final no-op oplog entry. + ASSERT_EQ(6, fetcher.getNumOplogEntriesCopied()); + } +}; + +class InterruptConsume : public ReshardingTest { +public: + void run() { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + const ShardId destinationShard("shard1"); + setupBasic(outputCollectionNss, dataCollectionNss, destinationShard); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS); + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {_fetchTimestamp, _fetchTimestamp}, + ShardId("fakeDonorShard"), + destinationShard, + true, + outputCollectionNss); + + // Interrupt the fetcher. A fetcher object owns its own client, but interruption does not + // require the background job to be started. + fetcher.setKilled(); + + DBDirectClient client(_opCtx); + ASSERT_THROWS(fetcher.consume(&client), ExceptionForCat); + } +}; + +class AllReshardingTests : public unittest::OldStyleSuiteSpecification { +public: + AllReshardingTests() : unittest::OldStyleSuiteSpecification("ReshardingTests") {} + + // Must be evaluated at test run() time, not static-init time. + static bool shouldSkip() { + // Only run on storage engines that support snapshot reads. + auto storageEngine = cc().getServiceContext()->getStorageEngine(); + if (!storageEngine->supportsReadConcernSnapshot() || + !mongo::serverGlobalParams.enableMajorityReadConcern) { + LOGV2(5123009, + "Skipping this test because the configuration does not support majority reads.", + "storageEngine"_attr = storageGlobalParams.engine, + "enableMajorityReadConcern"_attr = + mongo::serverGlobalParams.enableMajorityReadConcern); + return true; + } + return false; + } + + template + void addIf() { + addNameCallback(nameForTestClass(), [] { + if (!shouldSkip()) + T().run(); + }); + } + + void setupTests() { + addIf(); + addIf(); + addIf(); + } +}; + +unittest::OldStyleSuiteInitializer allReshardingTests; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index 82e45ef5e55..79f1a2b669c 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -36,7 +36,6 @@ #include "mongo/client/read_preference.h" #include "mongo/db/logical_time.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/remote_command_response.h" @@ -207,18 +206,6 @@ public: const BSONObj& cmdObj, Milliseconds maxTimeMSOverride); - /** - * Synchronously run the aggregation request, with a best effort honoring of request - * options. `callback` will be called with the batch contained in each response. `callback` - * should return `true` to execute another getmore. Returning `false` will send a - * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to - * `callback`. - */ - virtual Status runAggregation( - OperationContext* opCtx, - const AggregationRequest& aggRequest, - std::function& batch)> callback) = 0; - /** * Runs a write command against a shard. This is separate from runCommand, because write * commands return errors in a different format than regular commands do, so checking for diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 30ceec26868..5254c24d37d 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -281,7 +281,7 @@ StatusWith ShardRemote::_runExhaustiveCursorCommand( const auto& data = dataStatus.getValue(); if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { - // Sharding users of ReplSetMetadata do not require the wall clock time field to be set. + // Sharding users of ReplSetMetadata do not require the wall clock time field to be set auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); if (!replParseStatus.isOK()) { @@ -411,91 +411,6 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx, .ignore(); } -Status ShardRemote::runAggregation( - OperationContext* opCtx, - const AggregationRequest& aggRequest, - std::function& batch)> callback) { - - BSONObj readPrefMetadata; - - ReadPreferenceSetting readPreference = - uassertStatusOK(ReadPreferenceSetting::fromContainingBSON( - aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred)); - - auto swHost = _targeter->findHost(opCtx, readPreference); - if (!swHost.isOK()) { - return swHost.getStatus(); - } - HostAndPort host = swHost.getValue(); - - BSONObjBuilder builder; - readPreference.toContainingBSON(&builder); - readPrefMetadata = builder.obj(); - - Status status = - Status(ErrorCodes::InternalError, "Internal error running cursor callback in command"); - auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - // Throw out any accumulated results on error - if (!dataStatus.isOK()) { - status = dataStatus.getStatus(); - return; - } - - const auto& data = dataStatus.getValue(); - - if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) { - // Sharding users of ReplSetMetadata do not require the wall clock time field to be set. - auto replParseStatus = - rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata); - if (!replParseStatus.isOK()) { - status = replParseStatus.getStatus(); - return; - } - } - - if (!callback(data.documents)) { - *nextAction = Fetcher::NextAction::kNoAction; - } - - status = Status::OK(); - - if (!getMoreBob) { - return; - } - getMoreBob->append("getMore", data.cursorId); - getMoreBob->append("collection", data.nss.coll()); - }; - - Milliseconds requestTimeout(-1); - if (aggRequest.getMaxTimeMS()) { - requestTimeout = Milliseconds(aggRequest.getMaxTimeMS()); - } - - auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - Fetcher fetcher(executor.get(), - host, - aggRequest.getNamespaceString().db().toString(), - aggRequest.serializeToCommandObj().toBson(), - fetcherCallback, - readPrefMetadata, - requestTimeout, /* command network timeout */ - requestTimeout /* getMore network timeout */); - - Status scheduleStatus = fetcher.schedule(); - if (!scheduleStatus.isOK()) { - return scheduleStatus; - } - - fetcher.join(); - - updateReplSetMonitor(host, status); - - return status; -} - - StatusWith ShardRemote::_scheduleCommand( OperationContext* opCtx, const ReadPreferenceSetting& readPref, diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h index e7ca9004112..eb0c547c97f 100644 --- a/src/mongo/s/client/shard_remote.h +++ b/src/mongo/s/client/shard_remote.h @@ -85,10 +85,6 @@ public: const std::string& dbName, const BSONObj& cmdObj) final; - Status runAggregation(OperationContext* opCtx, - const AggregationRequest& aggRequest, - std::function& batch)> callback); - private: struct AsyncCmdHandle { HostAndPort hostTargetted; diff --git a/src/mongo/unittest/unittest.h b/src/mongo/unittest/unittest.h index 76563290fff..2ef1d7cd77e 100644 --- a/src/mongo/unittest/unittest.h +++ b/src/mongo/unittest/unittest.h @@ -49,7 +49,6 @@ #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" -#include "mongo/logv2/log_debug.h" #include "mongo/logv2/log_detail.h" #include "mongo/unittest/bson_test_util.h" #include "mongo/unittest/unittest_helpers.h" -- cgit v1.2.1