diff options
author | Uladzimir Makouski <uladzimir.makouski@mongodb.com> | 2020-11-25 07:45:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-25 09:53:58 +0000 |
commit | 7b92716c97e05c09f624513b03bd3eb7663aa537 (patch) | |
tree | 8691a9414e1be23f8187263bfbca191d05766b9e /src/mongo/db/s/resharding | |
parent | 1c5ab129a97535162db665db97effcc1ead0ad22 (diff) | |
download | mongo-7b92716c97e05c09f624513b03bd3eb7663aa537.tar.gz |
Revert "SERVER-51245: Have resharding oplog fetching use a Fetcher."
This reverts commit cda3a52701fe4143b06bd981b98514e69d0a93eb.
Diffstat (limited to 'src/mongo/db/s/resharding')
3 files changed, 153 insertions, 576 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index e0bd954b8c1..2dce0fee77b 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -87,71 +87,88 @@ ReshardingOplogFetcher::ReshardingOplogFetcher(UUID reshardingUUID, _donorShard(donorShard), _recipientShard(recipientShard), _doesDonorOwnMinKeyChunk(doesDonorOwnMinKeyChunk), - _toWriteInto(toWriteInto) {} + _toWriteInto(toWriteInto), + _client(getGlobalServiceContext()->makeClient( + fmt::format("OplogFetcher-{}-{}", reshardingUUID.toString(), donorShard.toString()))) {} -Future<void> ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) { - auto pf = makePromiseFuture<void>(); - _fetchedFinishPromise = std::move(pf.promise); - - _reschedule(executor); +void ReshardingOplogFetcher::consume(DBClientBase* conn) { + while (true) { + auto opCtxRaii = _client->makeOperationContext(); + opCtxRaii->checkForInterrupt(); + auto expCtx = _makeExpressionContext(opCtxRaii.get()); + boost::optional<ReshardingDonorOplogId> restartAt = iterate(opCtxRaii.get(), + conn, + expCtx, + _startAt, + _collUUID, + _recipientShard, + _doesDonorOwnMinKeyChunk, + _toWriteInto); + if (!restartAt) { + return; + } + _startAt = restartAt.get(); + } +} - return std::move(pf.future); +void ReshardingOplogFetcher::setKilled() { + _isAlive.store(false); + _client->setKilled(); } -void ReshardingOplogFetcher::_reschedule(executor::TaskExecutor* executor) { +void ReshardingOplogFetcher::schedule(executor::TaskExecutor* executor) { executor->schedule([this, executor](Status status) { - ThreadClient client( - fmt::format("OplogFetcher-{}-{}", _reshardingUUID.toString(), _donorShard.toString()), - getGlobalServiceContext()); if (!status.isOK()) { - LOGV2_INFO(5192101, "Resharding oplog fetcher aborting.", "reason"_attr = status); - _fetchedFinishPromise.setError(status); return; } - try { - if (iterate(client.get())) { - _reschedule(executor); - } else { - _fetchedFinishPromise.emplaceValue(); - } - } catch (...) { - LOGV2_INFO(5192102, "Error.", "reason"_attr = exceptionToStatus()); - _fetchedFinishPromise.setError(exceptionToStatus()); + if (_runTask()) { + schedule(executor); } }); } -bool ReshardingOplogFetcher::iterate(Client* client) { - std::shared_ptr<Shard> targetShard; - { - auto opCtxRaii = client->makeOperationContext(); - opCtxRaii->checkForInterrupt(); +bool ReshardingOplogFetcher::_runTask() { + auto opCtxRaii = _client->makeOperationContext(); + opCtxRaii->checkForInterrupt(); + + const Seconds maxStaleness(10); + ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness); + StatusWith<std::shared_ptr<Shard>> swDonor = + Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard); + if (!swDonor.isOK()) { + LOGV2_WARNING(5127203, + "Error finding shard in registry, retrying.", + "error"_attr = swDonor.getStatus()); + return true; + } - const Seconds maxStaleness(10); - ReadPreferenceSetting readPref(ReadPreference::Nearest, maxStaleness); - StatusWith<std::shared_ptr<Shard>> swDonor = - Grid::get(opCtxRaii.get())->shardRegistry()->getShard(opCtxRaii.get(), _donorShard); - if (!swDonor.isOK()) { - LOGV2_WARNING(5127203, - "Error finding shard in registry, retrying.", - "error"_attr = swDonor.getStatus()); - return true; - } - targetShard = swDonor.getValue(); + StatusWith<HostAndPort> swTargettedDonor = + swDonor.getValue()->getTargeter()->findHost(opCtxRaii.get(), readPref); + if (!swTargettedDonor.isOK()) { + LOGV2_WARNING(5127202, + "Error targetting donor, retrying.", + "error"_attr = swTargettedDonor.getStatus()); + return true; } + DBClientConnection donorConn; + if (auto status = donorConn.connect(swTargettedDonor.getValue(), "ReshardingOplogFetching"_sd); + !status.isOK()) { + LOGV2_WARNING(5127201, "Failed connecting to donor, retrying.", "error"_attr = status); + return true; + } + // Reset the OpCtx so consuming can manage short-lived OpCtx lifetimes with the current client. + opCtxRaii.reset(); + try { // Consume will throw if there's oplog entries to be copied. It only returns cleanly when // the final oplog has been seen and copied. - consume(client, targetShard.get()); + consume(&donorConn); return false; } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { + _isAlive.store(false); return false; - } catch (const ExceptionFor<ErrorCodes::OplogQueryMinTsMissing>&) { - LOGV2_ERROR( - 5192103, "Fatal resharding error while fetching.", "error"_attr = exceptionToStatus()); - throw; } catch (const DBException&) { LOGV2_WARNING( 5127200, "Error while fetching, retrying.", "error"_attr = exceptionToStatus()); @@ -159,99 +176,73 @@ bool ReshardingOplogFetcher::iterate(Client* client) { } } -void ReshardingOplogFetcher::_ensureCollection(Client* client, const NamespaceString nss) { - auto opCtxRaii = client->makeOperationContext(); - auto opCtx = opCtxRaii.get(); +boost::optional<ReshardingDonorOplogId> ReshardingOplogFetcher::iterate( + OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr<ExpressionContext> expCtx, + const ReshardingDonorOplogId startAfter, + const UUID collUUID, + const ShardId& recipientShard, + const bool doesDonorOwnMinKeyChunk, + const NamespaceString toWriteToNss) { + // This method will use the input opCtx to perform writes into `toWriteToNss`. invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Create the destination collection if necessary. - writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] { - const CollectionPtr coll = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); - if (coll) { + writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] { + const CollectionPtr toWriteTo = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, toWriteToNss); + if (toWriteTo) { return; } WriteUnitOfWork wuow(opCtx); - AutoGetOrCreateDb db(opCtx, nss.db(), LockMode::MODE_IX); - Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - db.getDb()->createCollection(opCtx, nss); + AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX); + Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX); + db.getDb()->createCollection(opCtx, toWriteToNss); wuow.commit(); }); -} -std::vector<BSONObj> ReshardingOplogFetcher::_makePipeline(Client* client) { - auto opCtxRaii = client->makeOperationContext(); - auto opCtx = opCtxRaii.get(); - auto expCtx = _makeExpressionContext(opCtx); - - return createOplogFetchingPipelineForResharding( - expCtx, _startAt, _collUUID, _recipientShard, _doesDonorOwnMinKeyChunk) - ->serializeToBson(); -} - -void ReshardingOplogFetcher::consume(Client* client, Shard* shard) { - _ensureCollection(client, _toWriteInto); - std::vector<BSONObj> serializedPipeline = _makePipeline(client); + std::vector<BSONObj> serializedPipeline = + createOplogFetchingPipelineForResharding( + expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk) + ->serializeToBson(); AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline); - if (_useReadConcern) { - auto readConcernArgs = repl::ReadConcernArgs( - boost::optional<LogicalTime>(_startAt.getTs()), - boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); - aggRequest.setReadConcern(readConcernArgs.toBSONInner()); - } - + auto readConcernArgs = repl::ReadConcernArgs( + boost::optional<LogicalTime>(startAfter.getTs()), + boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); + aggRequest.setReadConcern(readConcernArgs.toBSONInner()); aggRequest.setHint(BSON("$natural" << 1)); - aggRequest.setRequestReshardingResumeToken(true); - - if (_initialBatchSize) { - aggRequest.setBatchSize(_initialBatchSize); - } - auto opCtxRaii = client->makeOperationContext(); - int batchesProcessed = 0; - auto svcCtx = client->getServiceContext(); - uassertStatusOK(shard->runAggregation( - opCtxRaii.get(), - aggRequest, - [this, svcCtx, &batchesProcessed](const std::vector<BSONObj>& batch) { - ThreadClient client(fmt::format("ReshardingFetcher-{}-{}", - _reshardingUUID.toString(), - _donorShard.toString()), - svcCtx, - nullptr); - auto opCtxRaii = cc().makeOperationContext(); - auto opCtx = opCtxRaii.get(); - - // Noting some possible optimizations: - // - // * Batch more inserts into larger storage transactions. - // * Parallize writing documents across multiple threads. - // * Doing either of the above while still using the underlying message buffer of bson - // objects. - AutoGetCollection toWriteTo(opCtx, _toWriteInto, LockMode::MODE_IX); - for (const BSONObj& doc : batch) { - WriteUnitOfWork wuow(opCtx); - auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(doc)); - - _startAt = ReshardingDonorOplogId::parse( - {"OplogFetcherParsing"}, nextOplog.get_id()->getDocument().toBson()); - uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{doc}, nullptr)); - wuow.commit(); - ++_numOplogEntriesCopied; + const bool secondaryOk = true; + const bool useExhaust = true; + std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + conn, std::move(aggRequest), secondaryOk, useExhaust)); + + // Noting some possible optimizations: + // + // * Batch more inserts into larger storage transactions. + // * Parallize writing documents across multiple threads. + // * Doing either of the above while still using the underlying message buffer of bson objects. + AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX); + ReshardingDonorOplogId lastSeen = startAfter; + while (cursor->more() && _isAlive.load()) { + WriteUnitOfWork wuow(opCtx); + BSONObj obj = cursor->next(); + auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(obj)); - if (isFinalOplog(nextOplog, _reshardingUUID)) { - return false; - } - } + lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"}, + nextOplog.get_id()->getDocument().toBson()); + uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr)); + wuow.commit(); + ++_numOplogEntriesCopied; - if (_maxBatches > -1 && ++batchesProcessed >= _maxBatches) { - return false; - } + if (isFinalOplog(nextOplog, _reshardingUUID)) { + return boost::none; + } + } - return true; - })); + return lastSeen; } - } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index a5c06225b07..aeb198dfabf 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -37,7 +37,6 @@ #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/db/service_context.h" -#include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" #include "mongo/util/background.h" #include "mongo/util/uuid.h" @@ -60,50 +59,61 @@ public: * - Send an aggregation request + getMores until either: * -- The "final resharding" oplog entry is found. * -- An interruption occurs. - * -- The fetcher concludes it's fallen off the oplog. * -- A different error occurs. * - * In the first two circumstances, the task will terminate. If the fetcher has fallen off the - * oplog, this is thrown as a fatal resharding exception. In the last circumstance, the task - * will be rescheduled in a way that resumes where it had left off from. + * In the first two circumstances, the task will return. In the last circumstance, the task will + * be rescheduled in a way that resumes where it had left off from. */ - Future<void> schedule(executor::TaskExecutor* executor); + void schedule(executor::TaskExecutor* exector); /** - * Given a shard, fetches and copies oplog entries until reaching an error, or coming + * Given a connection, fetches and copies oplog entries until reaching an error, or coming * across a sentinel finish oplog entry. Throws if there's more oplog entries to be copied. */ - void consume(Client* client, Shard* shard); + void consume(DBClientBase* conn); - bool iterate(Client* client); + /** + * Kill the underlying client the BackgroundJob is using to expedite cleaning up resources when + * the output is no longer necessary. The underlying `toWriteInto` collection is left intact, + * though likely incomplete. + */ + void setKilled(); - int getNumOplogEntriesCopied() const { + /** + * Returns boost::none if the last oplog entry to be copied is found. Otherwise returns the + * ReshardingDonorOplogId to resume querying from. + * + * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries + * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation + * cursor is exhausted. + * + * Returns an identifier for the last oplog-ish document written to `toWriteInto`. + * + * This method throws. + * + * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things + * like perform aggregate commands nor does it expose a cursor/stream interface. However, using + * a `Shard` object will provide critical behavior such as advancing logical clock values on a + * response and targetting a node to send the aggregation command to. + */ + boost::optional<ReshardingDonorOplogId> iterate(OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr<ExpressionContext> expCtx, + ReshardingDonorOplogId startAfter, + UUID collUUID, + const ShardId& recipientShard, + bool doesDonorOwnMinKeyChunk, + NamespaceString toWriteInto); + + int getNumOplogEntriesCopied() { return _numOplogEntriesCopied; } - ReshardingDonorOplogId getLastSeenTimestamp() const { - return _startAt; - } - - void setInitialBatchSizeForTest(int size) { - _initialBatchSize = size; - } - - void useReadConcernForTest(bool use) { - _useReadConcern = use; - } - - void setMaxBatchesForTest(int maxBatches) { - _maxBatches = maxBatches; - } - private: /** * Returns true if there's more work to do and the task should be rescheduled. */ - void _ensureCollection(Client* client, const NamespaceString nss); - std::vector<BSONObj> _makePipeline(Client* client); - void _reschedule(executor::TaskExecutor* executor); + bool _runTask(); const UUID _reshardingUUID; const UUID _collUUID; @@ -113,16 +123,9 @@ private: const bool _doesDonorOwnMinKeyChunk; const NamespaceString _toWriteInto; - Promise<void> _fetchedFinishPromise; - int _numOplogEntriesCopied = 0; - - // For testing to control behavior. + ServiceContext::UniqueClient _client; + AtomicWord<bool> _isAlive{true}; - // The aggregation batch size. This only affects the original call and not `getmore`s. - int _initialBatchSize = 0; - // Setting to false will omit the `afterClusterTime` and `majority` read concern. - bool _useReadConcern = true; - // Dictates how many batches get processed before returning control from a call to `consume`. - int _maxBatches = -1; + int _numOplogEntriesCopied = 0; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp deleted file mode 100644 index f91a8a5996c..00000000000 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ /dev/null @@ -1,417 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include <vector> - -#include "mongo/bson/bsonobj.h" -#include "mongo/db/client.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/logical_session_cache_noop.h" -#include "mongo/db/op_observer_impl.h" -#include "mongo/db/pipeline/document_source_mock.h" -#include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" -#include "mongo/db/s/resharding_util.h" -#include "mongo/db/s/shard_server_test_fixture.h" -#include "mongo/db/service_context.h" -#include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/storage/write_unit_of_work.h" -#include "mongo/s/catalog/sharding_catalog_client_mock.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -using namespace unittest; - -/** - * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. - */ -class OneOffRead { -public: - OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { - _opCtx->recoveryUnit()->abandonSnapshot(); - if (ts.isNull()) { - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - } else { - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); - } - } - - ~OneOffRead() { - _opCtx->recoveryUnit()->abandonSnapshot(); - _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - } - -private: - OperationContext* _opCtx; -}; - -class ReshardingOplogFetcherTest : public ShardServerTestFixture { -public: - OperationContext* _opCtx; - ServiceContext* _svcCtx; - UUID _reshardingUUID = UUID::gen(); - Timestamp _fetchTimestamp; - ShardId _donorShard; - ShardId _destinationShard; - - void setUp() { - ShardServerTestFixture::setUp(); - _opCtx = operationContext(); - _svcCtx = _opCtx->getServiceContext(); - - for (const auto& shardId : kTwoShardIdList) { - auto shardTargeter = RemoteCommandTargeterMock::get( - uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) - ->getTargeter()); - shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); - } - - WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); - - // onStepUp() relies on the storage interface to create the config.transactions table. - repl::StorageInterface::set(getServiceContext(), - std::make_unique<repl::StorageInterfaceImpl>()); - MongoDSessionCatalog::onStepUp(operationContext()); - LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); - _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp(); - - _donorShard = kTwoShardIdList[0]; - _destinationShard = kTwoShardIdList[1]; - } - - void tearDown() { - WaitForMajorityService::get(getServiceContext()).shutDown(); - ShardServerTestFixture::tearDown(); - } - - /** - * Override the CatalogClient to make CatalogClient::getAllShards automatically return the - * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the - * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no - * DBClientMock analogous to the NetworkInterfaceMock. - */ - std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( - std::unique_ptr<DistLockManager> distLockManager) { - - class StaticCatalogClient final : public ShardingCatalogClientMock { - public: - StaticCatalogClient(std::vector<ShardId> shardIds) - : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {} - - StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { - std::vector<ShardType> shardTypes; - for (const auto& shardId : _shardIds) { - const ConnectionString cs = ConnectionString::forReplicaSet( - shardId.toString(), {makeHostAndPort(shardId)}); - ShardType sType; - sType.setName(cs.getSetName()); - sType.setHost(cs.toString()); - shardTypes.push_back(std::move(sType)); - }; - return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); - } - - private: - const std::vector<ShardId> _shardIds; - }; - - return std::make_unique<StaticCatalogClient>(kTwoShardIdList); - } - - void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) { - // Insert some documents. - OpDebug* const nullOpDebug = nullptr; - const bool fromMigrate = false; - ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate)); - } - - BSONObj queryCollection(NamespaceString nss, const BSONObj& query) { - BSONObj ret; - ASSERT_TRUE(Helpers::findOne( - _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret)) - << "Query: " << query; - return ret; - } - - BSONObj queryOplog(const BSONObj& query) { - OneOffRead oor(_opCtx, Timestamp::min()); - return queryCollection(NamespaceString::kRsOplogNamespace, query); - } - - repl::OpTime getLastApplied() { - return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime(); - } - - boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() { - NamespaceString slimNss = - NamespaceString("local.system.resharding.slimOplogForGraphLookup"); - - boost::intrusive_ptr<ExpressionContextForTest> expCtx( - new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace)); - expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace, - {NamespaceString::kRsOplogNamespace, {}}); - expCtx->setResolvedNamespace(slimNss, - {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}}); - return expCtx; - } - - int itcount(NamespaceString nss) { - OneOffRead oof(_opCtx, Timestamp::min()); - AutoGetCollectionForRead autoColl(_opCtx, nss); - auto cursor = autoColl.getCollection()->getCursor(_opCtx); - - int ret = 0; - while (auto rec = cursor->next()) { - ++ret; - } - - return ret; - } - - void create(NamespaceString nss) { - writeConflictRetry(_opCtx, "create", nss.ns(), [&] { - AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X); - WriteUnitOfWork wunit(_opCtx); - if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { - ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); - } - invariant(dbRaii.getDb()->createCollection(_opCtx, nss)); - wunit.commit(); - }); - } - - template <typename T> - T requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle<T>& future, - int maxBatches = -1) { - - int maxNumRequests = 1000; // No unittests would request more than this? - if (maxBatches > -1) { - // The fetcher will send a `killCursors` after the last `getMore`. - maxNumRequests = maxBatches + 1; - } - - bool hasMore = true; - for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) { - onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> { - DBDirectClient client(cc().getOperationContext()); - BSONObj result; - bool res = client.runCommand(request.dbname, request.cmdObj, result); - if (res == false || result.hasField("cursorsKilled") || - result["cursor"]["id"].Long() == 0) { - hasMore = false; - } - - return result; - }); - } - - return future.timed_get(Seconds(5)); - } - - // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient` - // followed by the final no-op oplog entry that signals the last oplog entry needed to be - // applied for resharding to move to the next stage. - void setupBasic(NamespaceString outputCollectionNss, - NamespaceString dataCollectionNss, - ShardId destinedRecipient) { - create(outputCollectionNss); - create(dataCollectionNss); - - _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - // Set a failpoint to tack a `destinedRecipient` onto oplog entries. - setGlobalFailPoint("addDestinedRecipient", - BSON("mode" - << "alwaysOn" - << "data" - << BSON("destinedRecipient" << destinedRecipient.toString()))); - - // Insert five documents. Advance the majority point. - const std::int32_t docsToInsert = 5; - { - for (std::int32_t num = 0; num < docsToInsert; ++num) { - WriteUnitOfWork wuow(_opCtx); - insertDocument(dataColl.getCollection(), - InsertStatement(BSON("_id" << num << "a" << num))); - wuow.commit(); - } - } - - // Write an entry saying that fetching is complete. - { - WriteUnitOfWork wuow(_opCtx); - _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage( - _opCtx, - dataColl.getCollection()->ns(), - dataColl.getCollection()->uuid(), - BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", - dataColl.getCollection()->ns().toString())), - BSON("type" - << "reshardFinalOp" - << "reshardingUUID" << _reshardingUUID), - boost::none, - boost::none, - boost::none, - boost::none); - wuow.commit(); - } - - repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); - - // Disable the failpoint. - setGlobalFailPoint("addDestinedRecipient", - BSON("mode" - << "off")); - } - -protected: - const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; - -private: - static HostAndPort makeHostAndPort(const ShardId& shardId) { - return HostAndPort(str::stream() << shardId << ":123"); - } -}; - -TEST_F(ReshardingOplogFetcherTest, TestBasic) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetchRunner", _svcCtx, nullptr); - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {_fetchTimestamp, _fetchTimestamp}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - fetcher.setInitialBatchSizeForTest(2); - - fetcher.iterate(&cc()); - }); - - requestPassthroughHandler(fetcherJob); - - // Five oplog entries for resharding + the sentinel final oplog entry. - ASSERT_EQ(6, itcount(outputCollectionNss)); -} - -TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - const int maxBatches = 1; - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetcherRunner", _svcCtx, nullptr); - - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {_fetchTimestamp, _fetchTimestamp}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - fetcher.setInitialBatchSizeForTest(2); - fetcher.setMaxBatchesForTest(maxBatches); - - fetcher.iterate(&cc()); - return fetcher.getLastSeenTimestamp(); - }); - - ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches); - - // Two oplog entries due to the batch size. - ASSERT_EQ(2, itcount(outputCollectionNss)); - // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`. - ASSERT_GT(lastSeen.getTs(), _fetchTimestamp); -} - -TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) { - const NamespaceString outputCollectionNss("dbtests.outputCollection"); - const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); - - setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); - - AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); - - auto fetcherJob = launchAsync([&, this] { - Client::initThread("RefetcherRunner", _svcCtx, nullptr); - - const Timestamp doesNotExist(1, 1); - ReshardingOplogFetcher fetcher(_reshardingUUID, - dataColl->uuid(), - {doesNotExist, doesNotExist}, - _donorShard, - _destinationShard, - true, - outputCollectionNss); - fetcher.useReadConcernForTest(false); - - try { - fetcher.iterate(&cc()); - // Test failure case. - return Status::OK(); - } catch (...) { - return exceptionToStatus(); - } - }); - - auto fetcherStatus = requestPassthroughHandler(fetcherJob); - - // Two oplog entries due to the batch size. - ASSERT_EQ(0, itcount(outputCollectionNss)); - ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code()); -} - -} // namespace -} // namespace mongo |