commit 2d295d7b205dcac66d434929553610dd48ac0815
Author:    Daniel Gottlieb <daniel.gottlieb@mongodb.com>     2020-11-25 10:42:39 -0500
Committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-25 16:52:46 +0000
Tree:      33c57ad902b32f64fb6b471e7787a972e27917cf
Parent:    a6b7feddd72d9a9aa4daf5d1198ab051ca4afd40
SERVER-51245: Have resharding oplog fetching use a Fetcher.
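Summary (drawn from the diff below): this change replaces the resharding oplog fetcher's hand-rolled DBClientConnection exhaust-cursor loop with the shard-targeting machinery. A new virtual Shard::runAggregation() streams aggregation batches to a caller-supplied callback; ShardRemote implements it on top of a Fetcher, while ShardLocal leaves it unimplemented. ReshardingOplogFetcher::schedule() now returns a Future<void> that is fulfilled once the final resharding oplog entry has been copied, and falling off the donor's oplog (OplogQueryMinTsMissing) becomes a fatal error rather than a retry. The dbtests-based resharding tests are replaced by a ShardServerTestFixture-based unit test.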
 src/mongo/db/auth/authz_manager_external_state_mock.cpp     |   8
 src/mongo/db/s/SConscript                                   |   3
 src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp      | 231
 src/mongo/db/s/resharding/resharding_oplog_fetcher.h        |  81
 src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 417
 src/mongo/db/s/shard_local.cpp                              |   6
 src/mongo/db/s/shard_local.h                                |   4
 src/mongo/dbtests/SConscript                                |   1
 src/mongo/dbtests/resharding_tests.cpp                      | 507
 src/mongo/s/client/shard.h                                  |  13
 src/mongo/s/client/shard_remote.cpp                         |  87
 src/mongo/s/client/shard_remote.h                           |   4
 src/mongo/unittest/unittest.h                               |   1
 13 files changed, 700 insertions(+), 663 deletions(-)
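To orient the reader before the diff: the heart of the change is the batch-callback contract on the new Shard::runAggregation() (documented in src/mongo/s/client/shard.h below). A minimal caller sketch follows, in the spirit of ReshardingOplogFetcher::consume(); the function name and setup here are illustrative assumptions, not part of the commit:

    // Sketch: drive a remote aggregation one batch at a time.
    // Returning true from the callback issues another getMore; returning
    // false stops the cursor (the implementation then sends a killCursors).
    Status fetchAll(OperationContext* opCtx, Shard* shard, std::vector<BSONObj> pipeline) {
        AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, std::move(pipeline));
        aggRequest.setHint(BSON("$natural" << 1));
        return shard->runAggregation(
            opCtx, aggRequest, [&](const std::vector<BSONObj>& batch) {
                for (const BSONObj& doc : batch) {
                    // A real caller persists each entry, e.g. into the local
                    // oplog buffer collection, as consume() does below.
                }
                return true;  // Keep fetching until the results are exhausted.
            });
    }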
diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
index 00d479756a6..cb684ed4f84 100644
--- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
@@ -112,7 +112,13 @@ void AuthzManagerExternalStateMock::setAuthzVersion(int version) {
 
 std::unique_ptr<AuthzSessionExternalState>
 AuthzManagerExternalStateMock::makeAuthzSessionExternalState(AuthorizationManager* authzManager) {
-    return std::make_unique<AuthzSessionExternalStateMock>(authzManager);
+    auto ret = std::make_unique<AuthzSessionExternalStateMock>(authzManager);
+    if (!authzManager->isAuthEnabled()) {
+        // Construct an `AuthzSessionExternalStateMock` that represents the default no-auth
+        // state of a running mongod.
+        ret->setReturnValueForShouldIgnoreAuthChecks(true);
+    }
+    return ret;
 }
 
 Status AuthzManagerExternalStateMock::findOne(OperationContext* opCtx,
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index f554ffc9933..99889dcbc99 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -453,6 +453,7 @@ env.CppUnitTest(
         'resharding/resharding_donor_oplog_iterator_test.cpp',
         'resharding/resharding_donor_recipient_common_test.cpp',
         'resharding/resharding_oplog_applier_test.cpp',
+        'resharding/resharding_oplog_fetcher_test.cpp',
         'resharding/resharding_recipient_service_test.cpp',
         'session_catalog_migration_destination_test.cpp',
         'session_catalog_migration_source_test.cpp',
@@ -475,7 +476,9 @@ env.CppUnitTest(
         '$BUILD_DIR/mongo/db/logical_session_cache_impl',
         '$BUILD_DIR/mongo/db/ops/write_ops_exec',
         '$BUILD_DIR/mongo/db/pipeline/document_source_mock',
+        '$BUILD_DIR/mongo/db/pipeline/expression_context',
         '$BUILD_DIR/mongo/db/query/query_request',
+        '$BUILD_DIR/mongo/db/query/query_test_service_context',
         '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
         '$BUILD_DIR/mongo/db/repl/oplog_interface_local',
         '$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 2dce0fee77b..e0bd954b8c1 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -87,88 +87,71 @@ ReshardingOplogFetcher::ReshardingOplogFetcher(UUID reshardingUUID,
       _donorShard(donorShard),
       _recipientShard(recipientShard),
       _doesDonorOwnMinKeyChunk(doesDonorOwnMinKeyChunk),
-      _toWriteInto(toWriteInto),
-      _client(getGlobalServiceContext()->makeClient(
-          fmt::format("OplogFetcher-{}-{}", reshardingUUID.toString(), donorShard.toString()))) {}
+      _toWriteInto(toWriteInto) {}
 
-void ReshardingOplogFetcher::consume(DBClientBase* conn) {
-    while (true) {
-        auto opCtxRaii = _client->makeOperationContext();
-        opCtxRaii->checkForInterrupt();
-        auto expCtx = _makeExpressionContext(opCtxRaii.get());
-        boost::optional<ReshardingDonorOplogId> restartAt = iterate(opCtxRaii.get(),
-                                                                    conn,
-                                                                    expCtx,
-                                                                    _startAt,
-                                                                    _collUUID,
-                                                                    _recipientShard,
-                                                                    _doesDonorOwnMinKeyChunk,
-                                                                    _toWriteInto);
-        if (!restartAt) {
-            return;
-        }
-        _startAt = restartAt.get();
-    }
-}
+Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
+    auto pf = makePromiseFuture<void>();
+    _fetchedFinishPromise = std::move(pf.promise);
+
+    _reschedule(executor);
 
-void ReshardingOplogFetcher::setKilled() {
-    _isAlive.store(false);
-    _client->setKilled();
+    return std::move(pf.future);
 }
 
-void ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) {
+void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) {
     executor->schedule([this, executor](Status status) {
+        ThreadClient client(
+            fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()),
+            getGlobalServiceContext());
         if (!status.isOK()) {
+            LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status);
+            _fetchedFinishPromise.setError(status);
            return;
        }
 
-        if (_runTask()) {
-            schedule(executor);
+        try {
+            if (iterate(client.get())) {
+                _reschedule(executor);
+            } else {
+                _fetchedFinishPromise.emplaceValue();
+            }
+        } catch (...) {
+            LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus());
+            _fetchedFinishPromise.setError(exceptionToStatus());
         }
     });
 }
 
-bool ReshardingOplogFetcher::_runTask() {
-    auto opCtxRaii = _client->makeOperationContext();
-    opCtxRaii->checkForInterrupt();
-
-    const Seconds maxStaleness(10);
-    ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
-    StatusWith<std::shared_ptr<Shard>> swDonor =
-        Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
-    if (!swDonor.isOK()) {
-        LOGV2_WARNING(5127203,
-                      "Error finding shard in registry, retrying.",
-                      "error"_attr = swDonor.getStatus());
-        return true;
-    }
-
-    StatusWith<HostAndPort> swTargettedDonor =
-        swDonor.getValue()->getTargeter()->findHost(opCtxRaii.get(), readPref);
-    if (!swTargettedDonor.isOK()) {
-        LOGV2_WARNING(5127202,
-                      "Error targetting donor, retrying.",
-                      "error"_attr = swTargettedDonor.getStatus());
-        return true;
-    }
+bool ReshardingOplogFetcher::iterate(Client* client) {
+    std::shared_ptr<Shard> targetShard;
+    {
+        auto opCtxRaii = client->makeOperationContext();
+        opCtxRaii->checkForInterrupt();
 
-    DBClientConnection donorConn;
-    if (auto status = donorConn.connect(swTargettedDonor.getValue(), "ReshardingOplogFetching"_sd);
-        !status.isOK()) {
-        LOGV2_WARNING(5127201, "Failed connecting to donor, retrying.", "error"_attr = status);
-        return true;
+        const Seconds maxStaleness(10);
+        ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness);
+        StatusWith<std::shared_ptr<Shard>> swDonor =
+            Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard);
+        if (!swDonor.isOK()) {
+            LOGV2_WARNING(5127203,
+                          "Error finding shard in registry, retrying.",
+                          "error"_attr = swDonor.getStatus());
+            return true;
+        }
+        targetShard = swDonor.getValue();
     }
 
-    // Reset the OpCtx so consuming can manage short-lived OpCtx lifetimes with the current client.
-    opCtxRaii.reset();
     try {
         // Consume will throw if there's oplog entries to be copied. It only returns cleanly when
         // the final oplog has been seen and copied.
-        consume(&donorConn);
+        consume(client, targetShard.get());
         return false;
     } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
-        _isAlive.store(false);
         return false;
+    } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) {
+        LOGV2_ERROR(
+            5192103, "Fatal resharding error while fetching.", "error"_attr = exceptionToStatus());
+        throw;
     } catch (const DBException&) {
         LOGV2_WARNING(
             5127200, "Error while fetching, retrying.", "error"_attr = exceptionToStatus());
@@ -176,73 +159,99 @@ bool ReshardingOplogFetcher::_runTask() {
     }
 }
 
-boost::optional<ReshardingDonorOplogId> ReshardingOplogFetcher::iterate(
-    OperationContext* opCtx,
-    DBClientBase* conn,
-    boost::intrusive_ptr<ExpressionContext> expCtx,
-    const ReshardingDonorOplogId startAfter,
-    const UUID collUUID,
-    const ShardId& recipientShard,
-    const bool doesDonorOwnMinKeyChunk,
-    const NamespaceString toWriteToNss) {
-    // This method will use the input opCtx to perform writes into `toWriteToNss`.
+void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceString nss) {
+    auto opCtxRaii = client->makeOperationContext();
+    auto opCtx = opCtxRaii.get();
     invariant(!opCtx->lockState()->inAWriteUnitOfWork());
 
     // Create the destination collection if necessary.
-    writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] {
-        const CollectionPtr toWriteTo =
-            CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, toWriteToNss);
-        if (toWriteTo) {
+    writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] {
+        const CollectionPtr coll =
+            CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+        if (coll) {
             return;
         }
 
         WriteUnitOfWork wuow(opCtx);
-        AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX);
-        Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX);
-        db.getDb()->createCollection(opCtx, toWriteToNss);
+        AutoGetOrCreateDb db(opCtx, nss.db(), LockMode::MODE_IX);
+        Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
+        db.getDb()->createCollection(opCtx, nss);
         wuow.commit();
     });
+}
 
-    std::vector<BSONObj> serializedPipeline =
-        createOplogFetchingPipelineForResharding(
-            expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk)
-            ->serializeToBson();
+std::vector<BSONObj> ReshardingOplogFetcher::_makePipeline(Client* client) {
+    auto opCtxRaii = client->makeOperationContext();
+    auto opCtx = opCtxRaii.get();
+    auto expCtx = _makeExpressionContext(opCtx);
+
+    return createOplogFetchingPipelineForResharding(
+               expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk)
+        ->serializeToBson();
+}
+
+void ReshardingOplogFetcher::consume(Client* client, Shard* shard) {
+    _ensureCollection(client, _toWriteInto);
+    std::vector<BSONObj> serializedPipeline = _makePipeline(client);
 
     AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline);
-    auto readConcernArgs = repl::ReadConcernArgs(
-        boost::optional<LogicalTime>(startAfter.getTs()),
-        boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
-    aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+    if (_useReadConcern) {
+        auto readConcernArgs = repl::ReadConcernArgs(
+            boost::optional<LogicalTime>(_startAt.getTs()),
+            boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+        aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+    }
+
+    aggRequest.setHint(BSON("$natural" << 1));
+    aggRequest.setRequestReshardingResumeToken(true);
 
-    const bool secondaryOk = true;
-    const bool useExhaust = true;
-    std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
-        conn, std::move(aggRequest), secondaryOk, useExhaust));
-
-    // Noting some possible optimizations:
-    //
-    // * Batch more inserts into larger storage transactions.
-    // * Parallize writing documents across multiple threads.
-    // * Doing either of the above while still using the underlying message buffer of bson objects.
-    AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX);
-    ReshardingDonorOplogId lastSeen = startAfter;
-    while (cursor->more() && _isAlive.load()) {
-        WriteUnitOfWork wuow(opCtx);
-        BSONObj obj = cursor->next();
-        auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(obj));
+    if (_initialBatchSize) {
+        aggRequest.setBatchSize(_initialBatchSize);
+    }
 
-        lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"},
-                                                 nextOplog.get_id()->getDocument().toBson());
-        uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr));
-        wuow.commit();
-        ++_numOplogEntriesCopied;
+    auto opCtxRaii = client->makeOperationContext();
+    int batchesProcessed = 0;
+    auto svcCtx = client->getServiceContext();
+    uassertStatusOK(shard->runAggregation(
+        opCtxRaii.get(),
+        aggRequest,
+        [this, svcCtx, &batchesProcessed](const std::vector<BSONObj>& batch) {
+            ThreadClient client(fmt::format("ReshardingFetcher-{}-{}",
+                                            _reshardingUUID.toString(),
+                                            _donorShard.toString()),
+                                svcCtx,
+                                nullptr);
+            auto opCtxRaii = cc().makeOperationContext();
+            auto opCtx = opCtxRaii.get();
 
-        if (isFinalOplog(nextOplog, _reshardingUUID)) {
-            return boost::none;
-        }
-    }
+            // Noting some possible optimizations:
+            //
+            // * Batch more inserts into larger storage transactions.
+            // * Parallelize writing documents across multiple threads.
+            // * Doing either of the above while still using the underlying message buffer of bson
+            //   objects.
+            AutoGetCollection toWriteTo(opCtx, _toWriteInto, LockMode::MODE_IX);
+            for (const BSONObj& doc : batch) {
+                WriteUnitOfWork wuow(opCtx);
+                auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(doc));
+
+                _startAt = ReshardingDonorOplogId::parse(
+                    {"OplogFetcherParsing"}, nextOplog.get_id()->getDocument().toBson());
+                uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr));
+                wuow.commit();
+                ++_numOplogEntriesCopied;
 
-    return lastSeen;
+                if (isFinalOplog(nextOplog, _reshardingUUID)) {
+                    return false;
+                }
+            }
+
+            if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) {
+                return false;
+            }
+
+            return true;
+        }));
 }
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index aeb198dfabf..a5c06225b07 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -37,6 +37,7 @@
 #include "mongo/db/pipeline/expression_context.h"
 #include "mongo/db/s/resharding/donor_oplog_id_gen.h"
 #include "mongo/db/service_context.h"
+#include "mongo/s/client/shard.h"
 #include "mongo/s/shard_id.h"
 #include "mongo/util/background.h"
 #include "mongo/util/uuid.h"
@@ -59,61 +60,50 @@ public:
      * - Send an aggregation request + getMores until either:
      * -- The "final resharding" oplog entry is found.
      * -- An interruption occurs.
+     * -- The fetcher concludes it's fallen off the oplog.
      * -- A different error occurs.
      *
-     * In the first two circumstances, the task will return. In the last circumstance, the task will
-     * be rescheduled in a way that resumes where it had left off from.
+     * In the first two circumstances, the task will terminate. If the fetcher has fallen off the
+     * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task
+     * will be rescheduled in a way that resumes where it had left off from.
      */
-    void schedule(executor::TaskExecutor* exector);
+    Future<void> schedule(executor::TaskExecutor* executor);
 
     /**
-     * Given a connection, fetches and copies oplog entries until reaching an error, or coming
+     * Given a shard, fetches and copies oplog entries until reaching an error, or coming
      * across a sentinel finish oplog entry. Throws if there's more oplog entries to be copied.
      */
-    void consume(DBClientBase* conn);
+    void consume(Client* client, Shard* shard);
 
-    /**
-     * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when
-     * the output is no longer necessary. The underlying `toWriteInto` collection is left intact,
-     * though likely incomplete.
-     */
-    void setKilled();
+    bool iterate(Client* client);
 
-    /**
-     * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the
-     * ReshardingDonorOplogId to resume querying from.
-     *
-     * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries
-     * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation
-     * cursor is exhausted.
-     *
-     * Returns an identifier for the last oplog-ish document written to `toWriteInto`.
-     *
-     * This method throws.
-     *
-     * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things
-     * like perform aggregate commands nor does it expose a cursor/stream interface. However, using
-     * a `Shard` object will provide critical behavior such as advancing logical clock values on a
-     * response and targetting a node to send the aggregation command to.
-     */
-    boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx,
-                                                    DBClientBase* conn,
-                                                    boost::intrusive_ptr<ExpressionContext> expCtx,
-                                                    ReshardingDonorOplogId startAfter,
-                                                    UUID collUUID,
-                                                    const ShardId& recipientShard,
-                                                    bool doesDonorOwnMinKeyChunk,
-                                                    NamespaceString toWriteInto);
-
-    int getNumOplogEntriesCopied() {
+    int getNumOplogEntriesCopied() const {
         return _numOplogEntriesCopied;
     }
 
+    ReshardingDonorOplogId getLastSeenTimestamp() const {
+        return _startAt;
+    }
+
+    void setInitialBatchSizeForTest(int size) {
+        _initialBatchSize = size;
+    }
+
+    void useReadConcernForTest(bool use) {
+        _useReadConcern = use;
+    }
+
+    void setMaxBatchesForTest(int maxBatches) {
+        _maxBatches = maxBatches;
+    }
+
 private:
     /**
     * Returns true if there's more work to do and the task should be rescheduled.
     */
-    bool _runTask();
+    void _ensureCollection(Client* client, const NamespaceString nss);
+    std::vector<BSONObj> _makePipeline(Client* client);
+    void _reschedule(executor::TaskExecutor* executor);
 
     const UUID _reshardingUUID;
     const UUID _collUUID;
@@ -123,9 +113,16 @@ private:
     const bool _doesDonorOwnMinKeyChunk;
     const NamespaceString _toWriteInto;
 
-    ServiceContext::UniqueClient _client;
-    AtomicWord<bool> _isAlive{true};
-
+    Promise<void> _fetchedFinishPromise;
     int _numOplogEntriesCopied = 0;
+
+    // For testing to control behavior.
+
+    // The aggregation batch size. This only affects the original call and not `getMore`s.
+    int _initialBatchSize = 0;
+    // Setting to false will omit the `afterClusterTime` and `majority` read concern.
+    bool _useReadConcern = true;
+    // Dictates how many batches get processed before returning control from a call to `consume`.
+    int _maxBatches = -1;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
new file mode 100644
index 00000000000..f91a8a5996c
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -0,0 +1,417 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using namespace unittest;
+
+/**
+ * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
+ */
+class OneOffRead {
+public:
+    OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
+        _opCtx->recoveryUnit()->abandonSnapshot();
+        if (ts.isNull()) {
+            _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+        } else {
+            _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
+        }
+    }
+
+    ~OneOffRead() {
+        _opCtx->recoveryUnit()->abandonSnapshot();
+        _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+    }
+
+private:
+    OperationContext* _opCtx;
+};
+
+class ReshardingOplogFetcherTest : public ShardServerTestFixture {
+public:
+    OperationContext* _opCtx;
+    ServiceContext* _svcCtx;
+    UUID _reshardingUUID = UUID::gen();
+    Timestamp _fetchTimestamp;
+    ShardId _donorShard;
+    ShardId _destinationShard;
+
+    void setUp() {
+        ShardServerTestFixture::setUp();
+        _opCtx = operationContext();
+        _svcCtx = _opCtx->getServiceContext();
+
+        for (const auto& shardId : kTwoShardIdList) {
+            auto shardTargeter = RemoteCommandTargeterMock::get(
+                uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
+                    ->getTargeter());
+            shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
+        }
+
+        WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+        // onStepUp() relies on the storage interface to create the config.transactions table.
+        repl::StorageInterface::set(getServiceContext(),
+                                    std::make_unique<repl::StorageInterfaceImpl>());
+        MongoDSessionCatalog::onStepUp(operationContext());
+        LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+        _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
+
+        _donorShard = kTwoShardIdList[0];
+        _destinationShard = kTwoShardIdList[1];
+    }
+
+    void tearDown() {
+        WaitForMajorityService::get(getServiceContext()).shutDown();
+        ShardServerTestFixture::tearDown();
+    }
+
+    /**
+     * Override the CatalogClient to make CatalogClient::getAllShards automatically return the
+     * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
+     * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
+     * DBClientMock analogous to the NetworkInterfaceMock.
+     */
+    std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+        std::unique_ptr<DistLockManager> distLockManager) {
+
+        class StaticCatalogClient final : public ShardingCatalogClientMock {
+        public:
+            StaticCatalogClient(std::vector<ShardId> shardIds)
+                : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {}
+
+            StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+                OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+                std::vector<ShardType> shardTypes;
+                for (const auto& shardId : _shardIds) {
+                    const ConnectionString cs = ConnectionString::forReplicaSet(
+                        shardId.toString(), {makeHostAndPort(shardId)});
+                    ShardType sType;
+                    sType.setName(cs.getSetName());
+                    sType.setHost(cs.toString());
+                    shardTypes.push_back(std::move(sType));
+                };
+                return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+            }
+
+        private:
+            const std::vector<ShardId> _shardIds;
+        };
+
+        return std::make_unique<StaticCatalogClient>(kTwoShardIdList);
+    }
+
+    void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
+        // Insert some documents.
+        OpDebug* const nullOpDebug = nullptr;
+        const bool fromMigrate = false;
+        ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
+    }
+
+    BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
+        BSONObj ret;
+        ASSERT_TRUE(Helpers::findOne(
+            _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
+            << "Query: " << query;
+        return ret;
+    }
+
+    BSONObj queryOplog(const BSONObj& query) {
+        OneOffRead oor(_opCtx, Timestamp::min());
+        return queryCollection(NamespaceString::kRsOplogNamespace, query);
+    }
+
+    repl::OpTime getLastApplied() {
+        return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
+    }
+
+    boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
+        NamespaceString slimNss =
+            NamespaceString("local.system.resharding.slimOplogForGraphLookup");
+
+        boost::intrusive_ptr<ExpressionContextForTest> expCtx(
+            new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
+        expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
+                                     {NamespaceString::kRsOplogNamespace, {}});
+        expCtx->setResolvedNamespace(slimNss,
+                                     {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
+        return expCtx;
+    }
+
+    int itcount(NamespaceString nss) {
+        OneOffRead oof(_opCtx, Timestamp::min());
+        AutoGetCollectionForRead autoColl(_opCtx, nss);
+        auto cursor = autoColl.getCollection()->getCursor(_opCtx);
+
+        int ret = 0;
+        while (auto rec = cursor->next()) {
+            ++ret;
+        }
+
+        return ret;
+    }
+
+    void create(NamespaceString nss) {
+        writeConflictRetry(_opCtx, "create", nss.ns(), [&] {
+            AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
+            WriteUnitOfWork wunit(_opCtx);
+            if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+                ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+            }
+            invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
+            wunit.commit();
+        });
+    }
+
+    template <typename T>
+    T requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle<T>& future,
+                                int maxBatches = -1) {
+
+        int maxNumRequests = 1000;  // No unittests would request more than this?
+        if (maxBatches > -1) {
+            // The fetcher will send a `killCursors` after the last `getMore`.
+            maxNumRequests = maxBatches + 1;
+        }
+
+        bool hasMore = true;
+        for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) {
+            onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+                DBDirectClient client(cc().getOperationContext());
+                BSONObj result;
+                bool res = client.runCommand(request.dbname, request.cmdObj, result);
+                if (res == false || result.hasField("cursorsKilled") ||
+                    result["cursor"]["id"].Long() == 0) {
+                    hasMore = false;
+                }
+
+                return result;
+            });
+        }
+
+        return future.timed_get(Seconds(5));
+    }
+
+    // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
+    // followed by the final no-op oplog entry that signals the last oplog entry needed to be
+    // applied for resharding to move to the next stage.
+    void setupBasic(NamespaceString outputCollectionNss,
+                    NamespaceString dataCollectionNss,
+                    ShardId destinedRecipient) {
+        create(outputCollectionNss);
+        create(dataCollectionNss);
+
+        _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+        AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+        // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
+        setGlobalFailPoint("addDestinedRecipient",
+                           BSON("mode"
+                                << "alwaysOn"
+                                << "data"
+                                << BSON("destinedRecipient" << destinedRecipient.toString())));
+
+        // Insert five documents. Advance the majority point.
+        const std::int32_t docsToInsert = 5;
+        {
+            for (std::int32_t num = 0; num < docsToInsert; ++num) {
+                WriteUnitOfWork wuow(_opCtx);
+                insertDocument(dataColl.getCollection(),
+                               InsertStatement(BSON("_id" << num << "a" << num)));
+                wuow.commit();
+            }
+        }
+
+        // Write an entry saying that fetching is complete.
+        {
+            WriteUnitOfWork wuow(_opCtx);
+            _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
+                _opCtx,
+                dataColl.getCollection()->ns(),
+                dataColl.getCollection()->uuid(),
+                BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
+                                          dataColl.getCollection()->ns().toString())),
+                BSON("type"
+                     << "reshardFinalOp"
+                     << "reshardingUUID" << _reshardingUUID),
+                boost::none,
+                boost::none,
+                boost::none,
+                boost::none);
+            wuow.commit();
+        }
+
+        repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+
+        // Disable the failpoint.
+        setGlobalFailPoint("addDestinedRecipient",
+                           BSON("mode"
+                                << "off"));
+    }
+
+protected:
+    const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
+
+private:
+    static HostAndPort makeHostAndPort(const ShardId& shardId) {
+        return HostAndPort(str::stream() << shardId << ":123");
+    }
+};
+
+TEST_F(ReshardingOplogFetcherTest, TestBasic) {
+    const NamespaceString outputCollectionNss("dbtests.outputCollection");
+    const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+    setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+    AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+    auto fetcherJob = launchAsync([&, this] {
+        Client::initThread("RefetchRunner", _svcCtx, nullptr);
+        ReshardingOplogFetcher fetcher(_reshardingUUID,
+                                       dataColl->uuid(),
+                                       {_fetchTimestamp, _fetchTimestamp},
+                                       _donorShard,
+                                       _destinationShard,
+                                       true,
+                                       outputCollectionNss);
+        fetcher.useReadConcernForTest(false);
+        fetcher.setInitialBatchSizeForTest(2);
+
+        fetcher.iterate(&cc());
+    });
+
+    requestPassthroughHandler(fetcherJob);
+
+    // Five oplog entries for resharding + the sentinel final oplog entry.
+    ASSERT_EQ(6, itcount(outputCollectionNss));
+}
+
+TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) {
+    const NamespaceString outputCollectionNss("dbtests.outputCollection");
+    const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+    setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+    AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+    const int maxBatches = 1;
+    auto fetcherJob = launchAsync([&, this] {
+        Client::initThread("RefetcherRunner", _svcCtx, nullptr);
+
+        ReshardingOplogFetcher fetcher(_reshardingUUID,
+                                       dataColl->uuid(),
+                                       {_fetchTimestamp, _fetchTimestamp},
+                                       _donorShard,
+                                       _destinationShard,
+                                       true,
+                                       outputCollectionNss);
+        fetcher.useReadConcernForTest(false);
+        fetcher.setInitialBatchSizeForTest(2);
+        fetcher.setMaxBatchesForTest(maxBatches);
+
+        fetcher.iterate(&cc());
+        return fetcher.getLastSeenTimestamp();
+    });
+
+    ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches);
+
+    // Two oplog entries due to the batch size.
+    ASSERT_EQ(2, itcount(outputCollectionNss));
+    // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`.
+    ASSERT_GT(lastSeen.getTs(), _fetchTimestamp);
+}
+
+TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) {
+    const NamespaceString outputCollectionNss("dbtests.outputCollection");
+    const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+    setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard);
+
+    AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+    auto fetcherJob = launchAsync([&, this] {
+        Client::initThread("RefetcherRunner", _svcCtx, nullptr);
+
+        const Timestamp doesNotExist(1, 1);
+        ReshardingOplogFetcher fetcher(_reshardingUUID,
+                                       dataColl->uuid(),
+                                       {doesNotExist, doesNotExist},
+                                       _donorShard,
+                                       _destinationShard,
+                                       true,
+                                       outputCollectionNss);
+        fetcher.useReadConcernForTest(false);
+
+        try {
+            fetcher.iterate(&cc());
+            // Test failure case.
+            return Status::OK();
+        } catch (...) {
+            return exceptionToStatus();
+        }
+    });
+
+    auto fetcherStatus = requestPassthroughHandler(fetcherJob);
+
+    // No oplog entries should have been copied.
+    ASSERT_EQ(0, itcount(outputCollectionNss));
+    ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code());
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/s/shard_local.cpp b/src/mongo/db/s/shard_local.cpp
index 0f27d3a05d5..d1d66fddb53 100644
--- a/src/mongo/db/s/shard_local.cpp
+++ b/src/mongo/db/s/shard_local.cpp
@@ -201,4 +201,10 @@ void ShardLocal::runFireAndForgetCommand(OperationContext* opCtx,
     MONGO_UNREACHABLE;
 }
 
+Status ShardLocal::runAggregation(OperationContext* opCtx,
+                                  const AggregationRequest& aggRequest,
+                                  std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+    MONGO_UNREACHABLE;
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/shard_local.h b/src/mongo/db/s/shard_local.h
index df625905d7c..696bd665c3d 100644
--- a/src/mongo/db/s/shard_local.h
+++ b/src/mongo/db/s/shard_local.h
@@ -70,6 +70,10 @@ public:
                                  const std::string& dbName,
                                  const BSONObj& cmdObj) override;
 
+    Status runAggregation(OperationContext* opCtx,
+                          const AggregationRequest& aggRequest,
+                          std::function<bool(const std::vector<BSONObj>& batch)> callback);
+
 private:
     StatusWith<Shard::CommandResponse> _runCommand(OperationContext* opCtx,
                                                    const ReadPreferenceSetting& unused,
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index c03839ddbed..09d63233bdf 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -123,7 +123,6 @@ if not has_option('noshell') and usemozjs:
         'querytests.cpp',
         'replica_set_tests.cpp',
         'repltests.cpp',
-        'resharding_tests.cpp',
         'rollbacktests.cpp',
         'socktests.cpp',
         'storage_timestamp_tests.cpp',
diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp
deleted file mode 100644
index 996371a0cf4..00000000000
--- a/src/mongo/dbtests/resharding_tests.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/global_settings.h"
-#include "mongo/db/op_observer_impl.h"
-#include "mongo/db/op_observer_registry.h"
-#include "mongo/db/pipeline/expression_context_for_test.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_consistency_markers_mock.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/replication_process.h"
-#include "mongo/db/repl/replication_recovery_mock.h"
-#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
-#include "mongo/db/s/resharding_util.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/service_context.h"
-#include "mongo/db/storage/snapshot_manager.h"
-#include "mongo/db/storage/storage_engine.h"
-#include "mongo/db/storage/write_unit_of_work.h"
-#include "mongo/db/vector_clock_mutable.h"
-#include "mongo/dbtests/dbtests.h"
-#include "mongo/logv2/log.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-/**
- * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
- */
-class OneOffRead {
-public:
-    OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
-        _opCtx->recoveryUnit()->abandonSnapshot();
-        if (ts.isNull()) {
-            _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
-        } else {
-            _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
-        }
-    }
-
-    ~OneOffRead() {
-        _opCtx->recoveryUnit()->abandonSnapshot();
-        _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
-    }
-
-private:
-    OperationContext* _opCtx;
-};
-
-/**
- * Observed problems using ShardingMongodTestFixture:
- *
- * - Does not mix with dbtest. Both will initialize a ServiceContext.
- * - By default uses ephemeralForTest. These tests require a storage engine that supports majority
- *   reads.
- * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test
- *   that is run. WT specifically installs a ServerStatusSection. The server status code asserts
- *   that a section is never added after a `serverStatus` command is run. Tests defined in
- *   `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a
- *   serverStatus triggerring this assertion.
- */
-class ReshardingTest {
-public:
-    ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
-    OperationContext* _opCtx = _opCtxRaii.get();
-    ServiceContext* _svcCtx = _opCtx->getServiceContext();
-    VectorClockMutable* _clock = VectorClockMutable::get(_opCtx);
-    // A convenience UUID.
-    UUID _reshardingUUID = UUID::gen();
-    // Timestamp of the first oplog entry which the fixture will set up.
-    Timestamp _fetchTimestamp;
-
-    ReshardingTest() {
-        repl::ReplSettings replSettings;
-        replSettings.setOplogSizeBytes(100 * 1024 * 1024);
-        replSettings.setReplSetString("rs0");
-        setGlobalReplSettings(replSettings);
-
-        auto replCoordinatorMock =
-            std::make_unique<repl::ReplicationCoordinatorMock>(_svcCtx, replSettings);
-        replCoordinatorMock->alwaysAllowWrites(true);
-        repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock));
-        repl::StorageInterface::set(_svcCtx, std::make_unique<repl::StorageInterfaceImpl>());
-        repl::ReplicationProcess::set(
-            _svcCtx,
-            std::make_unique<repl::ReplicationProcess>(
-                repl::StorageInterface::get(_svcCtx),
-                std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
-                std::make_unique<repl::ReplicationRecoveryMock>()));
-
-        // Since the Client object persists across tests, even though the global
-        // ReplicationCoordinator does not, we need to clear the last op associated with the client
-        // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward.
-        repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest();
-
-        auto opObsRegistry = std::make_unique<OpObserverRegistry>();
-        opObsRegistry->addObserver(std::make_unique<OpObserverImpl>());
-        _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry));
-
-        // Clean out the oplog and write one no-op entry. The timestamp of this first oplog entry
-        // will serve as resharding's `fetchTimestamp`.
-        repl::createOplog(_opCtx);
-        reset(NamespaceString::kRsOplogNamespace);
-        {
-            WriteUnitOfWork wuow(_opCtx);
-            Lock::GlobalLock lk(_opCtx, LockMode::MODE_IX);
-            _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
-                _opCtx,
-                // Choose a random, irrelevant replicated namespace.
-                NamespaceString::kSystemKeysNamespace,
-                UUID::gen(),
-                BSON("msg"
-                     << "Dummy op."),
-                boost::none,
-                boost::none,
-                boost::none,
-                boost::none,
-                boost::none);
-            wuow.commit();
-            repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
-        }
-        _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp();
-        std::cout << " Fetch timestamp: " << _fetchTimestamp.toString() << std::endl;
-
-        _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0)));
-    }
-
-    ~ReshardingTest() {
-        try {
-            reset(NamespaceString("local.oplog.rs"));
-        } catch (...) {
-            FAIL("Exception while cleaning up test");
-        }
-    }
-
-
-    /**
-     * Walking on ice: resetting the ReplicationCoordinator destroys the underlying
-     * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection
-     * without actually dropping it.
-     */
-    void reset(NamespaceString nss) const {
-        ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] {
-            // Do not write DDL operations to the oplog. This keeps the initial oplog state for each
-            // test predictable.
-            repl::UnreplicatedWritesBlock uwb(_opCtx);
-            _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
-            AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X);
-
-            if (collRaii) {
-                WriteUnitOfWork wunit(_opCtx);
-                invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK());
-                if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
-                    ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
-                }
-                collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false);
-                wunit.commit();
-                return;
-            }
-
-            AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
-            WriteUnitOfWork wunit(_opCtx);
-            if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
-                ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
-            }
-            invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
-            wunit.commit();
-        });
-    }
-
-    void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
-        // Insert some documents.
-        OpDebug* const nullOpDebug = nullptr;
-        const bool fromMigrate = false;
-        ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
-    }
-
-    BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
-        BSONObj ret;
-        ASSERT_TRUE(Helpers::findOne(
-            _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
-            << "Query: " << query;
-        return ret;
-    }
-
-    BSONObj queryOplog(const BSONObj& query) {
-        OneOffRead oor(_opCtx, Timestamp::min());
-        return queryCollection(NamespaceString::kRsOplogNamespace, query);
-    }
-
-    repl::OpTime getLastApplied() {
-        return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
-    }
-
-    boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
-        NamespaceString slimNss =
-            NamespaceString("local.system.resharding.slimOplogForGraphLookup");
-
-        boost::intrusive_ptr<ExpressionContextForTest> expCtx(
-            new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
-        expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
-                                     {NamespaceString::kRsOplogNamespace, {}});
-        expCtx->setResolvedNamespace(slimNss,
-                                     {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
-        return expCtx;
-    }
-
-    int itcount(NamespaceString nss) {
-        OneOffRead oof(_opCtx, Timestamp::min());
-        AutoGetCollectionForRead autoColl(_opCtx, nss);
-        auto cursor = autoColl.getCollection()->getCursor(_opCtx);
-
-        int ret = 0;
-        while (auto rec = cursor->next()) {
-            ++ret;
-        }
-
-        return ret;
-    }
-
-    // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient`
-    // followed by the final no-op oplog entry that signals the last oplog entry needed to be
-    // applied for resharding to move to the next stage.
-    void setupBasic(NamespaceString outputCollectionNss,
-                    NamespaceString dataCollectionNss,
-                    ShardId destinedRecipient) {
-        reset(outputCollectionNss);
-        reset(dataCollectionNss);
-
-        AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
-        // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
-        setGlobalFailPoint("addDestinedRecipient",
-                           BSON("mode"
-                                << "alwaysOn"
-                                << "data"
-                                << BSON("destinedRecipient" << destinedRecipient.toString())));
-
-        // Insert five documents. Advance the majority point.
-        const std::int32_t docsToInsert = 5;
-        {
-            for (std::int32_t num = 0; num < docsToInsert; ++num) {
-                WriteUnitOfWork wuow(_opCtx);
-                insertDocument(dataColl.getCollection(),
-                               InsertStatement(BSON("_id" << num << "a" << num)));
-                wuow.commit();
-            }
-        }
-
-        // Write an entry saying that fetching is complete.
-        {
-            WriteUnitOfWork wuow(_opCtx);
-            _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
-                _opCtx,
-                dataColl.getCollection()->ns(),
-                dataColl.getCollection()->uuid(),
-                BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
-                                          dataColl.getCollection()->ns().toString())),
-                BSON("type"
-                     << "reshardFinalOp"
-                     << "reshardingUUID" << _reshardingUUID),
-                boost::none,
-                boost::none,
-                boost::none,
-                boost::none);
-            wuow.commit();
-        }
-
-        repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
-        _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
-            getLastApplied().getTimestamp());
-
-        // Disable the failpoint.
-        setGlobalFailPoint("addDestinedRecipient",
-                           BSON("mode"
-                                << "off"));
-    }
-};
-
-class RunFetchIteration : public ReshardingTest {
-public:
-    void run() {
-        const NamespaceString outputCollectionNss("dbtests.outputCollection");
-        reset(outputCollectionNss);
-        const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-        reset(dataCollectionNss);
-
-        AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
-
-        // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
-        setGlobalFailPoint("addDestinedRecipient",
-                           BSON("mode"
-                                << "alwaysOn"
-                                << "data"
-                                << BSON("destinedRecipient"
-                                        << "shard1")));
-
-        // Insert five documents. Advance the majority point. Insert five more documents.
-        const std::int32_t docsToInsert = 5;
-        {
-            for (std::int32_t num = 0; num < docsToInsert; ++num) {
-                WriteUnitOfWork wuow(_opCtx);
-                insertDocument(dataColl.getCollection(),
-                               InsertStatement(BSON("_id" << num << "a" << num)));
-                wuow.commit();
-            }
-        }
-
-        repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
-        const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp();
-        _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
-            firstFiveLastApplied);
-        {
-            for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) {
-                WriteUnitOfWork wuow(_opCtx);
-                insertDocument(dataColl.getCollection(),
-                               InsertStatement(BSON("_id" << num << "a" << num)));
-                wuow.commit();
-            }
-        }
-
-        // Disable the failpoint.
-        setGlobalFailPoint("addDestinedRecipient",
-                           BSON("mode"
-                                << "off"));
-
-        repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
-        const Timestamp latestLastApplied = getLastApplied().getTimestamp();
-
-        // The first call to `iterate` should return the first five inserts and return a
-        // `ReshardingDonorOplogId` matching the last applied of those five inserts.
-        ReshardingOplogFetcher fetcher(_reshardingUUID,
-                                       dataColl->uuid(),
-                                       {_fetchTimestamp, _fetchTimestamp},
-                                       ShardId("fakeDonorShard"),
-                                       ShardId("shard1"),
-                                       true,
-                                       outputCollectionNss);
-        DBDirectClient client(_opCtx);
-        boost::optional<ReshardingDonorOplogId> donorOplogId =
-            fetcher.iterate(_opCtx,
-                            &client,
-                            createExpressionContext(),
-                            {_fetchTimestamp, _fetchTimestamp},
-                            dataColl->uuid(),
-                            {"shard1"},
-                            true,
-                            outputCollectionNss);
-
-        ASSERT(donorOplogId != boost::none);
-        ASSERT_EQ(docsToInsert, itcount(outputCollectionNss));
-        ASSERT_EQ(firstFiveLastApplied, donorOplogId->getClusterTime());
-        ASSERT_EQ(firstFiveLastApplied, donorOplogId->getTs());
-
-        // Advance the committed snapshot. A second `iterate` should return the second batch of five
-        // inserts.
-        _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
-            getLastApplied().getTimestamp());
-
-        donorOplogId = fetcher.iterate(_opCtx,
-                                       &client,
-                                       createExpressionContext(),
-                                       {firstFiveLastApplied, firstFiveLastApplied},
-                                       dataColl->uuid(),
-                                       {"shard1"},
-                                       true,
-                                       outputCollectionNss);
-
-        ASSERT(donorOplogId != boost::none);
-        // Two batches of five inserts entry for the create collection oplog entry.
-        ASSERT_EQ((2 * docsToInsert), itcount(outputCollectionNss));
-        ASSERT_EQ(latestLastApplied, donorOplogId->getClusterTime());
-        ASSERT_EQ(latestLastApplied, donorOplogId->getTs());
-    }
-};
-
-class RunConsume : public ReshardingTest {
-public:
-    void run() {
-        const NamespaceString outputCollectionNss("dbtests.outputCollection");
-        const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
-        const ShardId destinationShard("shard1");
-        setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
-
-        AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
-        ReshardingOplogFetcher fetcher(_reshardingUUID,
-                                       dataColl->uuid(),
-                                       {_fetchTimestamp, _fetchTimestamp},
-                                       ShardId("fakeDonorShard"),
-                                       destinationShard,
-                                       true,
-                                       outputCollectionNss);
-        DBDirectClient client(_opCtx);
-        fetcher.consume(&client);
-
-        // Six oplog entries should be copied. Five inserts and the final no-op oplog entry.
-        ASSERT_EQ(6, fetcher.getNumOplogEntriesCopied());
-    }
-};
-
-class InterruptConsume : public ReshardingTest {
-public:
-    void run() {
-        const NamespaceString outputCollectionNss("dbtests.outputCollection");
-        const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
-
-        const ShardId destinationShard("shard1");
-        setupBasic(outputCollectionNss, dataCollectionNss, destinationShard);
-
-        AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IS);
-        ReshardingOplogFetcher fetcher(_reshardingUUID,
-                                       dataColl->uuid(),
-                                       {_fetchTimestamp, _fetchTimestamp},
-                                       ShardId("fakeDonorShard"),
-                                       destinationShard,
-                                       true,
-                                       outputCollectionNss);
-
-        // Interrupt the fetcher. A fetcher object owns its own client, but interruption does not
-        // require the background job to be started.
-        fetcher.setKilled();
-
-        DBDirectClient client(_opCtx);
-        ASSERT_THROWS(fetcher.consume(&client), ExceptionForCat<ErrorCategory::Interruption>);
-    }
-};
-
-class AllReshardingTests : public unittest::OldStyleSuiteSpecification {
-public:
-    AllReshardingTests() : unittest::OldStyleSuiteSpecification("ReshardingTests") {}
-
-    // Must be evaluated at test run() time, not static-init time.
-    static bool shouldSkip() {
-        // Only run on storage engines that support snapshot reads.
-        auto storageEngine = cc().getServiceContext()->getStorageEngine();
-        if (!storageEngine->supportsReadConcernSnapshot() ||
-            !mongo::serverGlobalParams.enableMajorityReadConcern) {
-            LOGV2(5123009,
-                  "Skipping this test because the configuration does not support majority reads.",
-                  "storageEngine"_attr = storageGlobalParams.engine,
-                  "enableMajorityReadConcern"_attr =
-                      mongo::serverGlobalParams.enableMajorityReadConcern);
-            return true;
-        }
-        return false;
-    }
-
-    template <typename T>
-    void addIf() {
-        addNameCallback(nameForTestClass<T>(), [] {
-            if (!shouldSkip())
-                T().run();
-        });
-    }
-
-    void setupTests() {
-        addIf<RunFetchIteration>();
-        addIf<RunConsume>();
-        addIf<InterruptConsume>();
-    }
-};
-
-unittest::OldStyleSuiteInitializer<AllReshardingTests> allReshardingTests;
-
-}  // namespace
-}  // namespace mongo
diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h
index 79f1a2b669c..82e45ef5e55 100644
--- a/src/mongo/s/client/shard.h
+++ b/src/mongo/s/client/shard.h
@@ -36,6 +36,7 @@
 #include "mongo/client/read_preference.h"
 #include "mongo/db/logical_time.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_request.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/read_concern_args.h"
 #include "mongo/executor/remote_command_response.h"
@@ -207,6 +208,18 @@ public:
                                            Milliseconds maxTimeMSOverride);
 
     /**
+     * Synchronously runs the aggregation request, with a best-effort honoring of request
+     * options. `callback` will be called with the batch contained in each response. `callback`
+     * should return `true` to execute another getMore. Returning `false` will send a
+     * `killCursors`. If the aggregation results are exhausted, there will be no additional calls to
+     * `callback`.
+     */
+    virtual Status runAggregation(
+        OperationContext* opCtx,
+        const AggregationRequest& aggRequest,
+        std::function<bool(const std::vector<BSONObj>& batch)> callback) = 0;
+
+    /**
      * Runs a write command against a shard. This is separate from runCommand, because write
      * commands return errors in a different format than regular commands do, so checking for
     * retriable errors must be done differently.
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 5254c24d37d..30ceec26868 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -281,7 +281,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
         const auto& data = dataStatus.getValue();
 
         if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
-            // Sharding users of ReplSetMetadata do not require the wall clock time field to be set
+            // Sharding users of ReplSetMetadata do not require the wall clock time field to be set.
             auto replParseStatus =
                 rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
             if (!replParseStatus.isOK()) {
@@ -411,6 +411,91 @@ void ShardRemote::runFireAndForgetCommand(OperationContext* opCtx,
         .ignore();
 }
 
+Status ShardRemote::runAggregation(
+    OperationContext* opCtx,
+    const AggregationRequest& aggRequest,
+    std::function<bool(const std::vector<BSONObj>& batch)> callback) {
+
+    BSONObj readPrefMetadata;
+
+    ReadPreferenceSetting readPreference =
+        uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(
+            aggRequest.getUnwrappedReadPref(), ReadPreference::SecondaryPreferred));
+
+    auto swHost = _targeter->findHost(opCtx, readPreference);
+    if (!swHost.isOK()) {
+        return swHost.getStatus();
+    }
+    HostAndPort host = swHost.getValue();
+
+    BSONObjBuilder builder;
+    readPreference.toContainingBSON(&builder);
+    readPrefMetadata = builder.obj();
+
+    Status status =
+        Status(ErrorCodes::InternalError, "Internal error running cursor callback in command");
+    auto fetcherCallback = [&status, callback](const Fetcher::QueryResponseStatus& dataStatus,
+                                               Fetcher::NextAction* nextAction,
+                                               BSONObjBuilder* getMoreBob) {
+        // Throw out any accumulated results on error.
+        if (!dataStatus.isOK()) {
+            status = dataStatus.getStatus();
+            return;
+        }
+
+        const auto& data = dataStatus.getValue();
+
+        if (data.otherFields.metadata.hasField(rpc::kReplSetMetadataFieldName)) {
+            // Sharding users of ReplSetMetadata do not require the wall clock time field to be
+            // set.
+            auto replParseStatus =
+                rpc::ReplSetMetadata::readFromMetadata(data.otherFields.metadata);
+            if (!replParseStatus.isOK()) {
+                status = replParseStatus.getStatus();
+                return;
+            }
+        }
+
+        if (!callback(data.documents)) {
+            *nextAction = Fetcher::NextAction::kNoAction;
+        }
+
+        status = Status::OK();
+
+        if (!getMoreBob) {
+            return;
+        }
+        getMoreBob->append("getMore", data.cursorId);
+        getMoreBob->append("collection", data.nss.coll());
+    };
+
+    Milliseconds requestTimeout(-1);
+    if (aggRequest.getMaxTimeMS()) {
+        requestTimeout = Milliseconds(aggRequest.getMaxTimeMS());
+    }
+
+    auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+    Fetcher fetcher(executor.get(),
+                    host,
+                    aggRequest.getNamespaceString().db().toString(),
+                    aggRequest.serializeToCommandObj().toBson(),
+                    fetcherCallback,
+                    readPrefMetadata,
+                    requestTimeout, /* command network timeout */
+                    requestTimeout /* getMore network timeout */);
+
+    Status scheduleStatus = fetcher.schedule();
+    if (!scheduleStatus.isOK()) {
+        return scheduleStatus;
+    }
+
+    fetcher.join();
+
+    updateReplSetMonitor(host, status);
+
+    return status;
+}
+
+
 StatusWith<ShardRemote::AsyncCmdHandle> ShardRemote::_scheduleCommand(
     OperationContext* opCtx,
     const ReadPreferenceSetting& readPref,
diff --git a/src/mongo/s/client/shard_remote.h b/src/mongo/s/client/shard_remote.h
index eb0c547c97f..e7ca9004112 100644
--- a/src/mongo/s/client/shard_remote.h
+++ b/src/mongo/s/client/shard_remote.h
@@ -85,6 +85,10 @@ public:
                                  const std::string& dbName,
                                  const BSONObj& cmdObj) final;
 
+    Status runAggregation(OperationContext* opCtx,
+                          const AggregationRequest& aggRequest,
+                          std::function<bool(const std::vector<BSONObj>& batch)> callback);
+
 private:
     struct AsyncCmdHandle {
         HostAndPort hostTargetted;
diff --git a/src/mongo/unittest/unittest.h b/src/mongo/unittest/unittest.h
index 2ef1d7cd77e..76563290fff 100644
--- a/src/mongo/unittest/unittest.h
+++ b/src/mongo/unittest/unittest.h
@@ -49,6 +49,7 @@
 
 #include "mongo/base/status_with.h"
 #include "mongo/base/string_data.h"
+#include "mongo/logv2/log_debug.h"
 #include "mongo/logv2/log_detail.h"
 #include "mongo/unittest/bson_test_util.h"
 #include "mongo/unittest/unittest_helpers.h"