diff options
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.h | 11 |
2 files changed, 42 insertions, 19 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 4fde7935a0f..c5177cffeb6 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -353,6 +353,19 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, txnNumber, expectedNumZonesModified); } + +/** + * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries. + */ +template <class T> +std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntries) { + std::vector<ShardId> shardIds; + std::transform(participantShardEntries.begin(), + participantShardEntries.end(), + shardIds.begin(), + [](auto& shardEntry) { return shardEntry.getId(); }); + return shardIds; +} } // namespace namespace resharding { @@ -511,13 +524,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) .then([this, executor] { return _init(executor); }) - .then([this] { _tellAllRecipientsToRefresh(); }) + .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsCreatedCollection(executor); }) - .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); }) - .then([this] { _tellAllRecipientsToRefresh(); }) + .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); }) - .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) .then([this](const ReshardingCoordinatorDocument& updatedStateDoc) { return _commit(updatedStateDoc); @@ -531,11 +544,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( return; }) .then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); }) - .then([this] { _tellAllDonorsToRefresh(); }) + .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); }) - .then([this] { _tellAllRecipientsToRefresh(); }) - .then([this] { _tellAllDonorsToRefresh(); }) - .onError([this](Status status) { + .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) + .then([this, executor] { _tellAllDonorsToRefresh(executor); }) + .onError([this, executor](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -551,8 +564,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( "error"_attr = status); // TODO wait for donors and recipients to abort the operation and clean up state - _tellAllRecipientsToRefresh(); - _tellAllDonorsToRefresh(); + _tellAllRecipientsToRefresh(executor); + _tellAllDonorsToRefresh(executor); return status; }) @@ -774,10 +787,19 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates( _coordinatorDoc = updatedCoordinatorDoc; } -// TODO -void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh() {} +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + auto opCtx = cc().makeOperationContext(); + auto recipientIds = extractShardIds(_coordinatorDoc.getRecipientShards()); + tellShardsToRefresh( + opCtx.get(), recipientIds, _coordinatorDoc.getTempReshardingNss(), **executor); +} -// TODO -void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh() {} +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + auto opCtx = cc().makeOperationContext(); + auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards()); + tellShardsToRefresh(opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor); +} } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index df5c0aa4fd0..0f1a6ba316d 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -206,15 +206,16 @@ private: boost::optional<Timestamp> fetchTimestamp = boost::none); /** - * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient - * shards. + * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all + * recipient shards. */ - void _tellAllRecipientsToRefresh(); + void _tellAllRecipientsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); /** - * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards. + * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the original namespace to all donor + * shards. */ - void _tellAllDonorsToRefresh(); + void _tellAllDonorsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The // value of this is the UUID that will be used as the collection UUID for the new sharded |