From fb8d9d4ce62b053c83495100c9181679c9b08872 Mon Sep 17 00:00:00 2001 From: Yuhong Zhang Date: Thu, 11 Feb 2021 20:48:50 +0000 Subject: SERVER-49894 Have resharding oplog fetcher resume from 'ts' component of largest _id inserted --- .../s/resharding/resharding_collection_cloner.cpp | 39 ++++++---------------- .../db/s/resharding/resharding_collection_cloner.h | 2 -- .../db/s/resharding/resharding_data_copy_util.cpp | 20 +++++++++++ .../db/s/resharding/resharding_data_copy_util.h | 2 ++ .../s/resharding/resharding_recipient_service.cpp | 36 ++++++++++++++++---- .../db/s/resharding/resharding_recipient_service.h | 4 +++ .../resharding_recipient_service_test.cpp | 31 +++++++++++++++++ 7 files changed, 97 insertions(+), 37 deletions(-) diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 947c9834139..237f23e7d67 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -41,7 +41,6 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" -#include "mongo/db/dbhelpers.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_request_helper.h" @@ -49,7 +48,9 @@ #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/query_request.h" #include "mongo/db/query/query_request_helper.h" +#include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context.h" @@ -211,32 +212,6 @@ std::unique_ptr ReshardingCollectionCloner::makePipel return Pipeline::create(std::move(stages), std::move(expCtx)); } -Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) { - AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Resharding collection cloner's output collection '" << _outputNss - << "' did not already exist", - outputColl); - - auto findCommand = std::make_unique(_outputNss); - findCommand->setLimit(1); - findCommand->setSort(BSON("_id" << -1)); - - auto recordId = - Helpers::findOne(opCtx, *outputColl, std::move(findCommand), true /* requireIndex */); - if (recordId.isNull()) { - return Value{}; - } - - auto doc = outputColl->docFor(opCtx, recordId).value(); - auto value = Value{doc["_id"]}; - uassert(4929300, - "Missing _id field for document in temporary resharding collection", - !value.missing()); - - return value; -} - std::unique_ptr ReshardingCollectionCloner::_targetAggregationRequest( OperationContext* opCtx, const Pipeline& pipeline) { AggregateCommand request(_sourceNss, pipeline.serializeToBson()); @@ -365,7 +340,15 @@ ExecutorFuture ReshardingCollectionCloner::run( return AsyncTry([this, chainCtx] { if (!chainCtx->pipeline) { chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) { - auto idToResumeFrom = _findHighestInsertedId(opCtx); + auto idToResumeFrom = [&] { + AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() + << "Resharding collection cloner's output collection '" + << _outputNss << "' did not already exist", + outputColl); + return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl); + }(); auto pipeline = _targetAggregationRequest( opCtx, *makePipeline( diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index d40209ccf03..d6883b9a062 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -71,8 +71,6 @@ public: CancelationToken cancelToken); private: - Value _findHighestInsertedId(OperationContext* opCtx); - std::unique_ptr _targetAggregationRequest(OperationContext* opCtx, const Pipeline& pipeline); diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index f870643e257..17ce44dad77 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -33,6 +33,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/write_unit_of_work.h" @@ -78,4 +79,23 @@ void ensureCollectionDropped(OperationContext* opCtx, }); } +Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) { + auto qr = std::make_unique(collection->ns()); + qr->setLimit(1); + qr->setSort(BSON("_id" << -1)); + + auto recordId = Helpers::findOne(opCtx, collection, std::move(qr), true /* requireIndex */); + if (!recordId.isNormal()) { + return Value{}; + } + + auto doc = collection->docFor(opCtx, recordId).value(); + auto value = Value{doc["_id"]}; + uassert(4929300, + "Missing _id field for document in temporary resharding collection", + !value.missing()); + + return value; +} + } // namespace mongo::resharding::data_copy diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h index 3374b8c7228..59322a34f26 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.h +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h @@ -60,5 +60,7 @@ void ensureCollectionDropped(OperationContext* opCtx, const NamespaceString& nss, const boost::optional& uuid = boost::none); +Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection); + } // namespace resharding::data_copy } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index d846bf9290c..7c12ade6d38 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -169,6 +169,21 @@ std::vector ensureStashCollectionsExist( return stashCollections; } +ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx, + NamespaceString oplogBufferNss, + Timestamp fetchTimestamp) { + AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS); + if (!collection) { + return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}; + } + + auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection); + return highestOplogBufferId.missing() + ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp} + : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"}, + highestOplogBufferId.getDocument().toBson()); +} + } // namespace resharding std::shared_ptr ReshardingRecipientService::constructInstance( @@ -351,6 +366,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin } auto* serviceContext = Client::getCurrent()->getServiceContext(); + auto fetchTimestamp = *_recipientDoc.getFetchTimestamp(); auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), _recipientDoc.getExistingUUID()); @@ -359,7 +375,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin _recipientDoc.getNss(), _recipientDoc.getExistingUUID(), ShardingState::get(serviceContext)->shardId(), - *_recipientDoc.getFetchTimestamp(), + fetchTimestamp, std::move(tempNss)); auto numDonors = _recipientDoc.getDonorShardsMirroring().size(); @@ -373,18 +389,24 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin const auto& recipientId = ShardingState::get(serviceContext)->shardId(); for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) { + auto oplogBufferNss = + getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()); + auto opCtx = cc().makeOperationContext(); + auto idToResumeFrom = + resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp); + invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); + stdx::lock_guard lk(_mutex); _oplogFetchers.emplace_back(std::make_unique( _recipientDoc.get_id(), _recipientDoc.getExistingUUID(), - // The recipient fetches oplog entries from the donor starting from the fetchTimestamp, - // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume - // token value. - ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(), - *_recipientDoc.getFetchTimestamp()}, + // The recipient fetches oplog entries from the donor starting from the largest _id + // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds + // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. + idToResumeFrom, donor.getId(), recipientId, - getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()))); + oplogBufferNss)); _oplogFetcherFutures.emplace_back( _oplogFetchers.back() diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 0e508783b5c..b570bc95c48 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -60,6 +60,10 @@ std::vector ensureStashCollectionsExist( const UUID& existingUUID, std::vector donorShards); +ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx, + NamespaceString oplogBufferNss, + Timestamp fetchTimestamp); + } // namespace resharding class ReshardingRecipientService final : public repl::PrimaryOnlyService { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 8a03ed223c6..a6eae7bec5e 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/logv2/log.h" @@ -586,5 +587,35 @@ TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshar } } +TEST_F(ReshardingRecipientServiceTest, FindIdToResumeFrom) { + auto opCtx = operationContext(); + NamespaceString oplogBufferNs = getLocalOplogBufferNamespace(kReshardingUUID, ShardId("foo")); + auto timestamp0 = Timestamp(1, 0); + auto timestamp1 = Timestamp(1, 1); + auto timestamp2 = Timestamp(1, 2); + auto timestamp3 = Timestamp(1, 3); + + // Starts from FetchTimestamp since localOplogBuffer doesn't exist + ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == + ReshardingDonorOplogId{timestamp0, timestamp0})); + + DBDirectClient client(opCtx); + client.insert(oplogBufferNs.toString(), + BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1))); + + // Makes sure to use the entry in localOplogBuffer + ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == + ReshardingDonorOplogId{timestamp3, timestamp1})); + + client.insert(oplogBufferNs.toString(), + BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3))); + client.insert(oplogBufferNs.toString(), + BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2))); + + // Makes sure to choose the largest timestamp + ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == + ReshardingDonorOplogId{timestamp3, timestamp3})); +} + } // namespace } // namespace mongo -- cgit v1.2.1