diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 417 |
1 file changed, 417 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp new file mode 100644 index 00000000000..f91a8a5996c --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -0,0 +1,417 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/logical_session_cache_noop.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/service_context.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using namespace unittest; + +/** + * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. 
+ */ +class OneOffRead { +public: + OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + _opCtx->recoveryUnit()->abandonSnapshot(); + if (ts.isNull()) { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } else { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); + } + } + + ~OneOffRead() { + _opCtx->recoveryUnit()->abandonSnapshot(); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } + +private: + OperationContext* _opCtx; +}; + +class ReshardingOplogFetcherTest : public ShardServerTestFixture { +public: + OperationContext* _opCtx; + ServiceContext* _svcCtx; + UUID _reshardingUUID = UUID::gen(); + Timestamp _fetchTimestamp; + ShardId _donorShard; + ShardId _destinationShard; + + void setUp() { + ShardServerTestFixture::setUp(); + _opCtx = operationContext(); + _svcCtx = _opCtx->getServiceContext(); + + for (const auto& shardId : kTwoShardIdList) { + auto shardTargeter = RemoteCommandTargeterMock::get( + uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) + ->getTargeter()); + shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); + } + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + // onStepUp() relies on the storage interface to create the config.transactions table. 
+ repl::StorageInterface::set(getServiceContext(), + std::make_unique<repl::StorageInterfaceImpl>()); + MongoDSessionCatalog::onStepUp(operationContext()); + LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); + _fetchTimestamp = queryOplog(BSONObj())["ts"].timestamp(); + + _donorShard = kTwoShardIdList[0]; + _destinationShard = kTwoShardIdList[1]; + } + + void tearDown() { + WaitForMajorityService::get(getServiceContext()).shutDown(); + ShardServerTestFixture::tearDown(); + } + + /** + * Override the CatalogClient to make CatalogClient::getAllShards automatically return the + * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the + * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no + * DBClientMock analogous to the NetworkInterfaceMock. + */ + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) { + + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector<ShardId> shardIds) + : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + std::vector<ShardType> shardTypes; + for (const auto& shardId : _shardIds) { + const ConnectionString cs = ConnectionString::forReplicaSet( + shardId.toString(), {makeHostAndPort(shardId)}); + ShardType sType; + sType.setName(cs.getSetName()); + sType.setHost(cs.toString()); + shardTypes.push_back(std::move(sType)); + }; + return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); + } + + private: + const std::vector<ShardId> _shardIds; + }; + + return std::make_unique<StaticCatalogClient>(kTwoShardIdList); + } + + void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) { + // Insert some documents. 
+ OpDebug* const nullOpDebug = nullptr; + const bool fromMigrate = false; + ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate)); + } + + BSONObj queryCollection(NamespaceString nss, const BSONObj& query) { + BSONObj ret; + ASSERT_TRUE(Helpers::findOne( + _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret)) + << "Query: " << query; + return ret; + } + + BSONObj queryOplog(const BSONObj& query) { + OneOffRead oor(_opCtx, Timestamp::min()); + return queryCollection(NamespaceString::kRsOplogNamespace, query); + } + + repl::OpTime getLastApplied() { + return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime(); + } + + boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() { + NamespaceString slimNss = + NamespaceString("local.system.resharding.slimOplogForGraphLookup"); + + boost::intrusive_ptr<ExpressionContextForTest> expCtx( + new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace)); + expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace, + {NamespaceString::kRsOplogNamespace, {}}); + expCtx->setResolvedNamespace(slimNss, + {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}}); + return expCtx; + } + + int itcount(NamespaceString nss) { + OneOffRead oof(_opCtx, Timestamp::min()); + AutoGetCollectionForRead autoColl(_opCtx, nss); + auto cursor = autoColl.getCollection()->getCursor(_opCtx); + + int ret = 0; + while (auto rec = cursor->next()) { + ++ret; + } + + return ret; + } + + void create(NamespaceString nss) { + writeConflictRetry(_opCtx, "create", nss.ns(), [&] { + AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X); + WriteUnitOfWork wunit(_opCtx); + if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); + } + invariant(dbRaii.getDb()->createCollection(_opCtx, nss)); + wunit.commit(); + }); + } + + template <typename T> + T 
requestPassthroughHandler(executor::NetworkTestEnv::FutureHandle<T>& future, + int maxBatches = -1) { + + int maxNumRequests = 1000; // No unittests would request more than this? + if (maxBatches > -1) { + // The fetcher will send a `killCursors` after the last `getMore`. + maxNumRequests = maxBatches + 1; + } + + bool hasMore = true; + for (int batchNum = 0; hasMore && batchNum < maxNumRequests; ++batchNum) { + onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> { + DBDirectClient client(cc().getOperationContext()); + BSONObj result; + bool res = client.runCommand(request.dbname, request.cmdObj, result); + if (res == false || result.hasField("cursorsKilled") || + result["cursor"]["id"].Long() == 0) { + hasMore = false; + } + + return result; + }); + } + + return future.timed_get(Seconds(5)); + } + + // Writes five documents to `dataCollectionNss` that are replicated with a `destinedRecipient` + // followed by the final no-op oplog entry that signals the last oplog entry needed to be + // applied for resharding to move to the next stage. + void setupBasic(NamespaceString outputCollectionNss, + NamespaceString dataCollectionNss, + ShardId destinedRecipient) { + create(outputCollectionNss); + create(dataCollectionNss); + + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + // Set a failpoint to tack a `destinedRecipient` onto oplog entries. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("destinedRecipient" << destinedRecipient.toString()))); + + // Insert five documents. Advance the majority point. 
+ const std::int32_t docsToInsert = 5; + { + for (std::int32_t num = 0; num < docsToInsert; ++num) { + WriteUnitOfWork wuow(_opCtx); + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + wuow.commit(); + } + } + + // Write an entry saying that fetching is complete. + { + WriteUnitOfWork wuow(_opCtx); + _opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage( + _opCtx, + dataColl.getCollection()->ns(), + dataColl.getCollection()->uuid(), + BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", + dataColl.getCollection()->ns().toString())), + BSON("type" + << "reshardFinalOp" + << "reshardingUUID" << _reshardingUUID), + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + } + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + + // Disable the failpoint. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "off")); + } + +protected: + const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; + +private: + static HostAndPort makeHostAndPort(const ShardId& shardId) { + return HostAndPort(str::stream() << shardId << ":123"); + } +}; + +TEST_F(ReshardingOplogFetcherTest, TestBasic) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + auto fetcherJob = launchAsync([&, this] { + Client::initThread("RefetchRunner", _svcCtx, nullptr); + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {_fetchTimestamp, _fetchTimestamp}, + _donorShard, + _destinationShard, + true, + outputCollectionNss); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + + fetcher.iterate(&cc()); + }); + + requestPassthroughHandler(fetcherJob); + + // Five 
oplog entries for resharding + the sentinel final oplog entry. + ASSERT_EQ(6, itcount(outputCollectionNss)); +} + +TEST_F(ReshardingOplogFetcherTest, TestTrackLastSeen) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + const int maxBatches = 1; + auto fetcherJob = launchAsync([&, this] { + Client::initThread("RefetcherRunner", _svcCtx, nullptr); + + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {_fetchTimestamp, _fetchTimestamp}, + _donorShard, + _destinationShard, + true, + outputCollectionNss); + fetcher.useReadConcernForTest(false); + fetcher.setInitialBatchSizeForTest(2); + fetcher.setMaxBatchesForTest(maxBatches); + + fetcher.iterate(&cc()); + return fetcher.getLastSeenTimestamp(); + }); + + ReshardingDonorOplogId lastSeen = requestPassthroughHandler(fetcherJob, maxBatches); + + // Two oplog entries due to the batch size. + ASSERT_EQ(2, itcount(outputCollectionNss)); + // Assert the lastSeen value has been bumped from the original `_fetchTimestamp`. 
+ ASSERT_GT(lastSeen.getTs(), _fetchTimestamp); +} + +TEST_F(ReshardingOplogFetcherTest, TestFallingOffOplog) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + setupBasic(outputCollectionNss, dataCollectionNss, _destinationShard); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + auto fetcherJob = launchAsync([&, this] { + Client::initThread("RefetcherRunner", _svcCtx, nullptr); + + const Timestamp doesNotExist(1, 1); + ReshardingOplogFetcher fetcher(_reshardingUUID, + dataColl->uuid(), + {doesNotExist, doesNotExist}, + _donorShard, + _destinationShard, + true, + outputCollectionNss); + fetcher.useReadConcernForTest(false); + + try { + fetcher.iterate(&cc()); + // Test failure case. + return Status::OK(); + } catch (...) { + return exceptionToStatus(); + } + }); + + auto fetcherStatus = requestPassthroughHandler(fetcherJob); + + // Two oplog entries due to the batch size. + ASSERT_EQ(0, itcount(outputCollectionNss)); + ASSERT_EQ(ErrorCodes::OplogQueryMinTsMissing, fetcherStatus.code()); +} + +} // namespace +} // namespace mongo |