author     Yuhong Zhang <yuhong.zhang@mongodb.com>    2021-02-11 20:48:50 +0000
committer  Yuhong Zhang <yuhong.zhang@mongodb.com>    2021-02-17 18:27:30 +0000
commit     fb8d9d4ce62b053c83495100c9181679c9b08872 (patch)
tree       a4280a9f05b4492c0ede5520fff58488fb9af22c
parent     f89e0d56229062e37748e710422120a1a40c8a8f (diff)
download   mongo-server-49894.tar.gz

SERVER-49894 Have resharding oplog fetcher resume from 'ts' component of largest _id inserted (server-49894)
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp        39
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.h           2
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_copy_util.cpp           20
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_copy_util.h              2
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp        36
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h           4
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp   31
7 files changed, 97 insertions, 37 deletions
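
Before the per-file diffs, here is a minimal standalone sketch of the idea in the commit title. It uses illustrative STL stand-ins (Ts, DonorOplogId), not the server's Timestamp or ReshardingDonorOplogId classes: each document the oplog fetcher inserts into its buffer is keyed by an _id of the form {clusterTime, ts}, so after a restart the fetcher can take the largest buffered _id and resume its donor query from that _id's 'ts' component.

#include <iostream>
#include <map>
#include <string>
#include <tuple>

struct Ts {                               // stand-in for mongo::Timestamp
    unsigned secs, inc;
    bool operator<(const Ts& o) const { return std::tie(secs, inc) < std::tie(o.secs, o.inc); }
};

struct DonorOplogId {                     // stand-in for ReshardingDonorOplogId {clusterTime, ts}
    Ts clusterTime, ts;
    bool operator<(const DonorOplogId& o) const {
        return std::tie(clusterTime, ts) < std::tie(o.clusterTime, o.ts);
    }
};

int main() {
    // The oplog buffer collection, keyed (and therefore ordered) by _id.
    std::map<DonorOplogId, std::string> oplogBuffer{
        {{{1, 3}, {1, 1}}, "entry A"},
        {{{1, 3}, {1, 3}}, "entry B"},    // largest _id
        {{{1, 3}, {1, 2}}, "entry C"},
    };
    // 'ts' component of the largest inserted _id; the fetcher can restart its query here.
    Ts resumeTs = oplogBuffer.rbegin()->first.ts;
    std::cout << "resume donor query at ts (" << resumeTs.secs << ", " << resumeTs.inc << ")\n";
}
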
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 947c9834139..237f23e7d67 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
-#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
@@ -49,7 +48,9 @@
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/db/query/query_request_helper.h"
+#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context.h"
@@ -211,32 +212,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
return Pipeline::create(std::move(stages), std::move(expCtx));
}
-Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) {
- AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Resharding collection cloner's output collection '" << _outputNss
- << "' did not already exist",
- outputColl);
-
- auto findCommand = std::make_unique<FindCommand>(_outputNss);
- findCommand->setLimit(1);
- findCommand->setSort(BSON("_id" << -1));
-
- auto recordId =
- Helpers::findOne(opCtx, *outputColl, std::move(findCommand), true /* requireIndex */);
- if (recordId.isNull()) {
- return Value{};
- }
-
- auto doc = outputColl->docFor(opCtx, recordId).value();
- auto value = Value{doc["_id"]};
- uassert(4929300,
- "Missing _id field for document in temporary resharding collection",
- !value.missing());
-
- return value;
-}
-
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest(
OperationContext* opCtx, const Pipeline& pipeline) {
AggregateCommand request(_sourceNss, pipeline.serializeToBson());
@@ -365,7 +340,15 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return AsyncTry([this, chainCtx] {
if (!chainCtx->pipeline) {
chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) {
- auto idToResumeFrom = _findHighestInsertedId(opCtx);
+ auto idToResumeFrom = [&] {
+ AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream()
+ << "Resharding collection cloner's output collection '"
+ << _outputNss << "' did not already exist",
+ outputColl);
+ return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl);
+ }();
auto pipeline = _targetAggregationRequest(
opCtx,
*makePipeline(
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index d40209ccf03..d6883b9a062 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -71,8 +71,6 @@ public:
CancelationToken cancelToken);
private:
- Value _findHighestInsertedId(OperationContext* opCtx);
-
std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(OperationContext* opCtx,
const Pipeline& pipeline);
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index f870643e257..17ce44dad77 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/storage/write_unit_of_work.h"
@@ -78,4 +79,23 @@ void ensureCollectionDropped(OperationContext* opCtx,
});
}
+Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) {
+ auto qr = std::make_unique<QueryRequest>(collection->ns());
+ qr->setLimit(1);
+ qr->setSort(BSON("_id" << -1));
+
+ auto recordId = Helpers::findOne(opCtx, collection, std::move(qr), true /* requireIndex */);
+ if (!recordId.isNormal()) {
+ return Value{};
+ }
+
+ auto doc = collection->docFor(opCtx, recordId).value();
+ auto value = Value{doc["_id"]};
+ uassert(4929300,
+ "Missing _id field for document in temporary resharding collection",
+ !value.missing());
+
+ return value;
+}
+
} // namespace mongo::resharding::data_copy
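
For readers unfamiliar with the server's Helpers/Collection APIs, a minimal standalone model of what the new findHighestInsertedId() helper computes follows; it uses plain STL types and illustrative names rather than the real signatures. The query semantics are: sort by _id descending, limit 1, return that document's _id, or nothing when the collection is empty.

#include <algorithm>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct Document {
    int id;                  // plays the role of the _id field
    std::string payload;
};

std::optional<int> findHighestInsertedId(const std::vector<Document>& collection) {
    if (collection.empty()) {
        return std::nullopt;                       // analogous to returning an empty Value{}
    }
    auto it = std::max_element(
        collection.begin(), collection.end(),
        [](const Document& a, const Document& b) { return a.id < b.id; });
    return it->id;                                 // same result as sort({_id: -1}).limit(1)
}

int main() {
    std::vector<Document> coll{{1, "a"}, {5, "b"}, {3, "c"}};
    if (auto id = findHighestInsertedId(coll)) {
        std::cout << "highest inserted _id: " << *id << "\n";   // prints 5
    }
}
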
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h
index 3374b8c7228..59322a34f26 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.h
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h
@@ -60,5 +60,7 @@ void ensureCollectionDropped(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<CollectionUUID>& uuid = boost::none);
+Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection);
+
} // namespace resharding::data_copy
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index d846bf9290c..7c12ade6d38 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -169,6 +169,21 @@ std::vector<NamespaceString> ensureStashCollectionsExist(
return stashCollections;
}
+ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx,
+ NamespaceString oplogBufferNss,
+ Timestamp fetchTimestamp) {
+ AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS);
+ if (!collection) {
+ return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp};
+ }
+
+ auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection);
+ return highestOplogBufferId.missing()
+ ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}
+ : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"},
+ highestOplogBufferId.getDocument().toBson());
+}
+
} // namespace resharding
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance(
@@ -351,6 +366,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}
auto* serviceContext = Client::getCurrent()->getServiceContext();
+ auto fetchTimestamp = *_recipientDoc.getFetchTimestamp();
auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
_recipientDoc.getExistingUUID());
@@ -359,7 +375,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
_recipientDoc.getNss(),
_recipientDoc.getExistingUUID(),
ShardingState::get(serviceContext)->shardId(),
- *_recipientDoc.getFetchTimestamp(),
+ fetchTimestamp,
std::move(tempNss));
auto numDonors = _recipientDoc.getDonorShardsMirroring().size();
@@ -373,18 +389,24 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
const auto& recipientId = ShardingState::get(serviceContext)->shardId();
for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) {
+ auto oplogBufferNss =
+ getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId());
+ auto opCtx = cc().makeOperationContext();
+ auto idToResumeFrom =
+ resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+
stdx::lock_guard<Latch> lk(_mutex);
_oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
_recipientDoc.get_id(),
_recipientDoc.getExistingUUID(),
- // The recipient fetches oplog entries from the donor starting from the fetchTimestamp,
- // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume
- // token value.
- ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(),
- *_recipientDoc.getFetchTimestamp()},
+ // The recipient fetches oplog entries from the donor starting from the largest _id
+ // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
+ // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+ idToResumeFrom,
donor.getId(),
recipientId,
- getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId())));
+ oplogBufferNss));
_oplogFetcherFutures.emplace_back(
_oplogFetchers.back()
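
A minimal standalone sketch of the decision the new resharding::getIdToResumeFrom() helper makes; the types here are hypothetical, and the real function additionally acquires the collection lock and parses the BSON _id. The rule it encodes: prefer the largest _id already in the donor's oplog-buffer collection, otherwise fall back to the {clusterTime: fetchTimestamp, ts: fetchTimestamp} token that the fetcher previously used unconditionally.

#include <iostream>
#include <optional>

struct Ts { unsigned secs, inc; };
struct DonorOplogId { Ts clusterTime, ts; };   // stand-in for ReshardingDonorOplogId

DonorOplogId getIdToResumeFrom(const std::optional<DonorOplogId>& highestBufferedId,
                               Ts fetchTimestamp) {
    if (!highestBufferedId) {
        // Oplog buffer collection missing or empty: start at the beginning of the window.
        return DonorOplogId{fetchTimestamp, fetchTimestamp};
    }
    return *highestBufferedId;                 // resume after what was already fetched
}

int main() {
    Ts fetchTs{1, 0};
    auto fresh = getIdToResumeFrom(std::nullopt, fetchTs);
    auto resumed = getIdToResumeFrom(DonorOplogId{{1, 3}, {1, 1}}, fetchTs);
    std::cout << "fresh start ts: (" << fresh.ts.secs << ", " << fresh.ts.inc << ")\n";
    std::cout << "resumed ts:     (" << resumed.ts.secs << ", " << resumed.ts.inc << ")\n";
}
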
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 0e508783b5c..b570bc95c48 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -60,6 +60,10 @@ std::vector<NamespaceString> ensureStashCollectionsExist(
const UUID& existingUUID,
std::vector<DonorShardMirroringEntry> donorShards);
+ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx,
+ NamespaceString oplogBufferNss,
+ Timestamp fetchTimestamp);
+
} // namespace resharding
class ReshardingRecipientService final : public repl::PrimaryOnlyService {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 8a03ed223c6..a6eae7bec5e 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/logv2/log.h"
@@ -586,5 +587,35 @@ TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshar
}
}
+TEST_F(ReshardingRecipientServiceTest, FindIdToResumeFrom) {
+ auto opCtx = operationContext();
+ NamespaceString oplogBufferNs = getLocalOplogBufferNamespace(kReshardingUUID, ShardId("foo"));
+ auto timestamp0 = Timestamp(1, 0);
+ auto timestamp1 = Timestamp(1, 1);
+ auto timestamp2 = Timestamp(1, 2);
+ auto timestamp3 = Timestamp(1, 3);
+
+ // Starts from FetchTimestamp since localOplogBuffer doesn't exist
+ ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
+ ReshardingDonorOplogId{timestamp0, timestamp0}));
+
+ DBDirectClient client(opCtx);
+ client.insert(oplogBufferNs.toString(),
+ BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1)));
+
+ // Makes sure to use the entry in localOplogBuffer
+ ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
+ ReshardingDonorOplogId{timestamp3, timestamp1}));
+
+ client.insert(oplogBufferNs.toString(),
+ BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3)));
+ client.insert(oplogBufferNs.toString(),
+ BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2)));
+
+ // Makes sure to choose the largest timestamp
+ ASSERT((resharding::getIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
+ ReshardingDonorOplogId{timestamp3, timestamp3}));
+}
+
} // namespace
} // namespace mongo