diff options
10 files changed, 165 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 745e83a40f9..0efb2179306 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -162,6 +162,21 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx, } Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) { + auto doc = findDocWithHighestInsertedId(opCtx, collection); + if (!doc) { + return Value{}; + } + + auto value = (*doc)["_id"]; + uassert(4929300, + "Missing _id field for document in temporary resharding collection", + !value.missing()); + + return value; +} + +boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx, + const CollectionPtr& collection) { auto findCommand = std::make_unique<FindCommandRequest>(collection->ns()); findCommand->setLimit(1); findCommand->setSort(BSON("_id" << -1)); @@ -169,16 +184,11 @@ Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collec auto recordId = Helpers::findOne(opCtx, collection, std::move(findCommand), true /* requireIndex */); if (recordId.isNull()) { - return Value{}; + return boost::none; } auto doc = collection->docFor(opCtx, recordId).value(); - auto value = Value{doc["_id"]}; - uassert(4929300, - "Missing _id field for document in temporary resharding collection", - !value.missing()); - - return value; + return Document{doc}; } std::vector<InsertStatement> fillBatchForInsert(Pipeline& pipeline, int batchSizeLimitBytes) { diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h index 0165284e5a8..c34768dcea1 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.h +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/oplog.h" @@ -95,6 +96,12 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx, Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection); /** + * Returns the full document of the largest _id value in the collection. + */ +boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx, + const CollectionPtr& collection); + +/** * Returns a batch of documents suitable for being inserted with insertBatch(). * * The batch of documents is returned once its size exceeds batchSizeLimitBytes or the pipeline has diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index bec1e02dec5..8ad0d2a4d3e 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -122,7 +122,8 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); auto minFetchTimestamp = *donor.getMinFetchTimestamp(); - auto idToResumeFrom = getOplogFetcherResumeId(opCtx, oplogBufferNss, minFetchTimestamp); + auto idToResumeFrom = getOplogFetcherResumeId( + opCtx, metadata.getReshardingUUID(), oplogBufferNss, minFetchTimestamp); invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( @@ -433,15 +434,25 @@ std::vector<NamespaceString> ReshardingDataReplication::ensureStashCollectionsEx } ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId( - OperationContext* opCtx, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp) { + OperationContext* opCtx, + const UUID& reshardingUUID, + const NamespaceString& oplogBufferNss, + Timestamp minFetchTimestamp) { invariant(!opCtx->lockState()->isLocked()); AutoGetCollection coll(opCtx, oplogBufferNss, MODE_IS); if (coll) { - auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *coll); - if (!highestOplogBufferId.missing()) { + auto highestOplogBufferId = + resharding::data_copy::findDocWithHighestInsertedId(opCtx, *coll); + + if (highestOplogBufferId) { + auto oplogEntry = repl::OplogEntry{highestOplogBufferId->toBson()}; + if (isFinalOplog(oplogEntry, reshardingUUID)) { + return ReshardingOplogFetcher::kFinalOpAlreadyFetched; + } + return ReshardingDonorOplogId::parse({"getOplogFetcherResumeId"}, - highestOplogBufferId.getDocument().toBson()); + oplogEntry.get_id()->getDocument().toBson()); } } diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index 821987f2001..fcdd77dcb68 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -178,6 +178,7 @@ public: const std::vector<DonorShardFetchTimestamp>& donorShards); static ReshardingDonorOplogId getOplogFetcherResumeId(OperationContext* opCtx, + const UUID& reshardingUUID, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp); diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp index d04d0ec9879..e6602cb14de 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp @@ -98,6 +98,14 @@ public: boost::none /* clusterTime */); } + const NamespaceString& sourceNss() { + return _sourceNss; + } + + const CollectionUUID& sourceUUID() { + return _sourceUUID; + } + private: RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { const auto version = rt.getVersion(); @@ -193,7 +201,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { // The minFetchTimestamp value is used when the oplog buffer collection doesn't exist. ASSERT_BSONOBJ_BINARY_EQ( ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); @@ -201,21 +209,29 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { resharding::data_copy::ensureCollectionExists(opCtx.get(), oplogBufferNss, CollectionOptions{}); ASSERT_BSONOBJ_BINARY_EQ( ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); auto insertFn = [&](const ReshardingDonorOplogId& oplogId) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setNss({}); + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setObject({}); + oplogEntry.setOpTime({{}, {}}); + oplogEntry.setWallClockTime({}); + oplogEntry.set_id(Value(oplogId.toBSON())); + AutoGetCollection oplogBufferColl(opCtx.get(), oplogBufferNss, MODE_IX); WriteUnitOfWork wuow(opCtx.get()); ASSERT_OK(oplogBufferColl->insertDocument( - opCtx.get(), InsertStatement{BSON("_id" << oplogId.toBSON())}, nullptr)); + opCtx.get(), InsertStatement{oplogEntry.toBSON()}, nullptr)); wuow.commit(); }; insertFn(oplogId2); ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), oplogId2.toBSON()); @@ -223,7 +239,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { insertFn(oplogId3); insertFn(oplogId1); ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( - opCtx.get(), oplogBufferNss, minFetchTimestamp) + opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp) .toBSON(), oplogId3.toBSON()); } diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp index 3989cc1beb8..7314a354761 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -189,14 +189,15 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline& numBytes += obj.objsize(); - // The ReshardingOplogFetcher may end up inserting no-op reshardProgressMark entries *after* - // the reshardFinalOp entry. This can happen when the ReshardingOplogFetcher resumes after - // a primary failover and it had already written the reshardFinalOp entry. We check each - // oplog entry for being the reshardFinalOp and halt filling the batch so that an empty - // batch is returned to ReshardingOplogApplier to signal all oplog entries from the donor - // shard were applied. if (isFinalOplog(entry)) { - break; + // The ReshardingOplogFetcher should never insert documents after the reshardFinalOp + // entry. We defensively check each oplog entry for being the reshardFinalOp and confirm + // the pipeline has been exhausted. + if (auto nextDoc = pipeline.getNext()) { + tasserted(6077499, + fmt::format("Unexpectedly found entry after reshardFinalOp: {}", + redact(nextDoc->toString()))); + } } } while (numBytes < resharding::gReshardingOplogBatchLimitBytes.load() && batch.size() < std::size_t(resharding::gReshardingOplogBatchLimitOperations.load())); diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index b5e1b2e3212..994ff476098 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -41,6 +41,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -229,14 +230,12 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); - const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1)); DBDirectClient client(operationContext()); const auto ns = oplogNss().ns(); client.insert(ns, oplog1.toBSON()); client.insert(ns, oplog2.toBSON()); client.insert(ns, finalOplog.toBSON()); - client.insert(ns, oplogBeyond.toBSON()); ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); @@ -289,7 +288,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); - const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1)); DBDirectClient client(operationContext()); const auto ns = oplogNss().ns(); @@ -325,7 +323,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { client.insert(ns, oplog2.toBSON()); } else { client.insert(ns, finalOplog.toBSON()); - client.insert(ns, oplogBeyond.toBSON()); } }}; @@ -503,7 +500,9 @@ TEST_F(ReshardingDonorOplogIterTest, BatchIncludesProgressMarkEntries) { ASSERT_TRUE(next.empty()); } -TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) { +DEATH_TEST_REGEX_F(ReshardingDonorOplogIterTest, + ThrowsIfProgressMarkEntriesAfterFinalOp, + "Tripwire assertion.*6077499") { RAIIServerParameterControllerForTest controller{"reshardingOplogBatchLimitOperations", 100}; const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); @@ -529,13 +528,7 @@ TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) { auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor, factory); - ASSERT_EQ(next.size(), 2U); - ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); - ASSERT_BSONOBJ_EQ(getId(progressMarkOplog1), getId(next[1])); - - next = getNextBatch(&iter, executor, factory); - ASSERT_TRUE(next.empty()); + ASSERT_THROWS_CODE(getNextBatch(&iter, executor, factory), DBException, 6077499); } } // anonymous namespace diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index a4d7bb20352..23050570432 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -75,6 +75,9 @@ boost::intrusive_ptr<ExpressionContext> _makeExpressionContext(OperationContext* } } // namespace +const ReshardingDonorOplogId ReshardingOplogFetcher::kFinalOpAlreadyFetched{Timestamp::max(), + Timestamp::max()}; + ReshardingOplogFetcher::ReshardingOplogFetcher(std::unique_ptr<Env> env, UUID reshardingUUID, UUID collUUID, @@ -133,6 +136,14 @@ ExecutorFuture<void> ReshardingOplogFetcher::schedule( std::shared_ptr<executor::TaskExecutor> executor, const CancellationToken& cancelToken, CancelableOperationContextFactory factory) { + if (_startAt == kFinalOpAlreadyFetched) { + LOGV2_INFO(6077400, + "Resharding oplog fetcher resumed with no more work to do", + "donorShard"_attr = _donorShard, + "reshardingUUID"_attr = _reshardingUUID); + return ExecutorFuture(std::move(executor)); + } + return ExecutorFuture(executor) .then([this, executor, cancelToken, factory] { return _reschedule(std::move(executor), cancelToken, factory); @@ -165,6 +176,10 @@ ExecutorFuture<void> ReshardingOplogFetcher::_reschedule( }) .then([this, executor, cancelToken, factory](bool moreToCome) { if (!moreToCome) { + LOGV2_INFO(6077401, + "Resharding oplog fetcher done fetching", + "donorShard"_attr = _donorShard, + "reshardingUUID"_attr = _reshardingUUID); return ExecutorFuture(std::move(executor)); } @@ -259,6 +274,7 @@ AggregateCommandRequest ReshardingOplogFetcher::_makeAggregateCommandRequest( // concern to guarantee the postBatchResumeToken when the batch is empty is non-decreasing. // The ReshardingOplogFetcher depends on inserting documents in increasing _id order, // including for the synthetic no-op oplog entries generated from the postBatchResumeToken. + invariant(_startAt != kFinalOpAlreadyFetched); auto readConcernArgs = repl::ReadConcernArgs( boost::optional<LogicalTime>(_startAt.getClusterTime()), boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index bf75536d4ae..e0aad151f78 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -70,6 +70,10 @@ public: ReshardingMetrics* _metrics; }; + // Special value to use for startAt to indicate there are no more oplog entries needing to be + // fetched. + static const ReshardingDonorOplogId kFinalOpAlreadyFetched; + ReshardingOplogFetcher(std::unique_ptr<Env> env, UUID reshardingUUID, UUID collUUID, diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index f6d9ebb21bd..f62b89351c1 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -55,6 +55,7 @@ #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -665,5 +666,74 @@ TEST_F(ReshardingOplogFetcherTest, RetriesOnRemoteInterruptionError) { ASSERT_TRUE(moreToCome); } +TEST_F(ReshardingOplogFetcherTest, ImmediatelyDoneWhenFinalOpHasAlreadyBeenFetched) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + create(outputCollectionNss); + create(dataCollectionNss); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + ReshardingOplogFetcher::kFinalOpAlreadyFetched, + _donorShard, + _destinationShard, + outputCollectionNss); + + auto factory = makeCancelableOpCtx(); + auto future = fetcher.schedule(nullptr, CancellationToken::uncancelable(), factory); + + ASSERT_TRUE(future.isReady()); + ASSERT_OK(future.getNoThrow()); +} + +DEATH_TEST_REGEX_F(ReshardingOplogFetcherTest, + CannotFetchMoreWhenFinalOpHasAlreadyBeenFetched, + "Invariant failure.*_startAt != kFinalOpAlreadyFetched") { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + + create(outputCollectionNss); + create(dataCollectionNss); + _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx); + + const auto& collectionUUID = [&] { + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + return dataColl->uuid(); + }(); + + auto fetcherJob = launchAsync([&, this] { + ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr); + + // We intentionally do not call fetcher.useReadConcernForTest(false) for this test case. + ReshardingOplogFetcher fetcher(makeFetcherEnv(), + _reshardingUUID, + collectionUUID, + ReshardingOplogFetcher::kFinalOpAlreadyFetched, + _donorShard, + _destinationShard, + outputCollectionNss); + fetcher.setInitialBatchSizeForTest(2); + + auto factory = makeCancelableOpCtx(); + return fetcher.iterate(&cc(), factory); + }); + + // Calling onCommand() leads to a more helpful "Expected death, found life" error when the + // invariant failure isn't triggered. + onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> { + return {ErrorCodes::InternalError, "this error should never be observed"}; + }); + + (void)fetcherJob.timed_get(Seconds(5)); +} + } // namespace } // namespace mongo |