diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-02-18 10:49:23 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-18 11:11:48 +0000 |
commit | 0e6950610ce228b302770fecd06c6159659d614a (patch) | |
tree | ddc99a1955eeac0ebc397bacd34e1f394ca91de8 /src | |
parent | 25818a31c7d8209aa8ae22a536f94725aa7fa21e (diff) | |
download | mongo-0e6950610ce228b302770fecd06c6159659d614a.tar.gz |
Revert "SERVER-49894 Have resharding oplog fetcher resume from 'ts' component of largest _id inserted"
This reverts commit 22deeddb724e0ace7133762357b6fecd2c785443.
Diffstat (limited to 'src')
7 files changed, 37 insertions, 97 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index c9137febbf3..947c9834139 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -41,6 +41,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_request_helper.h" @@ -49,7 +50,6 @@ #include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" #include "mongo/db/query/query_request_helper.h" -#include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context.h" @@ -211,6 +211,32 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel return Pipeline::create(std::move(stages), std::move(expCtx)); } +Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) { + AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Resharding collection cloner's output collection '" << _outputNss + << "' did not already exist", + outputColl); + + auto findCommand = std::make_unique<FindCommand>(_outputNss); + findCommand->setLimit(1); + findCommand->setSort(BSON("_id" << -1)); + + auto recordId = + Helpers::findOne(opCtx, *outputColl, std::move(findCommand), true /* requireIndex */); + if (recordId.isNull()) { + return Value{}; + } + + auto doc = outputColl->docFor(opCtx, recordId).value(); + auto value = Value{doc["_id"]}; + uassert(4929300, + "Missing _id field for document in temporary resharding collection", + !value.missing()); + + return value; +} + std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest( OperationContext* opCtx, const Pipeline& pipeline) { AggregateCommand request(_sourceNss, pipeline.serializeToBson()); @@ -339,15 +365,7 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( return AsyncTry([this, chainCtx] { if (!chainCtx->pipeline) { chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) { - auto idToResumeFrom = [&] { - AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() - << "Resharding collection cloner's output collection '" - << _outputNss << "' did not already exist", - outputColl); - return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl); - }(); + auto idToResumeFrom = _findHighestInsertedId(opCtx); auto pipeline = _targetAggregationRequest( opCtx, *makePipeline( diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 0c33b8736bd..25d64d1f9c4 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -75,6 +75,8 @@ public: CancelationToken cancelToken); private: + Value _findHighestInsertedId(OperationContext* opCtx); + std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(OperationContext* opCtx, const Pipeline& pipeline); diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index f295e1a9c8f..f870643e257 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -33,7 +33,6 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/dbhelpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/write_unit_of_work.h" @@ -79,24 +78,4 @@ void ensureCollectionDropped(OperationContext* opCtx, }); } -Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) { - auto findCommand = std::make_unique<FindCommand>(collection->ns()); - findCommand->setLimit(1); - findCommand->setSort(BSON("_id" << -1)); - - auto recordId = - Helpers::findOne(opCtx, collection, std::move(findCommand), true /* requireIndex */); - if (recordId.isNull()) { - return Value{}; - } - - auto doc = collection->docFor(opCtx, recordId).value(); - auto value = Value{doc["_id"]}; - uassert(4929300, - "Missing _id field for document in temporary resharding collection", - !value.missing()); - - return value; -} - } // namespace mongo::resharding::data_copy diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h index 59322a34f26..3374b8c7228 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.h +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h @@ -60,7 +60,5 @@ void ensureCollectionDropped(OperationContext* opCtx, const NamespaceString& nss, const boost::optional<CollectionUUID>& uuid = boost::none); -Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection); - } // namespace resharding::data_copy } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index a91bc080477..c52fb05b9eb 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -170,21 +170,6 @@ std::vector<NamespaceString> ensureStashCollectionsExist( return stashCollections; } -ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx, - NamespaceString oplogBufferNss, - Timestamp fetchTimestamp) { - AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS); - if (!collection) { - return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}; - } - - auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection); - return highestOplogBufferId.missing() - ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp} - : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"}, - highestOplogBufferId.getDocument().toBson()); -} - } // namespace resharding std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance( @@ -383,7 +368,6 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin } auto* serviceContext = Client::getCurrent()->getServiceContext(); - auto fetchTimestamp = *_recipientDoc.getFetchTimestamp(); auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), _recipientDoc.getExistingUUID()); @@ -392,7 +376,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin _recipientDoc.getNss(), _recipientDoc.getExistingUUID(), ShardingState::get(serviceContext)->shardId(), - fetchTimestamp, + *_recipientDoc.getFetchTimestamp(), std::move(tempNss)); auto scopedOpCtx = cc().makeOperationContext(); @@ -411,24 +395,18 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin const auto& recipientId = ShardingState::get(serviceContext)->shardId(); for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) { - auto oplogBufferNss = - getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()); - auto opCtx = cc().makeOperationContext(); - auto idToResumeFrom = - resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp); - invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); - stdx::lock_guard<Latch> lk(_mutex); _oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( _recipientDoc.get_id(), _recipientDoc.getExistingUUID(), - // The recipient fetches oplog entries from the donor starting from the largest _id - // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds - // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. - idToResumeFrom, + // The recipient fetches oplog entries from the donor starting from the fetchTimestamp, + // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume + // token value. + ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(), + *_recipientDoc.getFetchTimestamp()}, donor.getId(), recipientId, - oplogBufferNss)); + getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()))); _oplogFetcherFutures.emplace_back( _oplogFetchers.back() diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index c3d0dd1e759..4e7d0e0dade 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -61,10 +61,6 @@ std::vector<NamespaceString> ensureStashCollectionsExist( const UUID& existingUUID, std::vector<DonorShardMirroringEntry> donorShards); -ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx, - NamespaceString oplogBufferNss, - Timestamp fetchTimestamp); - } // namespace resharding class ReshardingRecipientService final : public repl::PrimaryOnlyService { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index a6eae7bec5e..8a03ed223c6 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -41,7 +41,6 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" -#include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/logv2/log.h" @@ -587,35 +586,5 @@ TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshar } } -TEST_F(ReshardingRecipientServiceTest, FindIdToResumeFrom) { - auto opCtx = operationContext(); - NamespaceString oplogBufferNs = getLocalOplogBufferNamespace(kReshardingUUID, ShardId("foo")); - auto timestamp0 = Timestamp(1, 0); - auto timestamp1 = Timestamp(1, 1); - auto timestamp2 = Timestamp(1, 2); - auto timestamp3 = Timestamp(1, 3); - - // Starts from FetchTimestamp since localOplogBuffer doesn't exist - ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp0, timestamp0})); - - DBDirectClient client(opCtx); - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1))); - - // Makes sure to use the entry in localOplogBuffer - ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp1})); - - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3))); - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2))); - - // Makes sure to choose the largest timestamp - ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp3})); -} - } // namespace } // namespace mongo |