summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2021-07-12 17:58:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-08-19 19:18:21 +0000
commit98937ba21a64a127d6238d641fb676bdef797cf4 (patch)
treeff5df17a4f2006a28eeb5b9f42d95d5e467555ac
parent8c0f53e5b73298c520cf09f2f3d0dc7853d58011 (diff)
downloadmongo-98937ba21a64a127d6238d641fb676bdef797cf4.tar.gz
SERVER-58081 Make _flushReshardingStateChange return instead of blocking if the critical section is held
(cherry picked from commit 2ca1f733d619809d1e712860fc0070f0cc8d81f5)
-rw-r--r--src/mongo/db/s/flush_resharding_state_change_command.cpp52
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp128
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h28
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp11
4 files changed, 161 insertions, 58 deletions
diff --git a/src/mongo/db/s/flush_resharding_state_change_command.cpp b/src/mongo/db/s/flush_resharding_state_change_command.cpp
index a159c530de0..b97dd33b5ea 100644
--- a/src/mongo/db/s/flush_resharding_state_change_command.cpp
+++ b/src/mongo/db/s/flush_resharding_state_change_command.cpp
@@ -35,19 +35,45 @@
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/s/grid.h"
#include "mongo/s/request_types/flush_resharding_state_change_gen.h"
namespace mongo {
namespace {
+void doNoopWrite(OperationContext* opCtx, const NamespaceString& nss) {
+ writeConflictRetry(
+ opCtx, "_flushReshardingStateChange no-op", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+
+ const std::string msg = str::stream()
+ << "no-op for _flushReshardingStateChange on " << nss;
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx,
+ {},
+ boost::none,
+ BSON("msg" << msg),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ });
+}
+
class FlushReshardingStateChangeCmd final : public TypedCommand<FlushReshardingStateChangeCmd> {
public:
using Request = _flushReshardingStateChange;
@@ -101,11 +127,27 @@ public:
"Can't call _flushReshardingStateChange if in read-only mode",
!storageGlobalParams.readOnly);
- onShardVersionMismatch(opCtx, ns(), boost::none /* shardVersionReceived */);
-
- CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, ns());
-
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+ ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor())
+ .then([svcCtx = opCtx->getServiceContext(), nss = ns()] {
+ ThreadClient tc("FlushReshardingStateChange", svcCtx);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+
+ auto opCtx = tc->makeOperationContext();
+ onShardVersionMismatch(
+ opCtx.get(), nss, boost::none /* shardVersionReceived */);
+ })
+ .onError([](const Status& status) {
+ LOGV2_WARNING(5808100,
+ "Error on deferred _flushReshardingStateChange execution",
+ "error"_attr = redact(status));
+ })
+ .getAsync([](auto) {});
+
+ // Ensure the command isn't run on a stale primary.
+ doNoopWrite(opCtx, ns());
}
};
} _flushReshardingStateChange;
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index acfa9b205cc..3ddaae7ff63 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -58,6 +58,7 @@
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
#include "mongo/s/request_types/flush_resharding_state_change_gen.h"
+#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/fail_point.h"
@@ -593,6 +594,13 @@ void executeMetadataChangesInTxn(
});
}
+BSONObj makeFlushRoutingTableCacheUpdatesCmd(const NamespaceString& nss) {
+ auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss);
+ cmd.setSyncFromConfig(true);
+ cmd.setDbName(nss.db());
+ return CommandHelpers::appendMajorityWriteConcern(cmd.toBSON({})).getOwned();
+}
+
} // namespace
namespace resharding {
@@ -1071,9 +1079,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsReshard
})
.then([this, executor] {
pauseBeforeTellDonorToRefresh.pauseWhileSet();
- _tellAllDonorsToRefresh(executor);
+ _establishAllDonorsAsParticipants(executor);
})
- .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
+ .then([this, executor] { _establishAllRecipientsAsParticipants(executor); })
.onCompletion([this](Status status) {
// Swap back to using operation contexts canceled upon abort until ready to
// persist the decision or unrecoverable error.
@@ -1242,7 +1250,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper
return _commit(updatedCoordinatorDoc);
})
.then([this, executor] {
- _tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor);
+ _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(), executor);
})
.then([this] { _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss()); })
.then([this, executor] { return _awaitAllParticipantShardsDone(executor); })
@@ -1278,6 +1286,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper
"error"_attr = redact(status));
});
}
+
SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& stepdownToken) noexcept {
@@ -1740,11 +1749,66 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
installCoordinatorDoc(opCtx.get(), updatedCoordinatorDoc);
}
-void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
+ auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
+ auto recipientShardIds =
+ extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
+ std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
+ participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
+ _reshardingCoordinatorExternalState->sendCommandToShards(
+ opCtx.get(),
+ NamespaceString::kAdminDb,
+ command,
+ {participantShardIds.begin(), participantShardIds.end()},
+ **executor);
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecipients(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto recipientShardIds =
+ extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
+
+ _reshardingCoordinatorExternalState->sendCommandToShards(
+ opCtx.get(),
+ NamespaceString::kAdminDb,
+ command,
+ {recipientShardIds.begin(), recipientShardIds.end()},
+ **executor);
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllDonors(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
+
+ _reshardingCoordinatorExternalState->sendCommandToShards(
+ opCtx.get(),
+ NamespaceString::kAdminDb,
+ command,
+ {donorShardIds.begin(), donorShardIds.end()},
+ **executor);
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_establishAllDonorsAsParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate);
+ auto flushCmd = makeFlushRoutingTableCacheUpdatesCmd(_coordinatorDoc.getSourceNss());
+ _sendCommandToAllDonors(executor, flushCmd);
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_establishAllRecipientsAsParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate);
+ auto flushCmd = makeFlushRoutingTableCacheUpdatesCmd(_coordinatorDoc.getTempReshardingNss());
+ _sendCommandToAllRecipients(executor, flushCmd);
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
NamespaceString nssToRefresh;
// Refresh the temporary namespace if the coordinator is in a state prior to 'kCommitting'.
// A refresh of recipients while in 'kCommitting' should be accompanied by a refresh of
@@ -1757,68 +1821,30 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe
auto refreshCmd =
createFlushReshardingStateChangeCommand(nssToRefresh, _coordinatorDoc.getReshardingUUID());
-
- _reshardingCoordinatorExternalState->sendCommandToShards(
- opCtx.get(),
- NamespaceString::kAdminDb,
- refreshCmd,
- {recipientIds.begin(), recipientIds.end()},
- **executor);
+ _sendCommandToAllRecipients(executor, refreshCmd);
}
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
-
auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss(),
_coordinatorDoc.getReshardingUUID());
- _reshardingCoordinatorExternalState->sendCommandToShards(opCtx.get(),
- NamespaceString::kAdminDb,
- refreshCmd,
- {donorIds.begin(), donorIds.end()},
- **executor);
+ _sendCommandToAllDonors(executor, refreshCmd);
}
-void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh(
+void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToCommit(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-
- auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
- auto recipientShardIds =
- extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
- std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
- participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
-
auto commitCmd =
createShardsvrCommitReshardCollectionCmd(nss, _coordinatorDoc.getReshardingUUID());
- _reshardingCoordinatorExternalState->sendCommandToShards(
- opCtx.get(),
- NamespaceString::kAdminDb,
- commitCmd,
- {participantShardIds.begin(), participantShardIds.end()},
- **executor);
+ _sendCommandToAllParticipants(executor, commitCmd);
}
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToAbort(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, bool isUserAborted) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-
- auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
- auto recipientShardIds =
- extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
- std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
- participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
-
ShardsvrAbortReshardCollection abortCmd(_coordinatorDoc.getReshardingUUID(), isUserAborted);
abortCmd.setDbName("admin");
-
- sharding_util::sendCommandToShards(opCtx.get(),
- NamespaceString::kAdminDb,
- abortCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField
- << WriteConcernOptions::Majority)),
- {participantShardIds.begin(), participantShardIds.end()},
- **executor);
+ _sendCommandToAllParticipants(executor,
+ abortCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority)));
}
void ReshardingCoordinatorService::ReshardingCoordinator::_updateChunkImbalanceMetrics(
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index ec479666469..ec34d0c2a88 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -411,6 +411,30 @@ private:
boost::optional<Status> abortReason = boost::none);
/**
+ * Sends the command to the specified participants asynchronously.
+ */
+ void _sendCommandToAllParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command);
+ void _sendCommandToAllDonors(const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const BSONObj& command);
+ void _sendCommandToAllRecipients(const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const BSONObj& command);
+
+ /**
+ * Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure donor state machine creation
+ * by the time the refresh completes.
+ */
+ void _establishAllDonorsAsParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure recipient state machine
+ * creation by the time the refresh completes.
+ */
+ void _establishAllRecipientsAsParticipants(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
* Sends '_flushReshardingStateChange' to all recipient shards.
*
* When the coordinator is in a state before 'kCommitting', refreshes the temporary
@@ -425,9 +449,9 @@ private:
void _tellAllDonorsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
- * Sends '_flushReshardingStateChange' for the original namespace to all participant shards.
+ * Sends '_shardsvrCommitReshardCollection' to all participant shards.
*/
- void _tellAllParticipantsToRefresh(
+ void _tellAllParticipantsToCommit(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 8e1a936a524..f7342dec5cd 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -344,6 +344,17 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin
_dropOriginalCollectionThenTransitionToDone();
} else if (_donorCtx.getState() != DonorStateEnum::kDone) {
+ {
+ // Unblock the RecoverRefreshThread as quickly as possible when aborting.
+ stdx::lock_guard<Latch> lk(_mutex);
+ ensureFulfilledPromise(lk,
+ _critSecWasAcquired,
+ {ErrorCodes::ReshardCollectionAborted, "aborted"});
+ ensureFulfilledPromise(lk,
+ _critSecWasPromoted,
+ {ErrorCodes::ReshardCollectionAborted, "aborted"});
+ }
+
// If aborted, the donor must be allowed to transition to done from any state.
_transitionState(DonorStateEnum::kDone);
}