author     Randolph Tan <randolph@10gen.com>                 2021-06-15 14:43:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-08-11 20:38:34 +0000
commit     900707746299e4684fa6b7f29beb65f6ff13b97c (patch)
tree       6324cfd18872eed8e2bb3140464dda668559153e
parent     3be1e000ea09a88202cf7bf754d4d490ba2863e7 (diff)
download   mongo-900707746299e4684fa6b7f29beb65f6ff13b97c.tar.gz
SERVER-50937 Refactor resharding coordinator to consolidate distinct error handling into separate phases.
(cherry picked from commit cbddf73dc78aa6a208fe3a43ca5e8674f67d5b87)
-rw-r--r--  src/mongo/db/s/SConscript                                             2
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp       136
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.h          21
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp  376
-rw-r--r--  src/mongo/db/s/resharding/resharding_service_test_helpers.cpp        3
5 files changed, 367 insertions, 171 deletions
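At a high level, the change splits the coordinator's run() chain into three phases -- _initializeCoordinator(), _runUntilReadyToCommit(), and _commitAndFinishReshardOperation() -- so each phase owns the error handling that applies to it (see the run() hunk in resharding_coordinator_service.cpp below). The following is a minimal, self-contained C++ sketch of that shape using invented names; the real code chains ExecutorFuture continuations rather than plain function calls:

    #include <iostream>
    #include <stdexcept>

    // Invented stand-ins for the three phases; not the actual mongo types.
    void initializeCoordinator() {
        // Phase 1: persist the coordinator document and tell participants resharding
        // has started. In the real code this phase has its own onError() handler.
    }

    int runUntilReadyToCommit() {
        // Phase 2: drive donors and recipients forward; may throw if aborted.
        return 1;  // stands in for the updated coordinator document
    }

    void commitAndFinishReshardOperation(int updatedDoc) {
        // Phase 3: commit the decision and clean up.
        std::cout << "committed, doc version " << updatedDoc << '\n';
    }

    int main() {
        try {
            initializeCoordinator();
            commitAndFinishReshardOperation(runUntilReadyToCommit());
        } catch (const std::exception& e) {
            std::cerr << "resharding failed: " << e.what() << '\n';
            return 1;
        }
        return 0;
    }
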
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d27aea7d17e..7bd2b48d0a7 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -430,6 +430,7 @@ env.Library(
target='sharding_mongod_test_fixture',
source=[
'sharding_mongod_test_fixture.cpp',
+ 'resharding/resharding_service_test_helpers.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper',
@@ -502,7 +503,6 @@ env.CppUnitTest(
'resharding/resharding_recipient_service_external_state_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
'resharding/resharding_donor_service_test.cpp',
- 'resharding/resharding_service_test_helpers.cpp',
'resharding/resharding_txn_cloner_test.cpp',
'session_catalog_migration_destination_test.cpp',
'session_catalog_migration_source_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e438004e78f..514da4f01b3 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -699,7 +699,7 @@ void writeParticipantShardsAndTempCollInfo(
writeToConfigCollectionsForTempNss(
opCtx, updatedCoordinatorDoc, chunkVersion, CollationSpec::kSimpleSpec, txnNumber);
- insertChunkAndTagDocsForTempNss(opCtx, std::move(initialChunks), zones, txnNumber);
+ insertChunkAndTagDocsForTempNss(opCtx, initialChunks, zones, txnNumber);
});
}
@@ -981,6 +981,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
}())),
_reshardingCoordinatorExternalState(externalState) {
_reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
+
+ // If the coordinator is recovering from step-up, make sure to properly initialize the
+ // promises to reflect the latest state of this resharding operation.
+ if (coordinatorDoc.getState() != CoordinatorStateEnum::kUnused) {
+ _reshardingCoordinatorObserver->onReshardingParticipantTransition(coordinatorDoc);
+ }
}
void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
@@ -1044,50 +1050,96 @@ BSONObj createShardsvrCommitReshardCollectionCmd(const NamespaceString& nss,
BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
}
-ExecutorFuture<ReshardingCoordinatorDocument>
-ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDecision(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept {
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsReshardingStarted(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_coordinatorDoc.getState() > CoordinatorStateEnum::kPreparingToDonate) {
+ return ExecutorFuture<void>(**executor, Status::OK());
+ }
+
+ return ExecutorFuture<void>(**executor)
+ .then([this] {
+ // Ensure the flushes to create participant state machines don't get interrupted
+ // upon abort.
+ _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
+ })
+ .then([this, executor] {
+ pauseBeforeTellDonorToRefresh.pauseWhileSet();
+ _tellAllDonorsToRefresh(executor);
+ })
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
+ .onCompletion([this, self = shared_from_this()](Status status) {
+ // Swap back to using operation contexts canceled upon abort until ready to
+ // persist the decision or unrecoverable error.
+ _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
+
+ return status;
+ });
+}
+
+ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_initializeCoordinator(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
return ExecutorFuture<void>(**executor)
.then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); })
.then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); })
.onCompletion([this, executor](Status status) {
if (_ctHolder->isSteppingOrShuttingDown()) {
- // Propagate any errors from the coordinator stepping down.
return ExecutorFuture<void>(**executor, status);
}
if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
- // Propagate any errors if the coordinator failed before transitioning to
- // kPreparingToDonate, meaning participants were never and should never be made
- // aware of this failed resharding operation.
- invariant(!status.isOK());
return ExecutorFuture<void>(**executor, status);
}
// Regardless of error or non-error, guarantee that once the coordinator completes its
// transition to kPreparingToDonate, participants are aware of the resharding operation
// and their state machines are created.
- return ExecutorFuture<void>(**executor)
- .then([this] {
- // Ensure the flushes to create participant state machines don't get interrupted
- // upon abort.
- _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(),
- _markKilledExecutor);
- })
- .then([this, executor] {
- pauseBeforeTellDonorToRefresh.pauseWhileSet();
-
- _tellAllDonorsToRefresh(executor);
- })
- .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
- .then([this] {
- // Swap back to using operation contexts canceled upon abort until ready to
- // persist the decision or unrecoverable error.
- _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(),
- _markKilledExecutor);
- })
- .then([status] { return status; });
+ return _tellAllParticipantsReshardingStarted(executor);
})
+ .onError([this, self = shared_from_this(), executor](Status status) -> Status {
+ {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
+ }
+
+ if (_ctHolder->isSteppingOrShuttingDown()) {
+ return status;
+ }
+
+ auto nss = _coordinatorDoc.getSourceNss();
+ LOGV2(4956903,
+ "Resharding failed",
+ "namespace"_attr = nss.ns(),
+ "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(),
+ "error"_attr = status);
+
+ if (_ctHolder->isAborted()) {
+ // If the abort cancellation token was triggered, implying that a user ran the abort
+ // command, override status with a resharding abort error.
+ //
+ // Note for debugging purposes: Ensure the original error status is recorded in the
+ // logs before replacing it.
+ status = {ErrorCodes::ReshardCollectionAborted, "aborted"};
+ }
+
+ if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
+ return status;
+ }
+
+ if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
+ _onAbortCoordinatorOnly(executor, status);
+ } else {
+ _onAbortCoordinatorAndParticipants(executor, status);
+ }
+
+ return status;
+ });
+}
+
+ExecutorFuture<ReshardingCoordinatorDocument>
+ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToCommit(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept {
+ return ExecutorFuture<void>(**executor)
.then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
.then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
@@ -1126,28 +1178,21 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci
status = {ErrorCodes::ReshardCollectionAborted, "aborted"};
}
- if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
- return status;
- }
+ invariant(_coordinatorDoc.getState() >= CoordinatorStateEnum::kPreparingToDonate);
+
+ _onAbortCoordinatorAndParticipants(executor, status);
- if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
- // Participants were never made aware of the resharding operation. Abort without
- // waiting for participant acknowledgement.
- _onAbortCoordinatorOnly(executor, status);
- } else {
- _onAbortCoordinatorAndParticipants(executor, status);
- }
return status;
});
}
ExecutorFuture<void>
-ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishReshardOperation(
+ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOperation(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept {
return ExecutorFuture<void>(**executor)
.then([this, self = shared_from_this(), executor, updatedCoordinatorDoc] {
- return _persistDecision(updatedCoordinatorDoc);
+ return _commit(updatedCoordinatorDoc);
})
.then([this, self = shared_from_this(), executor] {
_tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor);
@@ -1181,10 +1226,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
_markKilledExecutor->startup();
_cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
- return _runUntilReadyToPersistDecision(executor)
+ return _initializeCoordinator(executor)
+ .then([this, self = shared_from_this(), executor] {
+ return _runUntilReadyToCommit(executor);
+ })
.then([this, self = shared_from_this(), executor](
const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
- return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc);
+ return _commitAndFinishReshardOperation(executor, updatedCoordinatorDoc);
})
.onCompletion([this, self = shared_from_this(), executor](Status status) {
if (!_ctHolder->isSteppingOrShuttingDown() &&
@@ -1519,7 +1567,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict
.thenRunOn(**executor);
}
-Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecision(
+Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit(
const ReshardingCoordinatorDocument& coordinatorDoc) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kBlockingWrites) {
return Status::OK();
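
The key consolidation in this file is that _initializeCoordinator()'s new onError handler decides, in one place, how far an abort needs to reach based on how far the coordinator got before failing. A minimal, self-contained sketch of that decision, with generic names rather than the actual mongo implementation:

    #include <iostream>

    // Simplified stand-in for CoordinatorStateEnum; ordering mirrors operation progress.
    enum class CoordinatorState { kUnused, kInitializing, kPreparingToDonate, kCloning, kCommitting, kDone };

    enum class AbortPath { kNone, kCoordinatorOnly, kCoordinatorAndParticipants };

    AbortPath chooseAbortPath(CoordinatorState state) {
        if (state == CoordinatorState::kUnused) {
            // Nothing was ever persisted, so there is nothing to clean up.
            return AbortPath::kNone;
        }
        if (state < CoordinatorState::kPreparingToDonate) {
            // Participants were never made aware of the operation; abort coordinator-only.
            return AbortPath::kCoordinatorOnly;
        }
        // Participants may have created their state machines and must acknowledge the abort.
        return AbortPath::kCoordinatorAndParticipants;
    }

    int main() {
        std::cout << (chooseAbortPath(CoordinatorState::kInitializing) == AbortPath::kCoordinatorOnly) << '\n';
        std::cout << (chooseAbortPath(CoordinatorState::kCloning) == AbortPath::kCoordinatorAndParticipants) << '\n';
        return 0;
    }
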
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index b0a6a3a40c1..6f52a19e11e 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -277,19 +277,31 @@ private:
};
/**
+ * Constructs the initial chunk splits and writes the initial coordinator state to storage.
+ */
+ ExecutorFuture<void> _initializeCoordinator(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
* Runs resharding up through preparing to persist the decision.
*/
- ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToPersistDecision(
+ ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToCommit(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept;
/**
* Runs resharding through persisting the decision until cleanup.
*/
- ExecutorFuture<void> _persistDecisionAndFinishReshardOperation(
+ ExecutorFuture<void> _commitAndFinishReshardOperation(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept;
/**
+ * Tells all of the donors and recipients of this resharding operation to begin.
+ */
+ ExecutorFuture<void> _tellAllParticipantsReshardingStarted(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
* Runs abort cleanup logic when only the coordinator is aware of the resharding operation.
*
* Only safe to call if an unrecoverable error is encountered before the coordinator completes
@@ -373,7 +385,7 @@ private:
*
* Transitions to 'kCommitting'.
*/
- Future<void> _persistDecision(const ReshardingCoordinatorDocument& updatedDoc);
+ Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc);
/**
* Waits on _reshardingCoordinatorObserver to notify that:
@@ -459,8 +471,7 @@ private:
boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;
/**
- * Must be locked while the `_canEnterCritical` or `_completionPromise`
- * promises are fulfilled.
+ * Must be locked while the `_canEnterCritical` promise is being fulfilled.
*/
mutable Mutex _fulfillmentMutex =
MONGO_MAKE_LATCH("ReshardingCoordinatorService::_fulfillmentMutex");
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index 36699779728..071ec5c94ae 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -42,7 +42,9 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding/resharding_op_observer.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
+#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/idl/server_parameter_test_util.h"
@@ -55,107 +57,23 @@
namespace mongo {
namespace {
-class OpObserverForTest;
-class PauseDuringStateTransitions;
+using CoordinatorStateTransitionController =
+ resharding_service_test_helpers::StateTransitionController<CoordinatorStateEnum>;
+using OpObserverForTest =
+ resharding_service_test_helpers::OpObserverForTest<CoordinatorStateEnum,
+ ReshardingCoordinatorDocument>;
+using PauseDuringStateTransitions =
+ resharding_service_test_helpers::PauseDuringStateTransitions<CoordinatorStateEnum>;
-class CoordinatorStateTransitionController {
+class CoordinatorOpObserverForTest : public OpObserverForTest {
public:
- CoordinatorStateTransitionController() = default;
+ CoordinatorOpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller)
+ : OpObserverForTest(std::move(controller),
+ NamespaceString::kConfigReshardingOperationsNamespace) {}
- void waitUntilStateIsReached(CoordinatorStateEnum state) {
- stdx::unique_lock lk(_mutex);
- _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; });
+ CoordinatorStateEnum getState(const ReshardingCoordinatorDocument& coordinatorDoc) override {
+ return coordinatorDoc.getState();
}
-
-private:
- friend OpObserverForTest;
- friend PauseDuringStateTransitions;
-
- void setPauseDuringTransition(CoordinatorStateEnum state) {
- stdx::lock_guard lk(_mutex);
- _pauseDuringTransition.insert(state);
- }
-
- void unsetPauseDuringTransition(CoordinatorStateEnum state) {
- stdx::lock_guard lk(_mutex);
- _pauseDuringTransition.erase(state);
- _pauseDuringTransitionCond.notify_all();
- }
-
- void notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx,
- CoordinatorStateEnum newState) {
- stdx::unique_lock lk(_mutex);
- _state = newState;
- _waitUntilUnpausedCond.notify_all();
- opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
- return _pauseDuringTransition.count(newState) == 0;
- });
- }
-
- Mutex _mutex = MONGO_MAKE_LATCH("CoordinatorStateTransitionController::_mutex");
- stdx::condition_variable _pauseDuringTransitionCond;
- stdx::condition_variable _waitUntilUnpausedCond;
-
- std::set<CoordinatorStateEnum> _pauseDuringTransition;
- CoordinatorStateEnum _state = CoordinatorStateEnum::kUnused;
-};
-
-class PauseDuringStateTransitions {
-public:
- PauseDuringStateTransitions(CoordinatorStateTransitionController* controller,
- CoordinatorStateEnum state)
- : PauseDuringStateTransitions(controller, std::vector<CoordinatorStateEnum>{state}) {}
-
- PauseDuringStateTransitions(CoordinatorStateTransitionController* controller,
- std::vector<CoordinatorStateEnum> states)
- : _controller{controller}, _states{std::move(states)} {
- for (auto state : _states) {
- _controller->setPauseDuringTransition(state);
- }
- }
-
- ~PauseDuringStateTransitions() {
- for (auto state : _states) {
- _controller->unsetPauseDuringTransition(state);
- }
- }
-
- PauseDuringStateTransitions(const PauseDuringStateTransitions&) = delete;
- PauseDuringStateTransitions& operator=(const PauseDuringStateTransitions&) = delete;
-
- PauseDuringStateTransitions(PauseDuringStateTransitions&&) = delete;
- PauseDuringStateTransitions& operator=(PauseDuringStateTransitions&&) = delete;
-
- void wait(CoordinatorStateEnum state) {
- _controller->waitUntilStateIsReached(state);
- }
-
- void unset(CoordinatorStateEnum state) {
- _controller->unsetPauseDuringTransition(state);
- }
-
-private:
- CoordinatorStateTransitionController* const _controller;
- const std::vector<CoordinatorStateEnum> _states;
-};
-
-class OpObserverForTest : public OpObserverNoop {
-public:
- OpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller)
- : _controller{std::move(controller)} {}
-
- void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
- if (args.nss != NamespaceString::kConfigReshardingOperationsNamespace) {
- return;
- }
- auto doc =
- ReshardingCoordinatorDocument::parse({"OpObserverForTest"}, args.updateArgs.updatedDoc);
-
- _controller->notifyNewStateAndWaitUntilUnpaused(opCtx, doc.getState());
- }
-
-private:
- std::shared_ptr<CoordinatorStateTransitionController> _controller;
};
class ExternalStateForTest : public ReshardingCoordinatorExternalState {
@@ -184,7 +102,8 @@ class ExternalStateForTest : public ReshardingCoordinatorExternalState {
version.incMinor();
}
}
- return ParticipantShardsAndChunks({{}, {}, initialChunks});
+ return ParticipantShardsAndChunks(
+ {coordinatorDoc.getDonorShards(), coordinatorDoc.getRecipientShards(), initialChunks});
}
void sendCommandToShards(OperationContext* opCtx,
@@ -247,7 +166,9 @@ public:
invariant(_opObserverRegistry);
_opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
- _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(_controller));
+ _opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
+ _opObserverRegistry->addObserver(
+ std::make_unique<CoordinatorOpObserverForTest>(_controller));
_registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
auto service = makeService(getServiceContext());
@@ -283,8 +204,61 @@ public:
return doc;
}
- void notifyDonorsReadyToDonate(ReshardingCoordinator& coordinator,
- ReshardingCoordinatorDocument& coordDoc) {
+ std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator> getCoordinator(
+ OperationContext* opCtx, repl::PrimaryOnlyService::InstanceID instanceId) {
+ auto coordinator = getCoordinatorIfExists(opCtx, instanceId);
+ ASSERT_TRUE(bool(coordinator));
+ return coordinator;
+ }
+
+ std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator> getCoordinatorIfExists(
+ OperationContext* opCtx, repl::PrimaryOnlyService::InstanceID instanceId) {
+ auto coordinatorOpt = ReshardingCoordinatorService::ReshardingCoordinator::lookup(
+ opCtx, _service, instanceId);
+ if (!coordinatorOpt) {
+ return nullptr;
+ }
+
+ auto coordinator = *coordinatorOpt;
+ return coordinator ? coordinator : nullptr;
+ }
+
+ ReshardingCoordinatorDocument getCoordinatorDoc(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+
+ auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(), {});
+ IDLParserErrorContext errCtx("reshardingCoordFromTest");
+ return ReshardingCoordinatorDocument::parse(errCtx, doc);
+ }
+
+ void replaceCoordinatorDoc(OperationContext* opCtx,
+ const ReshardingCoordinatorDocument& newDoc) {
+ DBDirectClient client(opCtx);
+
+ const BSONObj query(BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName
+ << newDoc.getReshardingUUID()));
+ client.update(
+ NamespaceString::kConfigReshardingOperationsNamespace.ns(), {}, newDoc.toBSON());
+ }
+
+ void waitUntilCommittedCoordinatorDocReach(OperationContext* opCtx,
+ CoordinatorStateEnum state) {
+ DBDirectClient client(opCtx);
+
+ while (true) {
+ auto coordinatorDoc = getCoordinatorDoc(opCtx);
+
+ if (coordinatorDoc.getState() == state) {
+ break;
+ }
+
+ sleepmillis(50);
+ }
+ }
+
+ void makeDonorsReadyToDonate(OperationContext* opCtx) {
+ auto coordDoc = getCoordinatorDoc(opCtx);
+
auto donorShards = coordDoc.getDonorShards();
DonorShardContext donorCtx;
donorCtx.setState(DonorStateEnum::kDonatingInitialData);
@@ -294,11 +268,12 @@ public:
}
coordDoc.setDonorShards(donorShards);
- coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+ replaceCoordinatorDoc(opCtx, coordDoc);
}
- void notifyRecipientsFinishedCloning(ReshardingCoordinator& coordinator,
- ReshardingCoordinatorDocument& coordDoc) {
+ void makeRecipientsFinishedCloning(OperationContext* opCtx) {
+ auto coordDoc = getCoordinatorDoc(opCtx);
+
auto shards = coordDoc.getRecipientShards();
RecipientShardContext ctx;
ctx.setState(RecipientStateEnum::kApplying);
@@ -307,11 +282,12 @@ public:
}
coordDoc.setRecipientShards(shards);
- coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+ replaceCoordinatorDoc(opCtx, coordDoc);
}
- void notifyRecipientsInStrictConsistency(ReshardingCoordinator& coordinator,
- ReshardingCoordinatorDocument& coordDoc) {
+ void makeRecipientsBeInStrictConsistency(OperationContext* opCtx) {
+ auto coordDoc = getCoordinatorDoc(opCtx);
+
auto shards = coordDoc.getRecipientShards();
RecipientShardContext ctx;
ctx.setState(RecipientStateEnum::kStrictConsistency);
@@ -320,11 +296,11 @@ public:
}
coordDoc.setRecipientShards(shards);
- coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+ replaceCoordinatorDoc(opCtx, coordDoc);
}
- void notifyDonorsDone(ReshardingCoordinator& coordinator,
- ReshardingCoordinatorDocument& coordDoc) {
+ void makeDonorsProceedToDone(OperationContext* opCtx) {
+ auto coordDoc = getCoordinatorDoc(opCtx);
auto donorShards = coordDoc.getDonorShards();
DonorShardContext donorCtx;
donorCtx.setState(DonorStateEnum::kDone);
@@ -332,13 +308,12 @@ public:
shard.setMutableState(donorCtx);
}
coordDoc.setDonorShards(donorShards);
- coordDoc.setState(CoordinatorStateEnum::kCommitting);
- coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+ replaceCoordinatorDoc(opCtx, coordDoc);
}
- void notifyRecipientsDone(ReshardingCoordinator& coordinator,
- ReshardingCoordinatorDocument& coordDoc) {
+ void makeRecipientsProceedToDone(OperationContext* opCtx) {
+ auto coordDoc = getCoordinatorDoc(opCtx);
auto shards = coordDoc.getRecipientShards();
RecipientShardContext ctx;
ctx.setState(RecipientStateEnum::kDone);
@@ -346,9 +321,8 @@ public:
shard.setMutableState(ctx);
}
coordDoc.setRecipientShards(shards);
- coordDoc.setState(CoordinatorStateEnum::kCommitting);
- coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+ replaceCoordinatorDoc(opCtx, coordDoc);
}
CollectionType makeOriginalCollectionCatalogEntry(
@@ -464,6 +438,51 @@ public:
return donorChunk;
}
+ void stepUp(OperationContext* opCtx) {
+ auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+ auto currOpTime = replCoord->getMyLastAppliedOpTime();
+
+ // Advance the term and last applied opTime. We retain the timestamp component of the
+ // current last applied opTime to avoid log messages from
+ // ReplClientInfo::setLastOpToSystemLastOpTime() about the opTime having moved backwards.
+ ++_term;
+ auto newOpTime = repl::OpTime{currOpTime.getTimestamp(), _term};
+
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(opCtx, _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime({newOpTime, {}});
+
+ _registry->onStepUpComplete(opCtx, _term);
+ }
+
+ void stepDown(OperationContext* opCtx) {
+ ASSERT_OK(repl::ReplicationCoordinator::get(getServiceContext())
+ ->setFollowerMode(repl::MemberState::RS_SECONDARY));
+ _registry->onStepDown();
+
+ // Some opCtx can be created via AlternativeClientRegion and not tied to any resharding
+ // cancellation token, so we also need to simulate the repl step down killOp thread.
+
+ auto serviceCtx = opCtx->getServiceContext();
+ for (ServiceContext::LockedClientsCursor cursor(serviceCtx);
+ Client* client = cursor.next();) {
+ stdx::lock_guard<Client> lk(*client);
+ if (client->isFromSystemConnection() && !client->canKillSystemOperationInStepdown(lk)) {
+ continue;
+ }
+
+ OperationContext* toKill = client->getOperationContext();
+
+ if (toKill && !toKill->isKillPending() && toKill->getOpID() != opCtx->getOpID()) {
+ auto locker = toKill->lockState();
+ if (toKill->shouldAlwaysInterruptAtStepDownOrUp() ||
+ locker->wasGlobalLockTakenInModeConflictingWithWrites()) {
+ serviceCtx->killOperation(lk, toKill);
+ }
+ }
+ }
+ }
+
repl::PrimaryOnlyService* _service = nullptr;
std::shared_ptr<CoordinatorStateTransitionController> _controller;
@@ -497,6 +516,8 @@ public:
"reshardingMinimumOperationDurationMillis", 0};
const std::vector<ShardId> _recipientShards = {{"shard0001"}};
+
+ long long _term = 0;
};
TEST_F(ReshardingCoordinatorServiceTest, ReshardingCoordinatorSuccessfullyTransitionsTokDone) {
@@ -524,31 +545,144 @@ TEST_F(ReshardingCoordinatorServiceTest, ReshardingCoordinatorSuccessfullyTransi
doc.setPresetReshardedChunks(presetReshardedChunks);
auto coordinator = ReshardingCoordinator::getOrCreate(opCtx, _service, doc.toBSON());
+
stateTransitionsGuard.wait(CoordinatorStateEnum::kPreparingToDonate);
stateTransitionsGuard.unset(CoordinatorStateEnum::kPreparingToDonate);
- notifyDonorsReadyToDonate(*coordinator, doc);
+ waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kPreparingToDonate);
+ makeDonorsReadyToDonate(opCtx);
stateTransitionsGuard.wait(CoordinatorStateEnum::kCloning);
stateTransitionsGuard.unset(CoordinatorStateEnum::kCloning);
+ waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kCloning);
- notifyRecipientsFinishedCloning(*coordinator, doc);
+ makeRecipientsFinishedCloning(opCtx);
stateTransitionsGuard.wait(CoordinatorStateEnum::kApplying);
stateTransitionsGuard.unset(CoordinatorStateEnum::kApplying);
+ waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kApplying);
coordinator->onOkayToEnterCritical();
stateTransitionsGuard.wait(CoordinatorStateEnum::kBlockingWrites);
stateTransitionsGuard.unset(CoordinatorStateEnum::kBlockingWrites);
+ waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kBlockingWrites);
- notifyRecipientsInStrictConsistency(*coordinator, doc);
+ makeRecipientsBeInStrictConsistency(opCtx);
stateTransitionsGuard.wait(CoordinatorStateEnum::kCommitting);
stateTransitionsGuard.unset(CoordinatorStateEnum::kCommitting);
- notifyDonorsDone(*coordinator, doc);
- notifyRecipientsDone(*coordinator, doc);
+ waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kCommitting);
+
+ makeDonorsProceedToDone(opCtx);
+ makeRecipientsProceedToDone(opCtx);
coordinator->getCompletionFuture().get(opCtx);
}
+/**
+ * Tests stepping down right when the coordinator doc is being updated, causing the change to be
+ * rolled back and the work to be redone on step up.
+ */
+TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
+ const std::vector<CoordinatorStateEnum> coordinatorStates{
+ // Skip kInitializing, as we don't write that state transition to storage.
+ CoordinatorStateEnum::kPreparingToDonate,
+ CoordinatorStateEnum::kCloning,
+ CoordinatorStateEnum::kApplying,
+ CoordinatorStateEnum::kBlockingWrites,
+ CoordinatorStateEnum::kCommitting,
+ CoordinatorStateEnum::kDone};
+ PauseDuringStateTransitions stateTransitionsGuard{controller(), coordinatorStates};
+
+ auto doc = insertStateAndCatalogEntries(CoordinatorStateEnum::kUnused, _originalEpoch);
+ auto opCtx = operationContext();
+ auto donorChunk = makeAndInsertChunksForDonorShard(
+ _originalUUID, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
+
+ auto initialChunks =
+ makeChunks(_reshardingUUID, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
+
+ std::vector<ReshardedChunk> presetReshardedChunks;
+ for (const auto& chunk : initialChunks) {
+ presetReshardedChunks.emplace_back(chunk.getShard(), chunk.getMin(), chunk.getMax());
+ }
+
+ doc.setPresetReshardedChunks(presetReshardedChunks);
+
+ (void)ReshardingCoordinator::getOrCreate(opCtx, _service, doc.toBSON());
+ auto instanceId =
+ BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+
+ for (const auto state : coordinatorStates) {
+ auto coordinator = getCoordinator(opCtx, instanceId);
+
+ LOGV2(5093701,
+ "Running step down test case",
+ "stepDownAfter"_attr = (CoordinatorState_serializer(state).toString()));
+
+ switch (state) {
+ case CoordinatorStateEnum::kCloning: {
+ makeDonorsReadyToDonate(opCtx);
+ break;
+ }
+
+ case CoordinatorStateEnum::kApplying: {
+ makeRecipientsFinishedCloning(opCtx);
+ break;
+ }
+
+ case CoordinatorStateEnum::kBlockingWrites: {
+ // Pretend that the recipients are already ready to commit.
+ coordinator->onOkayToEnterCritical();
+ break;
+ }
+
+ case CoordinatorStateEnum::kCommitting: {
+ makeRecipientsBeInStrictConsistency(opCtx);
+ break;
+ }
+
+ case CoordinatorStateEnum::kDone: {
+ makeDonorsProceedToDone(opCtx);
+ makeRecipientsProceedToDone(opCtx);
+ break;
+ }
+
+ default:
+ break;
+ }
+
+ if (state != CoordinatorStateEnum::kDone) {
+ // 'done' state is never written to storage so don't wait for it
+ stateTransitionsGuard.wait(state);
+ }
+
+ stepDown(opCtx);
+
+ ASSERT_EQ(coordinator->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ coordinator.reset();
+ stepUp(opCtx);
+
+ stateTransitionsGuard.unset(state);
+
+ if (state == CoordinatorStateEnum::kBlockingWrites) {
+ // We have to fake this again as the effects are not persistent.
+ coordinator = getCoordinator(opCtx, instanceId);
+ coordinator->onOkayToEnterCritical();
+ }
+
+ if (state != CoordinatorStateEnum::kDone) {
+ // 'done' state is never written to storage so don't wait for it.
+ waitUntilCommittedCoordinatorDocReach(opCtx, state);
+ }
+ }
+
+ // Join the coordinator if it has not yet been cleaned up.
+ if (auto coordinator = getCoordinatorIfExists(opCtx, instanceId)) {
+ coordinator->getCompletionFuture().get(opCtx);
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
index e512fde17f3..01b800167e6 100644
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
+++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
@@ -142,12 +142,15 @@ void OpObserverForTest<StateEnum, ReshardingDocument>::onUpdate(OperationContext
template class StateTransitionController<DonorStateEnum>;
template class StateTransitionController<RecipientStateEnum>;
+template class StateTransitionController<CoordinatorStateEnum>;
template class PauseDuringStateTransitions<DonorStateEnum>;
template class PauseDuringStateTransitions<RecipientStateEnum>;
+template class PauseDuringStateTransitions<CoordinatorStateEnum>;
template class OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>;
template class OpObserverForTest<RecipientStateEnum, ReshardingRecipientDocument>;
+template class OpObserverForTest<CoordinatorStateEnum, ReshardingCoordinatorDocument>;
} // namespace resharding_service_test_helpers
} // namespace mongo