author     Randolph Tan <randolph@10gen.com>                 2021-03-22 20:51:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-04-08 19:16:47 +0000
commit     aa401671d769f20e98f40b864338c7bd1c14d292 (patch)
tree       27edd2840e7cc3a939bc9178d5fbb680cede3329 /src
parent     bcd58f9b973e2a6839b40ceedb69dd245e72ab05 (diff)
download   mongo-aa401671d769f20e98f40b864338c7bd1c14d292.tar.gz
SERVER-55214 Make resharding recipient shards use fetchTimestamp from each donor shard when fetching config.transactions and the oplog
Also force the no-op oplog write that is being used as the minFetchTimestamp marker for resharding into its own batch when replicating.
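As background for the second change, here is a minimal, hypothetical sketch (plain C++, not MongoDB's actual oplog batching code) of what the forced batch boundary buys: when a secondary groups oplog entries into batches for parallel application, an entry whose namespace must be applied in its own batch closes the open batch and gets one to itself, so the no-op minFetchTimestamp marker written to the new config.system.forceOplogBatchBoundary namespace cannot share a batch with surrounding writes. The OplogEntry struct and makeBatches() function below are illustrative stand-ins only.

#include <cassert>
#include <string>
#include <vector>

// Illustrative stand-ins; not MongoDB's actual types.
struct OplogEntry {
    std::string ns;
    bool mustBeAppliedInOwnBatch() const {
        // Mirrors the new NamespaceString::mustBeAppliedInOwnOplogBatch() case.
        return ns == "config.system.forceOplogBatchBoundary";
    }
};

std::vector<std::vector<OplogEntry>> makeBatches(const std::vector<OplogEntry>& oplog) {
    std::vector<std::vector<OplogEntry>> batches;
    std::vector<OplogEntry> current;
    for (const auto& entry : oplog) {
        if (entry.mustBeAppliedInOwnBatch()) {
            if (!current.empty()) {
                batches.push_back(current);  // close whatever batch was open
                current.clear();
            }
            batches.push_back({entry});      // the marker is applied in its own batch
        } else {
            current.push_back(entry);
        }
    }
    if (!current.empty())
        batches.push_back(current);
    return batches;
}

int main() {
    std::vector<OplogEntry> oplog{
        {"test.foo"}, {"test.foo"},
        {"config.system.forceOplogBatchBoundary"},  // the resharding no-op marker
        {"test.bar"}};
    auto batches = makeBatches(oplog);
    assert(batches.size() == 3 && batches[1].size() == 1);
}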
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/namespace_string.cpp | 6
-rw-r--r--  src/mongo/db/namespace_string.h | 3
-rw-r--r--  src/mongo/db/s/resharding/coordinator_document.idl | 5
-rw-r--r--  src/mongo/db/s/resharding/recipient_document.idl | 7
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 44
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.h | 2
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp | 24
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_replication.cpp | 69
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_replication.h | 14
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp | 4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp | 43
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h | 78
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service.cpp | 10
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_service_test.cpp | 10
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp | 100
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h | 28
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 2
-rw-r--r--  src/mongo/db/s/resharding_util.h | 18
-rw-r--r--  src/mongo/s/resharding/common_types.idl | 28
-rw-r--r--  src/mongo/s/resharding/type_collection_fields.idl | 12
20 files changed, 300 insertions, 207 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 784e2327bed..3e4ebaca039 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -121,6 +121,9 @@ const NamespaceString NamespaceString::kReshardingTxnClonerProgressNamespace(
const NamespaceString NamespaceString::kCollectionCriticalSectionsNamespace(
NamespaceString::kConfigDb, "collection_critical_sections");
+const NamespaceString NamespaceString::kForceOplogBatchBoundaryNamespace(
+ NamespaceString::kConfigDb, "system.forceOplogBatchBoundary");
+
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
}
@@ -185,7 +188,8 @@ bool NamespaceString::isLegalClientSystemNS(
* processing each operation matches the primary's when committing that operation.
*/
bool NamespaceString::mustBeAppliedInOwnOplogBatch() const {
- return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection();
+ return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection() ||
+ _ns == kForceOplogBatchBoundaryNamespace.ns();
}
NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) {
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 4a32d0891f9..5545129a4ff 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -172,6 +172,9 @@ public:
// Namespace for storing config.collectionCriticalSections documents
static const NamespaceString kCollectionCriticalSectionsNamespace;
+ // Dummy namespace used to force secondaries to apply an oplog entry in its own batch.
+ static const NamespaceString kForceOplogBatchBoundaryNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl
index 3cadd2a63ff..a742988cde9 100644
--- a/src/mongo/db/s/resharding/coordinator_document.idl
+++ b/src/mongo/db/s/resharding/coordinator_document.idl
@@ -69,7 +69,6 @@ structs:
inline_chained_structs: true
chained_structs:
CommonReshardingMetadata: CommonReshardingMetadata
- FetchTimestamp: FetchTimestampStruct
ReshardingApproxCopySize: ReshardingApproxCopySizeStruct
AbortReason: AbortReasonStruct
generate_comparison_operators: false
@@ -79,6 +78,10 @@ structs:
strict: false
fields:
state: CoordinatorState
+ cloneTimestamp:
+ type: timestamp
+ description: "The timestamp for the snapshot read while cloning from the donors."
+ optional: true
donorShards:
type: array<DonorShardEntry>
description: "The list of all donor shards for this resharding operation."
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index 5efe3d36deb..5d2fe08ce8a 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -41,18 +41,21 @@ structs:
inline_chained_structs: true
chained_structs:
CommonReshardingMetadata: CommonReshardingMetadata
- FetchTimestamp: FetchTimestampStruct
generate_comparison_operators: false
# Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
# required for resharding because durable state from all resharding operations is cleaned up
# before the upgrade or downgrade can complete.
strict: false
fields:
+ cloneTimestamp:
+ type: timestamp
+ optional: true
+ description: "The timestamp for the snapshot read while cloning from the donors."
# We intentionally have the mutable state nested in a subobject to make it easy to
# overwrite with a single $set.
mutableState: RecipientShardContext
donorShards:
- type: array<shard_id>
+ type: array<DonorShardFetchTimestamp>
description: "The list of donor shards that report to this recipient."
minimumOperationDurationMillis:
type: long
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 35df4d6b38b..ff5d149f8d8 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -140,10 +140,10 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
setBuilder.append(ReshardingCoordinatorDocument::kStateFieldName,
CoordinatorState_serializer(coordinatorDoc.getState()));
- if (auto fetchTimestamp = coordinatorDoc.getFetchTimestamp()) {
- // If the fetchTimestamp exists, include it in the update.
- setBuilder.append(ReshardingCoordinatorDocument::kFetchTimestampFieldName,
- *fetchTimestamp);
+ if (auto cloneTimestamp = coordinatorDoc.getCloneTimestamp()) {
+ // If the cloneTimestamp exists, include it in the update.
+ setBuilder.append(ReshardingCoordinatorDocument::kCloneTimestampFieldName,
+ *cloneTimestamp);
}
if (auto abortReason = coordinatorDoc.getAbortReason()) {
@@ -201,13 +201,21 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
*/
TypeCollectionRecipientFields constructRecipientFields(
const ReshardingCoordinatorDocument& coordinatorDoc) {
+ std::vector<DonorShardFetchTimestamp> donorShards;
+
+ for (const auto& donor : coordinatorDoc.getDonorShards()) {
+ DonorShardFetchTimestamp donorFetchTimestamp(donor.getId());
+ donorFetchTimestamp.setMinFetchTimestamp(donor.getMutableState().getMinFetchTimestamp());
+ donorShards.push_back(std::move(donorFetchTimestamp));
+ }
+
TypeCollectionRecipientFields recipientFields(
- extractShardIdsFromParticipantEntries(coordinatorDoc.getDonorShards()),
+ std::move(donorShards),
coordinatorDoc.getSourceUUID(),
coordinatorDoc.getSourceNss(),
resharding::gReshardingMinimumOperationDurationMillis.load());
- emplaceFetchTimestampIfExists(recipientFields, coordinatorDoc.getFetchTimestamp());
+ emplaceCloneTimestampIfExists(recipientFields, coordinatorDoc.getCloneTimestamp());
return recipientFields;
}
@@ -346,9 +354,18 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
return BatchedCommandRequest::buildInsertOp(
CollectionType::ConfigNS, std::vector<BSONObj>{collType.toBSON()});
}
- case CoordinatorStateEnum::kCloning:
- // Update the 'state' and 'fetchTimestamp' fields in the
+ case CoordinatorStateEnum::kCloning: {
+ // Update the 'state', 'donorShards' and 'cloneTimestamp' fields in the
// 'reshardingFields.recipient' section
+
+ BSONArrayBuilder donorShardsBuilder;
+ for (const auto& donor : coordinatorDoc.getDonorShards()) {
+ DonorShardFetchTimestamp donorShardFetchTimestamp(donor.getId());
+ donorShardFetchTimestamp.setMinFetchTimestamp(
+ donor.getMutableState().getMinFetchTimestamp());
+ donorShardsBuilder.append(donorShardFetchTimestamp.toBSON());
+ }
+
return BatchedCommandRequest::buildUpdateOp(
CollectionType::ConfigNS,
BSON(CollectionType::kNssFieldName
@@ -356,12 +373,15 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
BSON("$set" << BSON(
"reshardingFields.state"
<< CoordinatorState_serializer(nextState).toString()
- << "reshardingFields.recipientFields.fetchTimestamp"
- << coordinatorDoc.getFetchTimestamp().get() << "lastmod"
+ << "reshardingFields.recipientFields.cloneTimestamp"
+ << coordinatorDoc.getCloneTimestamp().get()
+ << "reshardingFields.recipientFields.donorShards"
+ << donorShardsBuilder.arr() << "lastmod"
<< opCtx->getServiceContext()->getPreciseClockSource()->now())),
false, // upsert
false // multi
);
+ }
case CoordinatorStateEnum::kDecisionPersisted:
// Remove the entry for the temporary nss
return BatchedCommandRequest::buildDeleteOp(
@@ -1396,13 +1416,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
_updateCoordinatorDocStateAndCatalogEntries(
CoordinatorStateEnum nextState,
ReshardingCoordinatorDocument coordinatorDoc,
- boost::optional<Timestamp> fetchTimestamp,
+ boost::optional<Timestamp> cloneTimestamp,
boost::optional<ReshardingApproxCopySize> approxCopySize,
boost::optional<Status> abortReason) {
// Build new state doc for coordinator state update
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(nextState);
- emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp));
+ emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp));
emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize));
emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 9f3df91fc42..18f09fc948f 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -329,7 +329,7 @@ private:
void _updateCoordinatorDocStateAndCatalogEntries(
CoordinatorStateEnum nextState,
ReshardingCoordinatorDocument coordinatorDoc,
- boost::optional<Timestamp> fetchTimestamp = boost::none,
+ boost::optional<Timestamp> cloneTimestamp = boost::none,
boost::optional<ReshardingApproxCopySize> approxCopySize = boost::none,
boost::optional<Status> abortReason = boost::none);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 45a063c20d2..75d8d961ad5 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -90,7 +90,7 @@ protected:
{DonorShardEntry(ShardId("shard0000"), {})},
{RecipientShardEntry(ShardId("shard0001"), {})});
doc.setCommonReshardingMetadata(meta);
- emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp));
+ emplaceCloneTimestampIfExists(doc, std::move(fetchTimestamp));
return doc;
}
@@ -251,12 +251,12 @@ protected:
0);
ASSERT(coordinatorDoc.getState() == expectedCoordinatorDoc.getState());
ASSERT(coordinatorDoc.getActive());
- if (expectedCoordinatorDoc.getFetchTimestamp()) {
- ASSERT(coordinatorDoc.getFetchTimestamp());
- ASSERT_EQUALS(coordinatorDoc.getFetchTimestamp().get(),
- expectedCoordinatorDoc.getFetchTimestamp().get());
+ if (expectedCoordinatorDoc.getCloneTimestamp()) {
+ ASSERT(coordinatorDoc.getCloneTimestamp());
+ ASSERT_EQUALS(coordinatorDoc.getCloneTimestamp().get(),
+ expectedCoordinatorDoc.getCloneTimestamp().get());
} else {
- ASSERT(!coordinatorDoc.getFetchTimestamp());
+ ASSERT(!coordinatorDoc.getCloneTimestamp());
}
// Confirm the (non)existence of the CoordinatorDocument abortReason.
@@ -398,12 +398,12 @@ protected:
ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getSourceNss(),
expectedReshardingFields.getRecipientFields()->getSourceNss());
- if (expectedReshardingFields.getRecipientFields()->getFetchTimestamp()) {
- ASSERT(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp());
- ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp().get(),
- expectedReshardingFields.getRecipientFields()->getFetchTimestamp().get());
+ if (expectedReshardingFields.getRecipientFields()->getCloneTimestamp()) {
+ ASSERT(onDiskReshardingFields.getRecipientFields()->getCloneTimestamp());
+ ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getCloneTimestamp().get(),
+ expectedReshardingFields.getRecipientFields()->getCloneTimestamp().get());
} else {
- ASSERT(!onDiskReshardingFields.getRecipientFields()->getFetchTimestamp());
+ ASSERT(!onDiskReshardingFields.getRecipientFields()->getCloneTimestamp());
}
if (onDiskReshardingFields.getState() == CoordinatorStateEnum::kError) {
@@ -761,7 +761,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning);
- emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
+ emplaceCloneTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion);
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 4ab52148ae7..cb2956e2085 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -96,14 +96,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl
std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_makeTxnCloners(
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp) {
+ const std::vector<DonorShardFetchTimestamp>& donorShards) {
std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
- txnCloners.reserve(donorShardIds.size());
+ txnCloners.reserve(donorShards.size());
- for (const auto& donor : donorShardIds) {
+ for (const auto& donor : donorShards) {
txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>(
- ReshardingSourceId(metadata.getReshardingUUID(), donor), fetchTimestamp));
+ ReshardingSourceId(metadata.getReshardingUUID(), donor.getShardId()),
+ *donor.getMinFetchTimestamp()));
}
return txnCloners;
@@ -113,14 +113,15 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
OperationContext* opCtx,
ReshardingMetrics* metrics,
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
const ShardId& myShardId) {
std::vector<std::unique_ptr<ReshardingOplogFetcher>> oplogFetchers;
- oplogFetchers.reserve(donorShardIds.size());
+ oplogFetchers.reserve(donorShards.size());
- for (const auto& donor : donorShardIds) {
- auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor);
+ for (const auto& donor : donorShards) {
+ auto oplogBufferNss =
+ getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
+ auto fetchTimestamp = *donor.getMinFetchTimestamp();
auto idToResumeFrom =
resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp);
invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
@@ -133,7 +134,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
// value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
// to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
std::move(idToResumeFrom),
- donor,
+ donor.getShardId(),
myShardId,
std::move(oplogBufferNss)));
}
@@ -178,23 +179,26 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
ChunkManager sourceChunkMgr,
std::shared_ptr<executor::TaskExecutor> executor,
const std::vector<NamespaceString>& stashCollections,
const std::vector<std::unique_ptr<ReshardingOplogFetcher>>& oplogFetchers,
const std::vector<std::unique_ptr<ThreadPool>>& oplogApplierWorkers) {
std::vector<std::unique_ptr<ReshardingOplogApplier>> oplogAppliers;
- oplogAppliers.reserve(donorShardIds.size());
+ oplogAppliers.reserve(donorShards.size());
- for (size_t i = 0; i < donorShardIds.size(); ++i) {
- auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardIds[i]};
- auto idToResumeFrom = resharding::getApplierIdToResumeFrom(opCtx, sourceId, fetchTimestamp);
- invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+ for (size_t i = 0; i < donorShards.size(); ++i) {
+ auto sourceId =
+ ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()};
+ auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp();
+ auto idToResumeFrom =
+ resharding::getApplierIdToResumeFrom(opCtx, sourceId, minOplogTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp}));
const auto& oplogBufferNss =
- getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardIds[i]);
+ getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId());
oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
std::make_unique<ReshardingOplogApplier::Env>(opCtx->getServiceContext(), metrics),
@@ -204,10 +208,10 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
metadata.getSourceUUID(),
stashCollections,
i,
- fetchTimestamp,
+ cloneTimestamp,
// The recipient applies oplog entries from the donor starting from the progress value
- // in progress_applier. Otherwise, it starts at fetchTimestamp, which corresponds to
- // {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+ // in progress_applier. Otherwise, it starts at cloneTimestamp, which corresponds to
+ // {clusterTime: cloneTimestamp, ts: cloneTimestamp} as a resume token value.
std::make_unique<ReshardingDonorOplogIterator>(
oplogBufferNss, std::move(idToResumeFrom), oplogFetchers[i].get()),
sourceChunkMgr,
@@ -222,8 +226,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- std::vector<ShardId> donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
bool cloningDone,
ShardId myShardId,
ChunkManager sourceChunkMgr,
@@ -232,24 +236,23 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
if (!cloningDone) {
- collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, fetchTimestamp);
- txnCloners = _makeTxnCloners(metadata, donorShardIds, fetchTimestamp);
+ collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, cloneTimestamp);
+ txnCloners = _makeTxnCloners(metadata, donorShards);
}
- auto oplogFetchers =
- _makeOplogFetchers(opCtx, metrics, metadata, donorShardIds, fetchTimestamp, myShardId);
+ auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId);
- auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShardIds.size());
- auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShardIds.size());
+ auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
+ auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShards.size());
auto stashCollections = resharding::ensureStashCollectionsExist(
- opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShardIds);
+ opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards);
auto oplogAppliers = _makeOplogAppliers(opCtx,
metrics,
metadata,
- donorShardIds,
- fetchTimestamp,
+ donorShards,
+ cloneTimestamp,
std::move(sourceChunkMgr),
std::move(executor),
stashCollections,
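A small, hypothetical sketch of the per-donor resume-point logic the hunks above switch to: each donor entry now carries its own minFetchTimestamp, and when no fetcher or applier progress has been recorded yet for that donor, the resume token is {clusterTime: minFetchTimestamp, ts: minFetchTimestamp}. The Timestamp, DonorShardFetchTimestamp, and ReshardingDonorOplogId types below are simplified stand-ins for the real ones.

#include <cassert>
#include <string>
#include <vector>

// Simplified stand-ins for the types used in resharding_data_replication.cpp.
struct Timestamp {
    unsigned secs = 0, inc = 0;
    bool operator==(const Timestamp& o) const { return secs == o.secs && inc == o.inc; }
};

struct DonorShardFetchTimestamp {
    std::string shardId;
    Timestamp minFetchTimestamp;  // per-donor, instead of one shared fetchTimestamp
};

struct ReshardingDonorOplogId {
    Timestamp clusterTime;
    Timestamp ts;
};

// Resume token for a donor with no recorded fetcher/applier progress yet.
ReshardingDonorOplogId initialResumePoint(const DonorShardFetchTimestamp& donor) {
    return {donor.minFetchTimestamp, donor.minFetchTimestamp};
}

int main() {
    std::vector<DonorShardFetchTimestamp> donorShards{{"shard0000", {10, 0}},
                                                      {"shard0001", {12, 0}}};
    for (const auto& donor : donorShards) {
        auto idToResumeFrom = initialResumePoint(donor);
        // Each donor resumes from its own minFetchTimestamp, not a shared value.
        assert(idToResumeFrom.clusterTime == donor.minFetchTimestamp);
        assert(idToResumeFrom.ts == donor.minFetchTimestamp);
    }
}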
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index e1b1d6437f6..48526e52c6b 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -144,8 +144,8 @@ public:
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- std::vector<ShardId> donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
bool cloningDone,
ShardId myShardId,
ChunkManager sourceChunkMgr,
@@ -189,15 +189,13 @@ private:
static std::vector<std::unique_ptr<ReshardingTxnCloner>> _makeTxnCloners(
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp);
+ const std::vector<DonorShardFetchTimestamp>& donorShards);
static std::vector<std::unique_ptr<ReshardingOplogFetcher>> _makeOplogFetchers(
OperationContext* opCtx,
ReshardingMetrics* metrics,
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
const ShardId& myShardId);
static std::shared_ptr<executor::TaskExecutor> _makeOplogFetcherExecutor(size_t numDonors);
@@ -208,8 +206,8 @@ private:
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
ChunkManager sourceChunkMgr,
std::shared_ptr<executor::TaskExecutor> executor,
const std::vector<NamespaceString>& stashCollections,
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 846f451ff52..e2320680222 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -334,14 +334,14 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
const ReshardingFields& reshardingFields) {
// The recipient state machines are created before the donor shards are prepared to donate but
// will remain idle until the donor shards are prepared to donate.
- invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp());
+ invariant(!reshardingFields.getRecipientFields()->getCloneTimestamp());
RecipientShardContext recipientCtx;
recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp);
auto recipientDoc = ReshardingRecipientDocument{
std::move(recipientCtx),
- reshardingFields.getRecipientFields()->getDonorShardIds(),
+ reshardingFields.getRecipientFields()->getDonorShards(),
reshardingFields.getRecipientFields()->getMinimumOperationDurationMillis()};
auto sourceNss = reshardingFields.getRecipientFields()->getSourceNss();
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index fd61fbd7e26..7aab3e61325 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -52,7 +52,7 @@ using namespace fmt::literals;
TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+ auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -66,13 +66,13 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromRe
TEST_F(ReshardingDonorRecipientCommonInternalsTest,
ConstructRecipientDocumentFromReshardingFields) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+ auto metadata =
+ makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
- appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+ appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -81,7 +81,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+ auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -102,12 +102,12 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+ auto metadata =
+ makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
- appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+ appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -126,7 +126,7 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
TEST_F(ReshardingDonorRecipientCommonTest,
CreateDonorServiceInstanceWithIncorrectCoordinatorState) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+ auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kDecisionPersisted);
@@ -142,12 +142,13 @@ TEST_F(ReshardingDonorRecipientCommonTest,
TEST_F(ReshardingDonorRecipientCommonTest,
CreateRecipientServiceInstanceWithIncorrectCoordinatorState) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+ auto metadata =
+ makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kDecisionPersisted);
appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+ reshardingFields, kShards, kExistingUUID, kOriginalNss, kCloneTimestamp);
ASSERT_THROWS_CODE(resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields),
@@ -165,7 +166,7 @@ TEST_F(ReshardingDonorRecipientCommonTest,
TEST_F(ReshardingDonorRecipientCommonTest, ProcessDonorFieldsWhenShardDoesntOwnAnyChunks) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kOtherShard);
+ auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kOtherShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -184,12 +185,12 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessDonorFieldsWhenShardDoesntOwnA
TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesntOwnAnyChunks) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard);
+ auto metadata =
+ makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
- appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+ appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -205,7 +206,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesnt
TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOrRecipientFields) {
OperationContext* opCtx = operationContext();
- auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+ auto metadata =
+ makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
auto reshardingFields =
createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -238,8 +240,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta
{
AutoGetCollection autoColl(opCtx, kOriginalNss, LockMode::MODE_IS);
auto csr = CollectionShardingRuntime::get(opCtx, kOriginalNss);
- csr->setFilteringMetadata(opCtx,
- makeShardedMetadataForOriginalCollection(opCtx, kThisShard));
+ csr->setFilteringMetadata(
+ opCtx, makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId()));
ASSERT(csr->getCurrentMetadataIfKnown());
}
@@ -247,8 +249,9 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta
{
AutoGetCollection autoColl(opCtx, kTemporaryReshardingNss, LockMode::MODE_IS);
auto csr = CollectionShardingRuntime::get(opCtx, kTemporaryReshardingNss);
- csr->setFilteringMetadata(
- opCtx, makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard));
+ csr->setFilteringMetadata(opCtx,
+ makeShardedMetadataForTemporaryReshardingCollection(
+ opCtx, kThisShard.getShardId()));
ASSERT(csr->getCurrentMetadataIfKnown());
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index 72a3307cd5e..b8f4356bd17 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -63,12 +63,14 @@ public:
const OID kReshardingEpoch = OID::gen();
const UUID kReshardingUUID = UUID::gen();
- const ShardId kThisShard = ShardId("shardOne");
- const ShardId kOtherShard = ShardId("shardTwo");
+ const DonorShardFetchTimestamp kThisShard =
+ makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0));
+ const DonorShardFetchTimestamp kOtherShard =
+ makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0));
- const std::vector<ShardId> kShardIds = {kThisShard, kOtherShard};
+ const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard};
- const Timestamp kFetchTimestamp = Timestamp(1, 0);
+ const Timestamp kCloneTimestamp = Timestamp(20, 0);
protected:
CollectionMetadata makeShardedMetadataForOriginalCollection(
@@ -104,7 +106,7 @@ protected:
auto chunk = ChunkType(
nss, std::move(range), ChunkVersion(1, 0, epoch, boost::none), shardThatChunkExistsOn);
ChunkManager cm(
- kThisShard,
+ kThisShard.getShardId(),
DatabaseVersion(uuid),
makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss,
uuid,
@@ -119,7 +121,7 @@ protected:
boost::none);
if (!OperationShardingState::isOperationVersioned(opCtx)) {
- const auto version = cm.getVersion(kThisShard);
+ const auto version = cm.getVersion(kThisShard.getShardId());
BSONObjBuilder builder;
version.appendToCommand(&builder);
@@ -127,14 +129,15 @@ protected:
oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
}
- return CollectionMetadata(std::move(cm), kThisShard);
+ return CollectionMetadata(std::move(cm), kThisShard.getShardId());
}
ReshardingDonorDocument makeDonorStateDoc() {
DonorShardContext donorCtx;
donorCtx.setState(DonorStateEnum::kPreparingToDonate);
- ReshardingDonorDocument doc(std::move(donorCtx), {kThisShard, kOtherShard});
+ ReshardingDonorDocument doc(std::move(donorCtx),
+ {kThisShard.getShardId(), kOtherShard.getShardId()});
NamespaceString sourceNss = kOriginalNss;
auto sourceUUID = UUID::gen();
@@ -149,7 +152,8 @@ protected:
RecipientShardContext recipCtx;
recipCtx.setState(RecipientStateEnum::kCloning);
- ReshardingRecipientDocument doc(std::move(recipCtx), {kThisShard, kOtherShard}, 1000);
+ ReshardingRecipientDocument doc(
+ std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000);
NamespaceString sourceNss = kOriginalNss;
auto sourceUUID = UUID::gen();
@@ -158,10 +162,8 @@ protected:
doc.setCommonReshardingMetadata(std::move(commonMetadata));
- // A document in the cloning state requires a fetch timestamp.
- FetchTimestamp ts;
- ts.setFetchTimestamp(kFetchTimestamp);
- doc.setFetchTimestampStruct(ts);
+ // A document in the cloning state requires a clone timestamp.
+ doc.setCloneTimestamp(kCloneTimestamp);
return doc;
}
@@ -174,19 +176,24 @@ protected:
void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
const BSONObj& reshardingKey) {
+ std::vector<ShardId> donorShardIds;
+ for (const auto& shard : kShards) {
+ donorShardIds.emplace_back(shard.getShardId());
+ }
+
fields.setDonorFields(
- TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, kShardIds));
+ TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds));
}
void appendRecipientFieldsToReshardingFields(
ReshardingFields& fields,
- const std::vector<ShardId> donorShardIds,
+ const std::vector<DonorShardFetchTimestamp> donorShards,
const UUID& existingUUID,
const NamespaceString& originalNss,
- const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
+ const boost::optional<Timestamp>& cloneTimestamp = boost::none) {
auto recipientFields =
- TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss, 5000);
- emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
+ TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000);
+ emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
fields.setRecipientFields(std::move(recipientFields));
}
@@ -229,18 +236,37 @@ protected:
ASSERT(recipientDoc.getMutableState().getState() ==
RecipientStateEnum::kAwaitingFetchTimestamp);
- ASSERT(!recipientDoc.getFetchTimestamp());
+ ASSERT(!recipientDoc.getCloneTimestamp());
+
+ const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards();
+ std::map<ShardId, DonorShardFetchTimestamp> donorShardMap;
+ for (const auto& donor : donorShards) {
+ donorShardMap.emplace(donor.getShardId(), donor);
+ }
- auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
- auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
+ for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) {
+ auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId());
+ ASSERT(donorIter != donorShardMap.end());
+ ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(),
+ donorShardFromRecipientDoc.getMinFetchTimestamp().has_value());
- for (const auto& donorShardId : recipientDoc.getDonorShards()) {
- auto reshardingFieldsDonorShardId = donorShardIdsSet.find(donorShardId);
- ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
- donorShardIdsSet.erase(reshardingFieldsDonorShardId);
+ if (donorIter->second.getMinFetchTimestamp()) {
+ ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(),
+ *donorShardFromRecipientDoc.getMinFetchTimestamp());
+ }
+
+ donorShardMap.erase(donorShardFromRecipientDoc.getShardId());
}
- ASSERT(donorShardIdsSet.empty());
+ ASSERT(donorShardMap.empty());
+ }
+
+private:
+ DonorShardFetchTimestamp makeDonorShardFetchTimestamp(
+ ShardId shardId, boost::optional<Timestamp> fetchTimestamp) {
+ DonorShardFetchTimestamp donorFetchTimestamp(shardId);
+ donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp);
+ return donorFetchTimestamp;
}
};
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 62ebb7305d8..c02296ca462 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -65,8 +65,7 @@ namespace {
const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
-Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
- const CollectionUUID& sourceUUID) {
+Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss) {
auto opCtx = cc().makeOperationContext();
// Do a no-op write and use the OpTime as the minFetchTimestamp
@@ -86,8 +85,8 @@ Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
WriteUnitOfWork wuow(opCtx.get());
opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
opCtx.get(),
- sourceNss,
- sourceUUID,
+ NamespaceString::kForceOplogBatchBoundaryNamespace,
+ boost::none,
BSON("msg" << msg),
boost::none,
boost::none,
@@ -442,8 +441,7 @@ void ReshardingDonorService::DonorStateMachine::
_externalState->waitForCollectionFlush(opCtx.get(), _metadata.getTempReshardingNss());
}
- Timestamp minFetchTimestamp =
- generateMinFetchTimestamp(_metadata.getSourceNss(), _metadata.getSourceUUID());
+ Timestamp minFetchTimestamp = generateMinFetchTimestamp(_metadata.getSourceNss());
LOGV2_DEBUG(5390702,
2,
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index ae24821b064..855c316fe08 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -307,8 +307,9 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta
ErrorCodes::InterruptedDueToReplStateChange);
DBDirectClient client(opCtx.get());
- auto cursor = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
- BSON("ns" << doc.getSourceNss().toString()));
+ auto cursor =
+ client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
+ BSON("ns" << NamespaceString::kForceOplogBatchBoundaryNamespace.ns()));
ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
repl::OplogEntry op(cursor->next());
@@ -317,7 +318,7 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta
ASSERT_EQ(OpType_serializer(op.getOpType()), OpType_serializer(repl::OpTypeEnum::kNoop))
<< op.getEntry();
- ASSERT_EQ(op.getUuid(), doc.getSourceUUID()) << op.getEntry();
+ ASSERT_FALSE(op.getUuid()) << op.getEntry();
ASSERT_EQ(op.getObject()["msg"].type(), BSONType::String) << op.getEntry();
ASSERT_FALSE(bool(op.getObject2())) << op.getEntry();
ASSERT_FALSE(bool(op.getDestinedRecipient())) << op.getEntry();
@@ -347,9 +348,6 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
BSON("ns" << doc.getSourceNss().toString()));
ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
- // Skip the first oplog entry returned because it is the no-op from generating the
- // minFetchTimestamp value.
- cursor->next();
for (const auto& recipientShardId : doc.getRecipientShards()) {
ASSERT_TRUE(cursor->more()) << "Didn't find finalReshardOp entry for source collection";
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 2daebd6656e..40a3729f091 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -81,13 +81,16 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
}
}
-void ensureFulfilledPromise(WithLock lk, SharedPromise<Timestamp>& sp, Timestamp ts) {
+void ensureFulfilledPromise(
+ WithLock lk,
+ SharedPromise<ReshardingRecipientService::RecipientStateMachine::CloneDetails>& sp,
+ ReshardingRecipientService::RecipientStateMachine::CloneDetails details) {
auto future = sp.getFuture();
if (!future.isReady()) {
- sp.emplaceValue(ts);
+ sp.emplaceValue(details);
} else {
// Ensure that we would only attempt to fulfill the promise with the same Timestamp value.
- invariant(future.get() == ts);
+ invariant(future.get().cloneTimestamp == details.cloneTimestamp);
}
}
@@ -149,10 +152,11 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
opCtx, reshardingNss, optionsAndIndexes);
}
-std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx,
- const ChunkManager& cm,
- const UUID& existingUUID,
- std::vector<ShardId> donorShards) {
+std::vector<NamespaceString> ensureStashCollectionsExist(
+ OperationContext* opCtx,
+ const ChunkManager& cm,
+ const UUID& existingUUID,
+ const std::vector<DonorShardFetchTimestamp>& donorShards) {
// Use the same collation for the stash collections as the temporary resharding collection
auto collator = cm.getDefaultCollator();
BSONObj collationSpec = collator ? collator->getSpec().toBSON() : BSONObj();
@@ -165,7 +169,7 @@ std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx
options.collation = std::move(collationSpec);
for (const auto& donor : donorShards) {
stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists(
- opCtx, existingUUID, donor, options));
+ opCtx, existingUUID, donor.getShardId(), options));
}
}
@@ -219,10 +223,10 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_recipientService{recipientService},
_metadata{recipientDoc.getCommonReshardingMetadata()},
- _donorShardIds{recipientDoc.getDonorShards()},
_minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
_recipientCtx{recipientDoc.getMutableState()},
- _fetchTimestamp{recipientDoc.getFetchTimestamp()},
+ _donorShards{recipientDoc.getDonorShards()},
+ _cloneTimestamp{recipientDoc.getCloneTimestamp()},
_markKilledExecutor(std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "RecipientStateMachineCancelableOpCtxPool";
@@ -391,9 +395,12 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
auto coordinatorState = reshardingFields.getState();
if (coordinatorState >= CoordinatorStateEnum::kCloning) {
- auto fetchTimestamp = reshardingFields.getRecipientFields()->getFetchTimestamp();
- invariant(fetchTimestamp);
- ensureFulfilledPromise(lk, _allDonorsPreparedToDonate, *fetchTimestamp);
+ auto recipientFields = *reshardingFields.getRecipientFields();
+ invariant(recipientFields.getCloneTimestamp());
+ ensureFulfilledPromise(
+ lk,
+ _allDonorsPreparedToDonate,
+ {*recipientFields.getCloneTimestamp(), recipientFields.getDonorShards()});
}
if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
@@ -405,14 +412,15 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
- invariant(_fetchTimestamp);
+ invariant(_cloneTimestamp);
return ExecutorFuture(**executor);
}
return _allDonorsPreparedToDonate.getFuture()
.thenRunOn(**executor)
- .then(
- [this](Timestamp fetchTimestamp) { _transitionToCreatingCollection(fetchTimestamp); });
+ .then([this](ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
+ _transitionToCreatingCollection(std::move(cloneDetails));
+ });
}
void ReshardingRecipientService::RecipientStateMachine::
@@ -429,7 +437,7 @@ void ReshardingRecipientService::RecipientStateMachine::
_metadata.getTempReshardingNss(),
_metadata.getReshardingUUID(),
_metadata.getSourceUUID(),
- *_fetchTimestamp);
+ *_cloneTimestamp);
ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()};
@@ -457,7 +465,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(
OperationContext* opCtx,
bool cloningDone,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- invariant(_fetchTimestamp);
+ invariant(_cloneTimestamp);
auto myShardId = ShardingState::get(opCtx->getServiceContext())->shardId();
auto catalogCache = Grid::get(opCtx)->catalogCache();
@@ -467,8 +475,8 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(
return _dataReplicationFactory(opCtx,
_metrics(),
_metadata,
- _donorShardIds,
- *_fetchTimestamp,
+ _donorShards,
+ *_cloneTimestamp,
cloningDone,
std::move(myShardId),
std::move(sourceChunkMgr),
@@ -588,8 +596,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
})
.then([this] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- for (const auto& donor : _donorShardIds) {
- auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor);
+ for (const auto& donor : _donorShards) {
+ auto stashNss =
+ getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS);
uassert(5356800,
"Resharding completed with non-empty stash collections",
@@ -661,14 +670,17 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
}
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
- RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
+ RecipientShardContext&& newRecipientCtx,
+ boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&&
+ cloneDetails) {
invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp);
// For logging purposes.
auto oldState = _recipientCtx.getState();
auto newState = newRecipientCtx.getState();
- _updateRecipientDocument(std::move(newRecipientCtx), std::move(fetchTimestamp));
+ _updateRecipientDocument(std::move(newRecipientCtx), std::move(cloneDetails));
+
_metrics()->setRecipientState(newState);
LOGV2_INFO(5279506,
@@ -681,10 +693,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection(
- Timestamp fetchTimestamp) {
+ ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection);
- _transitionState(std::move(newRecipientCtx), fetchTimestamp);
+ _transitionState(std::move(newRecipientCtx), std::move(cloneDetails));
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
@@ -808,7 +820,9 @@ void ReshardingRecipientService::RecipientStateMachine::insertStateDocument(
}
void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
- RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
+ RecipientShardContext&& newRecipientCtx,
+ boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&&
+ cloneDetails) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
@@ -819,10 +833,20 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
setBuilder.append(ReshardingRecipientDocument::kMutableStateFieldName,
newRecipientCtx.toBSON());
- if (fetchTimestamp) {
- setBuilder.append(ReshardingRecipientDocument::kFetchTimestampFieldName,
- *fetchTimestamp);
+ if (cloneDetails) {
+ setBuilder.append(ReshardingRecipientDocument::kCloneTimestampFieldName,
+ cloneDetails->cloneTimestamp);
+
+ BSONArrayBuilder donorShardsArrayBuilder;
+ for (const auto& donor : cloneDetails->donorShards) {
+ donorShardsArrayBuilder.append(donor.toBSON());
+ }
+
+ setBuilder.append(ReshardingRecipientDocument::kDonorShardsFieldName,
+ donorShardsArrayBuilder.arr());
}
+
+ setBuilder.doneFast();
}
store.update(opCtx.get(),
@@ -833,8 +857,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
_recipientCtx = newRecipientCtx;
- if (fetchTimestamp) {
- _fetchTimestamp = fetchTimestamp;
+ if (cloneDetails) {
+ _cloneTimestamp = cloneDetails->cloneTimestamp;
+ _donorShards = std::move(cloneDetails->donorShards);
}
}
@@ -850,8 +875,9 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
OperationContext* opCtx) {
- for (const auto& donor : _donorShardIds) {
- auto reshardingSourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor};
+ for (const auto& donor : _donorShards) {
+ auto reshardingSourceId =
+ ReshardingSourceId{_metadata.getReshardingUUID(), donor.getShardId()};
// Remove the oplog applier progress doc for this donor.
PersistentTaskStore<ReshardingOplogApplierProgress> oplogApplierProgressStore(
@@ -871,11 +897,13 @@ void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
WriteConcernOptions());
// Drop the conflict stash collection for this donor.
- auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor);
+ auto stashNss =
+ getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
resharding::data_copy::ensureCollectionDropped(opCtx, stashNss);
// Drop the oplog buffer collection for this donor.
- auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor);
+ auto oplogBufferNss =
+ getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId());
resharding::data_copy::ensureCollectionDropped(opCtx, oplogBufferNss);
}
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index c777bf8cc34..b83bfce32e1 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -53,10 +53,11 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
const UUID& existingUUID,
Timestamp fetchTimestamp);
-std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx,
- const ChunkManager& cm,
- const UUID& existingUUID,
- std::vector<ShardId> donorShards);
+std::vector<NamespaceString> ensureStashCollectionsExist(
+ OperationContext* opCtx,
+ const ChunkManager& cm,
+ const UUID& existingUUID,
+ const std::vector<DonorShardFetchTimestamp>& donorShards);
ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx,
NamespaceString oplogBufferNss,
@@ -101,6 +102,11 @@ public:
class ReshardingRecipientService::RecipientStateMachine final
: public repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine> {
public:
+ struct CloneDetails {
+ Timestamp cloneTimestamp;
+ std::vector<DonorShardFetchTimestamp> donorShards;
+ };
+
explicit RecipientStateMachine(const ReshardingRecipientService* recipientService,
const BSONObj& recipientDoc,
ReshardingDataReplicationFactory dataReplicationFactory);
@@ -162,10 +168,10 @@ private:
void _transitionState(RecipientStateEnum newState);
void _transitionState(RecipientShardContext&& newRecipientCtx,
- boost::optional<Timestamp>&& fetchTimestamp);
+ boost::optional<CloneDetails>&& cloneDetails);
// Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection.
- void _transitionToCreatingCollection(Timestamp fetchTimestamp);
+ void _transitionToCreatingCollection(CloneDetails cloneDetails);
// Transitions the on-disk and in-memory state to RecipientStateEnum::kError.
void _transitionToError(Status abortReason);
@@ -176,9 +182,9 @@ private:
OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
// Updates the mutable portion of the on-disk and in-memory recipient document with
- // 'newRecipientCtx' and 'fetchTimestamp'.
+ // 'newRecipientCtx', 'cloneTimestamp', and 'donorShards'.
void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
- boost::optional<Timestamp>&& fetchTimestamp);
+ boost::optional<CloneDetails>&& cloneDetails);
// Removes the local recipient document from disk.
void _removeRecipientDocument();
@@ -214,13 +220,13 @@ private:
// The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.recipient.
const CommonReshardingMetadata _metadata;
- const std::vector<ShardId> _donorShardIds;
const Milliseconds _minimumOperationDuration;
// The in-memory representation of the mutable portion of the document in
// config.localReshardingOperations.recipient.
RecipientShardContext _recipientCtx;
- boost::optional<Timestamp> _fetchTimestamp;
+ std::vector<DonorShardFetchTimestamp> _donorShards;
+ boost::optional<Timestamp> _cloneTimestamp;
// ThreadPool used by CancelableOperationContext.
// CancelableOperationContext must have a thread that is always available to it to mark its
@@ -246,7 +252,7 @@ private:
// Each promise below corresponds to a state on the recipient state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
- SharedPromise<Timestamp> _allDonorsPreparedToDonate;
+ SharedPromise<CloneDetails> _allDonorsPreparedToDonate;
SharedPromise<void> _coordinatorHasDecisionPersisted;
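A hypothetical sketch of the promise pattern changed above: _allDonorsPreparedToDonate now delivers a CloneDetails value (the clone timestamp plus the per-donor fetch timestamps) instead of a bare Timestamp, and fulfilling it again is tolerated only when the repeated value carries the same cloneTimestamp. Here std::promise/std::shared_future stand in for MongoDB's SharedPromise and assert() stands in for invariant(); the types are simplified stand-ins.

#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <utility>
#include <vector>

struct Timestamp { unsigned secs = 0, inc = 0; };
struct DonorShardFetchTimestamp { std::string shardId; Timestamp minFetchTimestamp; };

// Stand-in for RecipientStateMachine::CloneDetails.
struct CloneDetails {
    Timestamp cloneTimestamp;
    std::vector<DonorShardFetchTimestamp> donorShards;
};

// Mirrors the intent of ensureFulfilledPromise(): fulfill once, tolerate a
// repeat only if it carries the same cloneTimestamp.
void ensureFulfilledPromise(std::promise<CloneDetails>& sp,
                            std::shared_future<CloneDetails>& future,
                            CloneDetails details) {
    if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
        sp.set_value(std::move(details));
    } else {
        assert(future.get().cloneTimestamp.secs == details.cloneTimestamp.secs);
    }
}

int main() {
    std::promise<CloneDetails> allDonorsPreparedToDonate;
    std::shared_future<CloneDetails> fut = allDonorsPreparedToDonate.get_future().share();

    CloneDetails details{{5, 0}, {{"shard0000", {5, 0}}, {"shard0001", {5, 0}}}};
    ensureFulfilledPromise(allDonorsPreparedToDonate, fut, details);  // fulfills
    ensureFulfilledPromise(allDonorsPreparedToDonate, fut, details);  // same value: allowed

    // The recipient then creates the temporary collection at cloneTimestamp and
    // builds one fetcher per donor from that donor's minFetchTimestamp.
    assert(fut.get().donorShards.size() == 2);
}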
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 5020b0369b6..c4a5b594045 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -154,7 +154,7 @@ public:
recipientFields.setSourceUUID(uuid);
// Populating the set of donor shard ids isn't necessary to test the functionality of
// creating the temporary resharding collection.
- recipientFields.setDonorShardIds({});
+ recipientFields.setDonorShards({});
recipientFields.setMinimumOperationDurationMillis(5000);
reshardingFields.setRecipientFields(recipientFields);
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 8b5639d7b4e..5f8647e69df 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -56,22 +56,20 @@ constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd;
* Emplaces the 'fetchTimestamp' onto the ClassWithFetchTimestamp if the timestamp has been
* emplaced inside the boost::optional.
*/
-template <class ClassWithFetchTimestamp>
-void emplaceFetchTimestampIfExists(ClassWithFetchTimestamp& c,
- boost::optional<Timestamp> fetchTimestamp) {
- if (!fetchTimestamp) {
+template <typename ClassWithCloneTimestamp>
+void emplaceCloneTimestampIfExists(ClassWithCloneTimestamp& c,
+ boost::optional<Timestamp> cloneTimestamp) {
+ if (!cloneTimestamp) {
return;
}
- invariant(!fetchTimestamp->isNull());
+ invariant(!cloneTimestamp->isNull());
- if (auto alreadyExistingFetchTimestamp = c.getFetchTimestamp()) {
- invariant(fetchTimestamp == alreadyExistingFetchTimestamp);
+ if (auto alreadyExistingCloneTimestamp = c.getCloneTimestamp()) {
+ invariant(cloneTimestamp == alreadyExistingCloneTimestamp);
}
- FetchTimestamp fetchTimestampStruct;
- fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
- c.setFetchTimestampStruct(std::move(fetchTimestampStruct));
+ c.setCloneTimestamp(*cloneTimestamp);
}
/**
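To make the helper's contract concrete, here is a self-contained sketch of how emplaceCloneTimestampIfExists() is intended to be called. The RecipientDoc type is a hypothetical stand-in for an IDL-generated document exposing getCloneTimestamp()/setCloneTimestamp(), std::optional stands in for boost::optional, and assert() stands in for invariant().

#include <cassert>
#include <optional>

struct Timestamp {
    unsigned secs = 0, inc = 0;
    bool isNull() const { return secs == 0 && inc == 0; }
    bool operator==(const Timestamp& o) const { return secs == o.secs && inc == o.inc; }
};

// Hypothetical stand-in for an IDL-generated document with a cloneTimestamp field.
struct RecipientDoc {
    std::optional<Timestamp> _cloneTimestamp;
    std::optional<Timestamp> getCloneTimestamp() const { return _cloneTimestamp; }
    void setCloneTimestamp(Timestamp ts) { _cloneTimestamp = ts; }
};

// Simplified restatement of the template above: a no-op when the optional is
// empty, otherwise sets the field, allowing only a re-set to the same value.
template <typename ClassWithCloneTimestamp>
void emplaceCloneTimestampIfExists(ClassWithCloneTimestamp& c,
                                   std::optional<Timestamp> cloneTimestamp) {
    if (!cloneTimestamp)
        return;
    assert(!cloneTimestamp->isNull());
    if (auto existing = c.getCloneTimestamp())
        assert(*existing == *cloneTimestamp);
    c.setCloneTimestamp(*cloneTimestamp);
}

int main() {
    RecipientDoc doc;
    emplaceCloneTimestampIfExists(doc, std::nullopt);     // no timestamp yet: no-op
    emplaceCloneTimestampIfExists(doc, Timestamp{1, 1});  // first real value: stored
    emplaceCloneTimestampIfExists(doc, Timestamp{1, 1});  // same value again: allowed
    assert(doc.getCloneTimestamp() && doc.getCloneTimestamp()->secs == 1);
}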
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index ee3d88aab52..cc34746dd93 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -164,6 +164,20 @@ structs:
description: "Number of documents on this donor shard in the collection being
resharded."
optional: true
+
+ DonorShardFetchTimestamp:
+ description: "Used for tracking donor min fetch timestamp"
+ strict: false
+ generate_comparison_operators: false
+ fields:
+ shardId:
+ type: shard_id
+ description: "A shard identifier."
+ minFetchTimestamp:
+ type: timestamp
+ optional: true
+ description: "Timestamp after which this donor shard has guaranteed that oplog
+ entries contain recipient shard information."
RecipientShardContext:
description: "Mutable state for a recipient shard tracked both locally by a recipient shard
@@ -181,20 +195,6 @@ structs:
type: RecipientState
default: kUnused
- FetchTimestamp:
- description: "Not meant to be used directly. Only use internal fields."
- # Use strict:false to avoid complications around upgrade/downgrade. This setting has no
- # practical effect because this type is only meant to be used as a chained struct with
- # inline_chained_structs:true and only the strictness setting of the wrapping struct type
- # applies.
- strict: false
- fields:
- fetchTimestamp:
- type: timestamp
- description: "Timestamp after which it is acceptable for recipient shards to
- retrieve documents from donor shards."
- optional: true
-
ReshardingApproxCopySize:
description: "Not meant to be used directly. Only use internal fields."
# Use strict:false to avoid complications around upgrade/downgrade. This setting has no
diff --git a/src/mongo/s/resharding/type_collection_fields.idl b/src/mongo/s/resharding/type_collection_fields.idl
index 97519598445..6bdd013e496 100644
--- a/src/mongo/s/resharding/type_collection_fields.idl
+++ b/src/mongo/s/resharding/type_collection_fields.idl
@@ -55,16 +55,18 @@ structs:
TypeCollectionRecipientFields:
description: "Resharding-related fields specific to recipient shards."
- inline_chained_structs: true
- chained_structs:
- FetchTimestamp: FetchTimestampStruct
# Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
# required for resharding because durable state from all resharding operations is cleaned up
# before the upgrade or downgrade can complete.
strict: false
fields:
- donorShardIds:
- type: array<shard_id>
+ cloneTimestamp:
+ type: timestamp
+ optional: true
+ description: "The timestamp for the snapshot read while cloning from the donors."
+ donorShards:
+ type: array<DonorShardFetchTimestamp>
+ description: "The list of donors to clone from."
sourceUUID:
cpp_name: sourceUUID
type: uuid