summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp50
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h11
2 files changed, 42 insertions, 19 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 4fde7935a0f..c5177cffeb6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -353,6 +353,19 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
txnNumber,
expectedNumZonesModified);
}
+
+/**
+ * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries.
+ */
+template <class T>
+std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntries) {
+ std::vector<ShardId> shardIds;
+ std::transform(participantShardEntries.begin(),
+ participantShardEntries.end(),
+ shardIds.begin(),
+ [](auto& shardEntry) { return shardEntry.getId(); });
+ return shardIds;
+}
} // namespace
namespace resharding {
@@ -511,13 +524,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this, executor] { return _init(executor); })
- .then([this] { _tellAllRecipientsToRefresh(); })
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsCreatedCollection(executor); })
- .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
- .then([this] { _tellAllRecipientsToRefresh(); })
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
- .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
.then([this](const ReshardingCoordinatorDocument& updatedStateDoc) {
return _commit(updatedStateDoc);
@@ -531,11 +544,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
return;
})
.then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); })
- .then([this] { _tellAllDonorsToRefresh(); })
+ .then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); })
- .then([this] { _tellAllRecipientsToRefresh(); })
- .then([this] { _tellAllDonorsToRefresh(); })
- .onError([this](Status status) {
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
+ .then([this, executor] { _tellAllDonorsToRefresh(executor); })
+ .onError([this, executor](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -551,8 +564,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
"error"_attr = status);
// TODO wait for donors and recipients to abort the operation and clean up state
- _tellAllRecipientsToRefresh();
- _tellAllDonorsToRefresh();
+ _tellAllRecipientsToRefresh(executor);
+ _tellAllDonorsToRefresh(executor);
return status;
})
@@ -774,10 +787,19 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates(
_coordinatorDoc = updatedCoordinatorDoc;
}
-// TODO
-void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh() {}
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ auto opCtx = cc().makeOperationContext();
+ auto recipientIds = extractShardIds(_coordinatorDoc.getRecipientShards());
+ tellShardsToRefresh(
+ opCtx.get(), recipientIds, _coordinatorDoc.getTempReshardingNss(), **executor);
+}
-// TODO
-void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh() {}
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ auto opCtx = cc().makeOperationContext();
+ auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards());
+ tellShardsToRefresh(opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor);
+}
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index df5c0aa4fd0..0f1a6ba316d 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -206,15 +206,16 @@ private:
boost::optional<Timestamp> fetchTimestamp = boost::none);
/**
- * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient
- * shards.
+ * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all
+ * recipient shards.
*/
- void _tellAllRecipientsToRefresh();
+ void _tellAllRecipientsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
- * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards.
+ * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the original namespace to all donor
+ * shards.
*/
- void _tellAllDonorsToRefresh();
+ void _tellAllDonorsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
// The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
// value of this is the UUID that will be used as the collection UUID for the new sharded