diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-10-20 22:58:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-20 23:46:55 +0000 |
commit | 13093cdb3f878f20e8ebda8ac78f329d1b33a52f (patch) | |
tree | a0ebef270f58d852f9ad87b70b916bd6a1767563 | |
parent | 3495edd2170878da371d4de69c30a9213918d91d (diff) | |
download | mongo-13093cdb3f878f20e8ebda8ac78f329d1b33a52f.tar.gz |
SERVER-60774 Early exit in ReshardingOplogFetcher if final op fetched.
Changes the ReshardingOplogFetcher to return without doing any work when
the reshardFinalOp entry has already been fetched prior to resuming.
Also changes ReshardingDonorOplogIterator to throw if it ever sees an
entry in the local resharding oplog buffer collection after the
reshardFinalOp entry.
10 files changed, 165 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 745e83a40f9..0efb2179306 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -162,6 +162,21 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx, } Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) { + auto doc = findDocWithHighestInsertedId(opCtx, collection); + if (!doc) { + return Value{}; + } + + auto value = (*doc)["_id"]; + uassert(4929300, + "Missing _id field for document in temporary resharding collection", + !value.missing()); + + return value; +} + +boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx, + const CollectionPtr& collection) { auto findCommand = std::make_unique<FindCommandRequest>(collection->ns()); findCommand->setLimit(1); findCommand->setSort(BSON("_id" << -1)); @@ -169,16 +184,11 @@ Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collec auto recordId = Helpers::findOne(opCtx, collection, std::move(findCommand), true /* requireIndex */); if (recordId.isNull()) { - return Value{}; + return boost::none; } auto doc = collection->docFor(opCtx, recordId).value(); - auto value = Value{doc["_id"]}; - uassert(4929300, - "Missing _id field for document in temporary resharding collection", - !value.missing()); - - return value; + return Document{doc}; } std::vector<InsertStatement> fillBatchForInsert(Pipeline& pipeline, int batchSizeLimitBytes) { diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h index 0165284e5a8..c34768dcea1 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.h +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/collection_catalog.h" +#include 
"mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/oplog.h" @@ -95,6 +96,12 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx, Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection); /** + * Returns the full document of the largest _id value in the collection. + */ +boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx, + const CollectionPtr& collection); + +/** * Returns a batch of documents suitable for being inserted with insertBatch(). * * The batch of documents is returned once its size exceeds batchSizeLimitBytes or the pipeline has diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index bec1e02dec5..8ad0d2a4d3e 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -122,7 +122,8 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); auto minFetchTimestamp = *donor.getMinFetchTimestamp(); - auto idToResumeFrom = getOplogFetcherResumeId(opCtx, oplogBufferNss, minFetchTimestamp); + auto idToResumeFrom = getOplogFetcherResumeId( + opCtx, metadata.getReshardingUUID(), oplogBufferNss, minFetchTimestamp); invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( @@ -433,15 +434,25 @@ std::vector<NamespaceString> ReshardingDataReplication::ensureStashCollectionsEx } ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId( - OperationContext* opCtx, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp) { + OperationContext* opCtx, + const UUID& reshardingUUID, + 
const NamespaceString& oplogBufferNss, + Timestamp minFetchTimestamp) { invariant(!opCtx->lockState()->isLocked()); AutoGetCollection coll(opCtx, oplogBufferNss, MODE_IS); if (coll) { - auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *coll); - if (!highestOplogBufferId.missing()) { + auto highestOplogBufferId = + resharding::data_copy::findDocWithHighestInsertedId(opCtx, *coll); + + if (highestOplogBufferId) { + auto oplogEntry = repl::OplogEntry{highestOplogBufferId->toBson()}; + if (isFinalOplog(oplogEntry, reshardingUUID)) { + return ReshardingOplogFetcher::kFinalOpAlreadyFetched; + } + return ReshardingDonorOplogId::parse({"getOplogFetcherResumeId"}, - highestOplogBufferId.getDocument().toBson()); + oplogEntry.get_id()->getDocument().toBson()); } } diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index 821987f2001..fcdd77dcb68 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -178,6 +178,7 @@ public: const std::vector<DonorShardFetchTimestamp>& donorShards); static ReshardingDonorOplogId getOplogFetcherResumeId(OperationContext* opCtx, + const UUID& reshardingUUID, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp); diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp index d04d0ec9879..e6602cb14de 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp @@ -98,6 +98,14 @@ public: boost::none /* clusterTime */); } + const NamespaceString& sourceNss() { + return _sourceNss; + } + + const CollectionUUID& sourceUUID() { + return _sourceUUID; + } + private: RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { const auto version = rt.getVersion(); 
@@ -193,7 +201,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { // The minFetchTimestamp value is used when the oplog buffer collection doesn't exist. ASSERT_BSONOBJ_BINARY_EQ( ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); @@ -201,21 +209,29 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { resharding::data_copy::ensureCollectionExists(opCtx.get(), oplogBufferNss, CollectionOptions{}); ASSERT_BSONOBJ_BINARY_EQ( ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); auto insertFn = [&](const ReshardingDonorOplogId& oplogId) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setNss({}); + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setObject({}); + oplogEntry.setOpTime({{}, {}}); + oplogEntry.setWallClockTime({}); + oplogEntry.set_id(Value(oplogId.toBSON())); + AutoGetCollection oplogBufferColl(opCtx.get(), oplogBufferNss, MODE_IX); WriteUnitOfWork wuow(opCtx.get()); ASSERT_OK(oplogBufferColl->insertDocument( - opCtx.get(), InsertStatement{BSON("_id" << oplogId.toBSON())}, nullptr)); + opCtx.get(), InsertStatement{oplogEntry.toBSON()}, nullptr)); wuow.commit(); }; insertFn(oplogId2); ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), oplogId2.toBSON()); @@ -223,7 +239,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { insertFn(oplogId3); insertFn(oplogId1); ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( - 
opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), oplogId3.toBSON()); } diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp index 3989cc1beb8..7314a354761 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -189,14 +189,15 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline& numBytes += obj.objsize(); - // The ReshardingOplogFetcher may end up inserting no-op reshardProgressMark entries *after* - // the reshardFinalOp entry. This can happen when the ReshardingOplogFetcher resumes after - // a primary failover and it had already written the reshardFinalOp entry. We check each - // oplog entry for being the reshardFinalOp and halt filling the batch so that an empty - // batch is returned to ReshardingOplogApplier to signal all oplog entries from the donor - // shard were applied. if (isFinalOplog(entry)) { - break; + // The ReshardingOplogFetcher should never insert documents after the reshardFinalOp + // entry. We defensively check each oplog entry for being the reshardFinalOp and confirm + // the pipeline has been exhausted. 
+ if (auto nextDoc = pipeline.getNext()) { + tasserted(6077499, + fmt::format("Unexpectedly found entry after reshardFinalOp: {}", + redact(nextDoc->toString()))); + } } } while (numBytes < resharding::gReshardingOplogBatchLimitBytes.load() && batch.size() < std::size_t(resharding::gReshardingOplogBatchLimitOperations.load())); diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index b5e1b2e3212..994ff476098 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -41,6 +41,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -229,14 +230,12 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); - const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1)); DBDirectClient client(operationContext()); const auto ns = oplogNss().ns(); client.insert(ns, oplog1.toBSON()); client.insert(ns, oplog2.toBSON()); client.insert(ns, finalOplog.toBSON()); - client.insert(ns, oplogBeyond.toBSON()); ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); @@ -289,7 +288,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); - const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), 
BSON("z" << 1)); DBDirectClient client(operationContext()); const auto ns = oplogNss().ns(); @@ -325,7 +323,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { client.insert(ns, oplog2.toBSON()); } else { client.insert(ns, finalOplog.toBSON()); - client.insert(ns, oplogBeyond.toBSON()); } }}; @@ -503,7 +500,9 @@ TEST_F(ReshardingDonorOplogIterTest, BatchIncludesProgressMarkEntries) { ASSERT_TRUE(next.empty()); } -TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) { +DEATH_TEST_REGEX_F(ReshardingDonorOplogIterTest, + ThrowsIfProgressMarkEntriesAfterFinalOp, + "Tripwire assertion.*6077499") { RAIIServerParameterControllerForTest controller{"reshardingOplogBatchLimitOperations", 100}; const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); @@ -529,13 +528,7 @@ TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) { auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor, factory); - ASSERT_EQ(next.size(), 2U); - ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); - ASSERT_BSONOBJ_EQ(getId(progressMarkOplog1), getId(next[1])); - - next = getNextBatch(&iter, executor, factory); - ASSERT_TRUE(next.empty()); + ASSERT_THROWS_CODE(getNextBatch(&iter, executor, factory), DBException, 6077499); } } // anonymous namespace diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index a4d7bb20352..23050570432 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -75,6 +75,9 @@ boost::intrusive_ptr<ExpressionContext> _makeExpressionContext(OperationContext* } } // namespace +const ReshardingDonorOplogId ReshardingOplogFetcher::kFinalOpAlreadyFetched{Timestamp::max(), + Timestamp::max()}; + ReshardingOplogFetcher::ReshardingOplogFetcher(std::unique_ptr<Env> env, UUID reshardingUUID, UUID 
collUUID, @@ -133,6 +136,14 @@ ExecutorFuture<void> ReshardingOplogFetcher::schedule( std::shared_ptr<executor::TaskExecutor> executor, const CancellationToken& cancelToken, CancelableOperationContextFactory factory) { + if (_startAt == kFinalOpAlreadyFetched) { + LOGV2_INFO(6077400, + "Resharding oplog fetcher resumed with no more work to do", + "donorShard"_attr = _donorShard, + "reshardingUUID"_attr = _reshardingUUID); + return ExecutorFuture(std::move(executor)); + } + return ExecutorFuture(executor) .then([this, executor, cancelToken, factory] { return _reschedule(std::move(executor), cancelToken, factory); @@ -165,6 +176,10 @@ ExecutorFuture<void> ReshardingOplogFetcher::_reschedule( }) .then([this, executor, cancelToken, factory](bool moreToCome) { if (!moreToCome) { + LOGV2_INFO(6077401, + "Resharding oplog fetcher done fetching", + "donorShard"_attr = _donorShard, + "reshardingUUID"_attr = _reshardingUUID); return ExecutorFuture(std::move(executor)); } @@ -259,6 +274,7 @@ AggregateCommandRequest ReshardingOplogFetcher::_makeAggregateCommandRequest( // concern to guarantee the postBatchResumeToken when the batch is empty is non-decreasing. // The ReshardingOplogFetcher depends on inserting documents in increasing _id order, // including for the synthetic no-op oplog entries generated from the postBatchResumeToken. 
+ invariant(_startAt != kFinalOpAlreadyFetched); auto readConcernArgs = repl::ReadConcernArgs( boost::optional<LogicalTime>(_startAt.getClusterTime()), boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index bf75536d4ae..e0aad151f78 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -70,6 +70,10 @@ public: ReshardingMetrics* _metrics; }; + // Special value to use for startAt to indicate there are no more oplog entries needing to be + // fetched. + static const ReshardingDonorOplogId kFinalOpAlreadyFetched; + ReshardingOplogFetcher(std::unique_ptr<Env> env, UUID reshardingUUID, UUID collUUID, diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index f6d9ebb21bd..f62b89351c1 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -55,6 +55,7 @@ #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -665,5 +666,74 @@ TEST_F(ReshardingOplogFetcherTest, RetriesOnRemoteInterruptionError) { ASSERT_TRUE(moreToCome); } +TEST_F(ReshardingOplogFetcherTest, ImmediatelyDoneWhenFinalOpHasAlreadyBeenFetched) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + create(outputCollectionNss); + create(dataCollectionNss); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, 
LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + ReshardingOplogFetcher::kFinalOpAlreadyFetched, + _donorShard, + _destinationShard, + outputCollectionNss); + + auto factory = makeCancelableOpCtx(); + auto future = fetcher.schedule(nullptr, CancellationToken::uncancelable(), factory); + + ASSERT_TRUE(future.isReady()); + ASSERT_OK(future.getNoThrow()); +} + +DEATH_TEST_REGEX_F(ReshardingOplogFetcherTest, + CannotFetchMoreWhenFinalOpHasAlreadyBeenFetched, + "Invariant failure.*_startAt != kFinalOpAlreadyFetched") { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + create(outputCollectionNss); + create(dataCollectionNss); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + auto fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + + // We intentionally do not call fetcher.useReadConcernForTest(false) for this test case. + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + ReshardingOplogFetcher::kFinalOpAlreadyFetched, + _donorShard, + _destinationShard, + outputCollectionNss); + fetcher.setInitialBatchSizeForTest(2); + + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + + // Calling onCommand() leads to a more helpful "Expected death, found life" error when the + // invariant failure isn't triggered. + onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> { + return {ErrorCodes::InternalError, "this error should never be observed"}; + }); + + (void)fetcherJob.timed_get(Seconds(5)); +} + } // namespace } // namespace mongo |