author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2021-02-18 10:49:23 +0100
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>     2021-02-18 11:11:48 +0000
commit     0e6950610ce228b302770fecd06c6159659d614a (patch)
tree       ddc99a1955eeac0ebc397bacd34e1f394ca91de8 /src
parent     25818a31c7d8209aa8ae22a536f94725aa7fa21e (diff)
download   mongo-0e6950610ce228b302770fecd06c6159659d614a.tar.gz
Revert "SERVER-49894 Have resharding oplog fetcher resume from 'ts' component of largest _id inserted"
This reverts commit 22deeddb724e0ace7133762357b6fecd2c785443.
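
For orientation, the behavior being reverted is the resume-point selection for the per-donor oplog fetcher. Below is a minimal, self-contained sketch of that logic using simplified stand-in types rather than the real MongoDB classes; the oplog buffer collection is modelled here as an optional in-memory vector, which is an assumption for illustration only.

    // Hypothetical, simplified model of the reverted getIdToResumeFrom() helper.
    // Timestamp and ReshardingDonorOplogId are stand-ins, not the real MongoDB
    // types, and the local oplog buffer is modelled as an in-memory vector.
    #include <algorithm>
    #include <optional>
    #include <tuple>
    #include <vector>

    struct Timestamp {
        unsigned secs;
        unsigned inc;
        bool operator<(const Timestamp& o) const {
            return std::tie(secs, inc) < std::tie(o.secs, o.inc);
        }
    };

    struct ReshardingDonorOplogId {
        Timestamp clusterTime;
        Timestamp ts;
        bool operator<(const ReshardingDonorOplogId& o) const {
            return std::tie(clusterTime, ts) < std::tie(o.clusterTime, o.ts);
        }
    };

    // Pre-revert behavior: resume from the largest _id already buffered locally,
    // falling back to {clusterTime: fetchTimestamp, ts: fetchTimestamp} when the
    // buffer collection does not exist or is empty.
    ReshardingDonorOplogId getIdToResumeFrom(
        const std::optional<std::vector<ReshardingDonorOplogId>>& oplogBuffer,
        Timestamp fetchTimestamp) {
        ReshardingDonorOplogId fallback{fetchTimestamp, fetchTimestamp};
        if (!oplogBuffer || oplogBuffer->empty()) {
            return fallback;
        }
        return *std::max_element(oplogBuffer->begin(), oplogBuffer->end());
    }

After this revert, the recipient unconditionally seeds each fetcher with the fallback value {clusterTime: fetchTimestamp, ts: fetchTimestamp}, so a recipient that restarts presumably re-fetches entries it had already buffered.
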
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp | 38
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.h | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_copy_util.cpp | 21
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_copy_util.h | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp | 36
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h | 4
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 31
7 files changed, 37 insertions, 97 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index c9137febbf3..947c9834139 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
@@ -49,7 +50,6 @@
#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
#include "mongo/db/query/query_request_helper.h"
-#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context.h"
@@ -211,6 +211,32 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
return Pipeline::create(std::move(stages), std::move(expCtx));
}
+Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) {
+ AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Resharding collection cloner's output collection '" << _outputNss
+ << "' did not already exist",
+ outputColl);
+
+ auto findCommand = std::make_unique<FindCommand>(_outputNss);
+ findCommand->setLimit(1);
+ findCommand->setSort(BSON("_id" << -1));
+
+ auto recordId =
+ Helpers::findOne(opCtx, *outputColl, std::move(findCommand), true /* requireIndex */);
+ if (recordId.isNull()) {
+ return Value{};
+ }
+
+ auto doc = outputColl->docFor(opCtx, recordId).value();
+ auto value = Value{doc["_id"]};
+ uassert(4929300,
+ "Missing _id field for document in temporary resharding collection",
+ !value.missing());
+
+ return value;
+}
+
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest(
OperationContext* opCtx, const Pipeline& pipeline) {
AggregateCommand request(_sourceNss, pipeline.serializeToBson());
@@ -339,15 +365,7 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return AsyncTry([this, chainCtx] {
if (!chainCtx->pipeline) {
chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) {
- auto idToResumeFrom = [&] {
- AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream()
- << "Resharding collection cloner's output collection '"
- << _outputNss << "' did not already exist",
- outputColl);
- return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl);
- }();
+ auto idToResumeFrom = _findHighestInsertedId(opCtx);
auto pipeline = _targetAggregationRequest(
opCtx,
*makePipeline(
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 0c33b8736bd..25d64d1f9c4 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -75,6 +75,8 @@ public:
CancelationToken cancelToken);
private:
+ Value _findHighestInsertedId(OperationContext* opCtx);
+
std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(OperationContext* opCtx,
const Pipeline& pipeline);
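
The cloner now carries a private _findHighestInsertedId() helper with the same query shape as the removed shared utility: a find against the output collection sorted on _id descending with a limit of 1, whose result gives the _id value to resume the clone from. A minimal sketch of that lookup over a stand-in in-memory collection follows; Document and the int64 _id are assumptions for illustration, not the real BSON types.

    // Hypothetical model of the "highest inserted _id" lookup: equivalent to a
    // find with sort {_id: -1} and limit 1. Document is a stand-in type.
    #include <algorithm>
    #include <cstdint>
    #include <optional>
    #include <vector>

    struct Document {
        std::int64_t id;  // stands in for the BSON "_id" field
    };

    // Returns the largest _id already present, or std::nullopt for an empty
    // collection (the caller then starts the clone from the beginning).
    std::optional<std::int64_t> findHighestInsertedId(const std::vector<Document>& coll) {
        if (coll.empty()) {
            return std::nullopt;
        }
        auto it = std::max_element(
            coll.begin(), coll.end(),
            [](const Document& a, const Document& b) { return a.id < b.id; });
        return it->id;
    }

In the actual patch the lookup goes through Helpers::findOne with requireIndex set to true, so the {_id: -1}, limit 1 query is expected to be answered from the _id index rather than by scanning the whole collection.
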
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index f295e1a9c8f..f870643e257 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/dbhelpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/storage/write_unit_of_work.h"
@@ -79,24 +78,4 @@ void ensureCollectionDropped(OperationContext* opCtx,
});
}
-Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) {
- auto findCommand = std::make_unique<FindCommand>(collection->ns());
- findCommand->setLimit(1);
- findCommand->setSort(BSON("_id" << -1));
-
- auto recordId =
- Helpers::findOne(opCtx, collection, std::move(findCommand), true /* requireIndex */);
- if (recordId.isNull()) {
- return Value{};
- }
-
- auto doc = collection->docFor(opCtx, recordId).value();
- auto value = Value{doc["_id"]};
- uassert(4929300,
- "Missing _id field for document in temporary resharding collection",
- !value.missing());
-
- return value;
-}
-
} // namespace mongo::resharding::data_copy
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h
index 59322a34f26..3374b8c7228 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.h
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h
@@ -60,7 +60,5 @@ void ensureCollectionDropped(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<CollectionUUID>& uuid = boost::none);
-Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection);
-
} // namespace resharding::data_copy
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index a91bc080477..c52fb05b9eb 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -170,21 +170,6 @@ std::vector<NamespaceString> ensureStashCollectionsExist(
return stashCollections;
}
-ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx,
- NamespaceString oplogBufferNss,
- Timestamp fetchTimestamp) {
- AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS);
- if (!collection) {
- return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp};
- }
-
- auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection);
- return highestOplogBufferId.missing()
- ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}
- : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"},
- highestOplogBufferId.getDocument().toBson());
-}
-
} // namespace resharding
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance(
@@ -383,7 +368,6 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}
auto* serviceContext = Client::getCurrent()->getServiceContext();
- auto fetchTimestamp = *_recipientDoc.getFetchTimestamp();
auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
_recipientDoc.getExistingUUID());
@@ -392,7 +376,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
_recipientDoc.getNss(),
_recipientDoc.getExistingUUID(),
ShardingState::get(serviceContext)->shardId(),
- fetchTimestamp,
+ *_recipientDoc.getFetchTimestamp(),
std::move(tempNss));
auto scopedOpCtx = cc().makeOperationContext();
@@ -411,24 +395,18 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
const auto& recipientId = ShardingState::get(serviceContext)->shardId();
for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) {
- auto oplogBufferNss =
- getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId());
- auto opCtx = cc().makeOperationContext();
- auto idToResumeFrom =
- resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp);
- invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
-
stdx::lock_guard<Latch> lk(_mutex);
_oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
_recipientDoc.get_id(),
_recipientDoc.getExistingUUID(),
- // The recipient fetches oplog entries from the donor starting from the largest _id
- // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
- // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
- idToResumeFrom,
+ // The recipient fetches oplog entries from the donor starting from the fetchTimestamp,
+ // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume
+ // token value.
+ ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(),
+ *_recipientDoc.getFetchTimestamp()},
donor.getId(),
recipientId,
- oplogBufferNss));
+ getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId())));
_oplogFetcherFutures.emplace_back(
_oplogFetchers.back()
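
The invariant deleted in this hunk, idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}, and the FindIdToResumeFrom test removed further down both rely on ReshardingDonorOplogId comparing its two timestamp components in declaration order: clusterTime first, then ts. A minimal sketch with stand-in types (not the real IDL-generated class) that reproduces the orderings the deleted test asserts:

    // Hypothetical stand-ins illustrating the lexicographic ordering assumed by
    // the removed invariant and by the deleted FindIdToResumeFrom test.
    #include <cassert>
    #include <tuple>

    struct Timestamp {
        unsigned secs;
        unsigned inc;
    };

    struct ReshardingDonorOplogId {
        Timestamp clusterTime;
        Timestamp ts;
    };

    // Compare clusterTime first, then ts, each as a (secs, inc) pair.
    bool operator<(const ReshardingDonorOplogId& a, const ReshardingDonorOplogId& b) {
        return std::tie(a.clusterTime.secs, a.clusterTime.inc, a.ts.secs, a.ts.inc) <
            std::tie(b.clusterTime.secs, b.clusterTime.inc, b.ts.secs, b.ts.inc);
    }

    int main() {
        Timestamp t0{1, 0};
        Timestamp t1{1, 1};
        Timestamp t3{1, 3};
        // The fetchTimestamp-only token sorts before a buffered id whose
        // clusterTime component is larger ...
        assert((ReshardingDonorOplogId{t0, t0} < ReshardingDonorOplogId{t3, t1}));
        // ... and ids with equal clusterTime are ordered by their ts component.
        assert((ReshardingDonorOplogId{t3, t1} < ReshardingDonorOplogId{t3, t3}));
        return 0;
    }
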
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index c3d0dd1e759..4e7d0e0dade 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -61,10 +61,6 @@ std::vector<NamespaceString> ensureStashCollectionsExist(
const UUID& existingUUID,
std::vector<DonorShardMirroringEntry> donorShards);
-ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx,
- NamespaceString oplogBufferNss,
- Timestamp fetchTimestamp);
-
} // namespace resharding
class ReshardingRecipientService final : public repl::PrimaryOnlyService {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index a6eae7bec5e..8a03ed223c6 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
-#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/logv2/log.h"
@@ -587,35 +586,5 @@ TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshar
}
}
-TEST_F(ReshardingRecipientServiceTest, FindIdToResumeFrom) {
- auto opCtx = operationContext();
- NamespaceString oplogBufferNs = getLocalOplogBufferNamespace(kReshardingUUID, ShardId("foo"));
- auto timestamp0 = Timestamp(1, 0);
- auto timestamp1 = Timestamp(1, 1);
- auto timestamp2 = Timestamp(1, 2);
- auto timestamp3 = Timestamp(1, 3);
-
- // Starts from FetchTimestamp since localOplogBuffer doesn't exist
- ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp0, timestamp0}));
-
- DBDirectClient client(opCtx);
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1)));
-
- // Makes sure to use the entry in localOplogBuffer
- ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp1}));
-
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3)));
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2)));
-
- // Makes sure to choose the largest timestamp
- ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp3}));
-}
-
} // namespace
} // namespace mongo