summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Randolph Tan <randolph@10gen.com> 2022-08-11 19:19:51 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-09-09 17:17:19 +0000
commit c9e875975a43c819387095033bdabff82cc9cb96 (patch)
tree 4b218e81a3c229444945c88d0f4f336868578d90 /src
parent dab0694cd327eb0f7e540de5dee97c69f84ea45d (diff)
download mongo-c9e875975a43c819387095033bdabff82cc9cb96.tar.gz
SERVER-68706 Global Index Cloner should wait for abort/commit from coordinator
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/s/SConscript 1
-rw-r--r-- src/mongo/db/s/global_index/global_index_cloner.idl 15
-rw-r--r-- src/mongo/db/s/global_index/global_index_cloning_service.cpp 225
-rw-r--r-- src/mongo/db/s/global_index/global_index_cloning_service.h 66
-rw-r--r-- src/mongo/db/s/global_index/global_index_cloning_service_test.cpp 161
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp 25
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service_test.cpp 21
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service_test.cpp 23
-rw-r--r-- src/mongo/db/s/resharding/resharding_service_test_helpers.cpp 159
-rw-r--r-- src/mongo/db/s/resharding/resharding_service_test_helpers.h 146
10 files changed, 405 insertions, 437 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 2a988b0879b..3565b4cd45f 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -552,7 +552,6 @@ env.Library(
target='sharding_mongod_test_fixture',
source=[
'sharding_mongod_test_fixture.cpp',
- 'resharding/resharding_service_test_helpers.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper',
diff --git a/src/mongo/db/s/global_index/global_index_cloner.idl b/src/mongo/db/s/global_index/global_index_cloner.idl
index 81ab433f171..cbfdb4935d0 100644
--- a/src/mongo/db/s/global_index/global_index_cloner.idl
+++ b/src/mongo/db/s/global_index/global_index_cloner.idl
@@ -42,8 +42,19 @@ enums:
values:
kUnused: "unused"
kCloning: "cloning"
+ kReadyToCommit: "ready-to-commit"
+ # done is not a real state but only used in tests.
+ kDone: "done"
structs:
+ GlobalIndexClonerMutableState:
+ description: "Contains the mutable state for global index cloning operation."
+ strict: false
+ fields:
+ state:
+ type: GlobalIndexClonerState
+ description: "The current state of cloner."
+
GlobalIndexClonerDoc:
description: "Document containing the state and specs of global index cloning operation."
strict: false
@@ -69,6 +80,4 @@ structs:
type: timestamp
description: "The minimum timestamp to use for fetching documents from the source
collection"
- state:
- type: GlobalIndexClonerState
- description: "The current state of cloner."
+ mutableState: GlobalIndexClonerMutableState
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.cpp b/src/mongo/db/s/global_index/global_index_cloning_service.cpp
index 0002d7ab032..fd3648bd6bc 100644
--- a/src/mongo/db/s/global_index/global_index_cloning_service.cpp
+++ b/src/mongo/db/s/global_index/global_index_cloning_service.cpp
@@ -36,6 +36,9 @@
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
#include "mongo/db/s/global_index/global_index_server_parameters_gen.h"
#include "mongo/db/s/global_index/global_index_util.h"
@@ -88,6 +91,12 @@ GlobalIndexCloningService::CloningStateMachine::CloningStateMachine(
GlobalIndexClonerDoc clonerDoc)
: _serviceContext(serviceContext),
_cloningService(cloningService),
+ _indexCollectionUUID(clonerDoc.getIndexCollectionUUID()),
+ _sourceNss(clonerDoc.getNss()),
+ _sourceCollUUID(clonerDoc.getCollectionUUID()),
+ _indexName(clonerDoc.getIndexName()),
+ _indexSpec(clonerDoc.getIndexSpec().getOwned()),
+ _minFetchTimestamp(clonerDoc.getMinFetchTimestamp()),
_execForCancelableOpCtx(std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "GlobalIndexCloningServiceCancelableOpCtxPool";
@@ -95,7 +104,7 @@ GlobalIndexCloningService::CloningStateMachine::CloningStateMachine(
options.maxThreads = 1;
return options;
}())),
- _clonerState(std::move(clonerDoc)),
+ _mutableState(clonerDoc.getMutableState()),
_fetcherFactory(std::move(fetcherFactory)),
_externalState(std::move(externalState)) {}
@@ -108,31 +117,34 @@ SemiFuture<void> GlobalIndexCloningService::CloningStateMachine::run(
_init(executor);
- return ExecutorFuture(**executor)
- .then([this, executor, abortToken] { return _persistStateDocument(executor, abortToken); })
- .then([this, executor, abortToken] { return _runUntilDoneCloning(executor, abortToken); })
- // TODO: SERVER-68706 wait from coordinator to commit or abort.
- .onCompletion([this, stepdownToken](const Status& status) {
- _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx);
- return status;
- })
- .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); })
- .thenRunOn(_cloningService->getInstanceCleanupExecutor())
- .onError([](const Status& status) {
- LOGV2(
- 6755903, "Global index cloner encountered an error", "error"_attr = redact(status));
- return status;
- })
- .onCompletion([this, self = shared_from_this()](const Status& status) {
- if (!_completionPromise.getFuture().isReady()) {
- if (status.isOK()) {
- _completionPromise.emplaceValue();
- } else {
- _completionPromise.setError(status);
- }
- }
- })
- .semi();
+ _readyToCommitPromise.setFrom(ExecutorFuture(**executor)
+ .then([this, executor, abortToken] {
+ return _persistStateDocument(executor, abortToken);
+ })
+ .then([this, executor, abortToken] {
+ return _runUntilDoneCloning(executor, abortToken);
+ })
+ .unsafeToInlineFuture());
+
+ _completionPromise.setFrom(
+ _readyToCommitPromise.getFuture()
+ .thenRunOn(**executor)
+ .then([this, stepdownToken] {
+ _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx);
+ })
+ .then([this, executor, stepdownToken] {
+ return future_util::withCancellation(_waitForCleanupPromise.getFuture(),
+ stepdownToken);
+ })
+ .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); })
+ .unsafeToInlineFuture()
+ .tapError([](const Status& status) {
+ LOGV2(6755903,
+ "Global index cloner encountered an error",
+ "error"_attr = redact(status));
+ }));
+
+ return _completionPromise.getFuture().semi();
}
void GlobalIndexCloningService::CloningStateMachine::interrupt(Status status) {}
@@ -152,7 +164,11 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup(
return ExecutorFuture(**executor)
.then([this, executor, stepdownToken, &cancelableFactory] {
auto opCtx = cancelableFactory.makeOperationContext(&cc());
- _removeStateDocument(opCtx.get());
+ PersistentTaskStore<GlobalIndexClonerDoc> store(
+ _cloningService->getStateDocumentsNS());
+ store.remove(opCtx.get(),
+ BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName
+ << _indexCollectionUUID));
});
})
.onTransientError([](const auto& status) {})
@@ -163,33 +179,28 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup(
void GlobalIndexCloningService::CloningStateMachine::_init(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- _inserter = std::make_unique<GlobalIndexInserter>(_clonerState.getNss(),
- _clonerState.getIndexName(),
- _clonerState.getIndexCollectionUUID(),
- **executor);
+ _inserter = std::make_unique<GlobalIndexInserter>(
+ _sourceNss, _indexName, _indexCollectionUUID, **executor);
auto client = _serviceContext->makeClient("globalIndexClonerServiceInit");
AlternativeClientRegion clientRegion(client);
auto opCtx = _serviceContext->makeOperationContext(Client::getCurrent());
- auto routingInfo =
- _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _clonerState.getNss());
+ auto routingInfo = _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _sourceNss);
uassert(6755901,
- str::stream() << "Cannot create global index on unsharded ns "
- << _clonerState.getNss().ns(),
+ str::stream() << "Cannot create global index on unsharded ns " << _sourceNss.ns(),
routingInfo.isSharded());
auto myShardId = _externalState->myShardId(_serviceContext);
- auto indexKeyPattern =
- _clonerState.getIndexSpec().getObjectField(IndexDescriptor::kKeyPatternFieldName);
- _fetcher = _fetcherFactory->make(_clonerState.getNss(),
- _clonerState.getCollectionUUID(),
- _clonerState.getIndexCollectionUUID(),
+ auto indexKeyPattern = _indexSpec.getObjectField(IndexDescriptor::kKeyPatternFieldName);
+ _fetcher = _fetcherFactory->make(_sourceNss,
+ _sourceCollUUID,
+ _indexCollectionUUID,
myShardId,
- _clonerState.getMinFetchTimestamp(),
+ _minFetchTimestamp,
routingInfo.getShardKeyPattern().getKeyPattern(),
indexKeyPattern.getOwned());
}
@@ -205,14 +216,17 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_runUntilDo
})
.then([this, executor, cancelToken, cancelableFactory] {
return _clone(executor, cancelToken, cancelableFactory);
+ })
+ .then([this, executor, cancelToken, cancelableFactory] {
+ return _transitionToReadyToCommit(executor, cancelToken);
+ })
+ .then([this, cancelToken](const repl::OpTime& readyToCommitOpTime) {
+ return WaitForMajorityService::get(_serviceContext)
+ .waitUntilMajority(readyToCommitOpTime, cancelToken);
});
})
- .onTransientError([](const Status& status) {
-
- })
- .onUnrecoverableError([](const Status& status) {
-
- })
+ .onTransientError([](const Status& status) {})
+ .onUnrecoverableError([](const Status& status) {})
.until<Status>([](const Status& status) { return status.isOK(); })
.on(**executor, cancelToken);
}
@@ -230,11 +244,12 @@ void GlobalIndexCloningService::CloningStateMachine::checkIfOptionsConflict(
const BSONObj& stateDoc) const {
auto newCloning =
GlobalIndexClonerDoc::parse(IDLParserContext("globalIndexCloningCheckConflict"), stateDoc);
+
uassert(6755900,
- str::stream() << "new global index " << stateDoc << " is incompatible with ongoing "
- << _clonerState.toBSON(),
- newCloning.getNss() == _clonerState.getNss() &&
- newCloning.getCollectionUUID() == _clonerState.getCollectionUUID());
+ str::stream() << "New global index " << stateDoc
+ << " is incompatible with ongoing global index build in namespace: "
+ << _sourceNss << ", uuid: " << _sourceCollUUID,
+ newCloning.getNss() == _sourceNss && newCloning.getCollectionUUID() == _sourceCollUUID);
}
CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSource(
@@ -251,7 +266,7 @@ CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSour
ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistStateDocument(
std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) {
- if (_clonerState.getState() > GlobalIndexClonerStateEnum::kUnused) {
+ if (_getState() > GlobalIndexClonerStateEnum::kUnused) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -259,14 +274,17 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistSta
->withAutomaticRetry([this, executor](auto& cancelableFactory) {
auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent());
- GlobalIndexClonerDoc newDoc(_clonerState);
- newDoc.setState(GlobalIndexClonerStateEnum::kCloning);
+ auto newDoc = _makeClonerDoc();
+ newDoc.getMutableState().setState(GlobalIndexClonerStateEnum::kCloning);
PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS());
store.add(opCtx.get(), newDoc, kNoWaitWriteConcern);
- std::swap(_clonerState, newDoc);
-
LOGV2(6755904, "Persisted global index state document");
+
+ {
+ stdx::unique_lock lk(_mutex);
+ _mutableState.setState(GlobalIndexClonerStateEnum::kCloning);
+ }
})
.onTransientError([](const Status& status) {})
.onUnrecoverableError([](const Status& status) {})
@@ -274,33 +292,43 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistSta
.on(**executor, cancelToken);
}
-void GlobalIndexCloningService::CloningStateMachine::_removeStateDocument(OperationContext* opCtx) {
- const auto& nss = _cloningService->getStateDocumentsNS();
- writeConflictRetry(
- opCtx, "GlobalIndexCloningStateMachine::removeStateDocument", nss.toString(), [&] {
- AutoGetCollection coll(opCtx, nss, MODE_IX);
+/**
+ * Writes the kReadyToCommit state into this cloner's state document and, once the update call
+ * has returned, copies the same state into the in-memory _mutableState.
+ *
+ * Resolves with the last opTime recorded for the client that performed the update. If the
+ * in-memory state is already kReadyToCommit or later, nothing is written and a default
+ * constructed opTime is returned instead. The write is retried until it succeeds or cancelToken
+ * is canceled.
+ */
+ExecutorFuture<repl::OpTime>
+GlobalIndexCloningService::CloningStateMachine::_transitionToReadyToCommit(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) {
+ if (_getState() >= GlobalIndexClonerStateEnum::kReadyToCommit) {
+ // The transition was already made, e.g. we recovered from disk, in which case primary only
+ // service would have already waited for majority, so just return an empty opTime.
+ return ExecutorFuture<repl::OpTime>(**executor, repl::OpTime());
+ }
- if (!coll) {
- return;
- }
+ return _retryingCancelableOpCtxFactory
+ ->withAutomaticRetry([this, executor](auto& cancelableFactory) {
+ auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent());
- WriteUnitOfWork wuow(opCtx);
+ PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS());
+
+ // Work on a copy so that the in-memory state is left untouched if the update throws.
+ auto mutableState = _getMutableState();
+ mutableState.setState(GlobalIndexClonerStateEnum::kReadyToCommit);
- // Set the promise when the delete commits, this is to ensure that any interruption that
- // happens later won't result in setting an error on the completion promise.
- opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) {
- _completionPromise.emplaceValue();
- });
+ BSONObj update(BSON("$set" << BSON(GlobalIndexClonerDoc::kMutableStateFieldName
+ << mutableState.toBSON())));
+ store.update(
+ opCtx.get(),
+ BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName << _indexCollectionUUID),
+ update);
- deleteObjects(opCtx,
- *coll,
- nss,
- BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName
- << _clonerState.getIndexCollectionUUID()),
- true /* justOne */);
+ // Mirror the new state in memory only after store.update() has returned.
+ {
+ stdx::unique_lock lk(_mutex);
+ _mutableState = mutableState;
+ }
- wuow.commit();
- });
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .onTransientError([](const Status& status) {})
+ .onUnrecoverableError([](const Status& status) {})
+ .until<StatusWith<repl::OpTime>>(
+ [](const StatusWith<repl::OpTime>& status) { return status.isOK(); })
+ .on(**executor, cancelToken);
}
void GlobalIndexCloningService::CloningStateMachine::_initializeCollections(
@@ -308,8 +336,7 @@ void GlobalIndexCloningService::CloningStateMachine::_initializeCollections(
auto cancelableOpCtx = cancelableOpCtxFactory.makeOperationContext(Client::getCurrent());
auto opCtx = cancelableOpCtx.get();
- resharding::data_copy::ensureCollectionExists(
- opCtx, skipIdNss(_clonerState.getNss(), _clonerState.getIndexName()), {});
+ resharding::data_copy::ensureCollectionExists(opCtx, skipIdNss(_sourceNss, _indexName), {});
}
ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_clone(
@@ -389,5 +416,43 @@ void GlobalIndexCloningService::CloningStateMachine::_ensureCollection(Operation
});
}
+void GlobalIndexCloningService::CloningStateMachine::cleanup() {
+ // The check and the set both happen under _mutex so that concurrent cleanup() calls cannot
+ // both try to fulfil the promise.
+ stdx::unique_lock guard(_mutex);
+
+ const bool alreadyRequested = _waitForCleanupPromise.getFuture().isReady();
+ if (!alreadyRequested) {
+ _waitForCleanupPromise.emplaceValue();
+ }
+
+ // TODO: SERVER-67563 Implement abort
+}
+
+GlobalIndexClonerStateEnum GlobalIndexCloningService::CloningStateMachine::_getState() const {
+ // Read the current state while holding _mutex and return it by value.
+ stdx::unique_lock guard(_mutex);
+ const auto currentState = _mutableState.getState();
+ return currentState;
+}
+
+GlobalIndexClonerMutableState GlobalIndexCloningService::CloningStateMachine::_getMutableState()
+ const {
+ // Return a copy made under _mutex so the caller can use it without holding the lock.
+ stdx::unique_lock guard(_mutex);
+ auto snapshot = _mutableState;
+ return snapshot;
+}
+
+GlobalIndexClonerDoc GlobalIndexCloningService::CloningStateMachine::_makeClonerDoc() const {
+ // Rebuilds a full state document from the immutable spec members plus a copy of the current
+ // mutable state.
+ GlobalIndexClonerDoc doc;
+
+ // Immutable portion: these members are const, so no locking is needed to read them.
+ doc.setNss(_sourceNss);
+ doc.setCollectionUUID(_sourceCollUUID);
+ doc.setIndexCollectionUUID(_indexCollectionUUID);
+ doc.setIndexName(_indexName);
+ doc.setIndexSpec(_indexSpec);
+ doc.setMinFetchTimestamp(_minFetchTimestamp);
+
+ // Mutable portion: _getMutableState() takes _mutex while it copies the state.
+ doc.setMutableState(_getMutableState());
+
+ return doc;
+}
+
} // namespace global_index
} // namespace mongo
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.h b/src/mongo/db/s/global_index/global_index_cloning_service.h
index 6342938e529..67d3ecdc456 100644
--- a/src/mongo/db/s/global_index/global_index_cloning_service.h
+++ b/src/mongo/db/s/global_index/global_index_cloning_service.h
@@ -94,6 +94,10 @@ public:
void abort();
+ /**
+ * Returns a future backed by _readyToCommitPromise. run() sets that promise from the chain
+ * that persists the state document and then runs _runUntilDoneCloning(), so the future
+ * carries that chain's outcome (success or error).
+ */
+ SharedSemiFuture<void> getReadyToCommitFuture() const {
+ return _readyToCommitPromise.getFuture();
+ }
+
/**
* Returns a Future that will be resolved when all work associated with this Instance is done
* making forward progress.
@@ -111,6 +115,12 @@ public:
*/
void abort(bool isUserCancelled);
+ /**
+ * Tells this cloner to perform cleanup. This can cause this cloner to abort if it is still
+ * running.
+ */
+ void cleanup();
+
void checkIfOptionsConflict(const BSONObj& stateDoc) const final;
private:
@@ -137,10 +147,9 @@ private:
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& cancelToken);
- /**
- * Deletes the state document from storage.
- */
- void _removeStateDocument(OperationContext* opCtx);
+ ExecutorFuture<repl::OpTime> _transitionToReadyToCommit(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& cancelToken);
/**
* Performs the entire cloning process.
@@ -181,39 +190,60 @@ private:
*/
void _ensureCollection(OperationContext* opCtx, const NamespaceString& nss);
- ServiceContext* const _serviceContext;
+ GlobalIndexClonerStateEnum _getState() const;
+ GlobalIndexClonerMutableState _getMutableState() const;
+ GlobalIndexClonerDoc _makeClonerDoc() const;
+
+ /**********************************************************************************
+ * Thread safety legend
+ *
+ * (TS) - Thread safe. Object can be accessed concurrently without additional mutex.
+ * (NC) - No concurrent access pattern. So can be used without mutex.
+ * (M) - Mutex required.
+ */
+
+ ServiceContext* const _serviceContext; // (TS)
// The primary-only service instance corresponding to the cloner instance. Not owned.
- const GlobalIndexCloningService* const _cloningService;
+ const GlobalIndexCloningService* const _cloningService; // (TS)
+
+ const UUID _indexCollectionUUID; // (TS)
+ const NamespaceString _sourceNss; // (TS)
+ const UUID _sourceCollUUID; // (TS)
+ const std::string _indexName; // (TS)
+ const BSONObj _indexSpec; // (TS)
+ const Timestamp _minFetchTimestamp; // (TS)
// A separate executor different from the one supplied by the primary only service is needed
// because the one from POS can be shut down during step down. This will ensure that the
// operation context created from the cancelableOpCtxFactory can be interrupted when the cancel
// token is aborted during step down.
- const std::shared_ptr<ThreadPool> _execForCancelableOpCtx;
+ const std::shared_ptr<ThreadPool> _execForCancelableOpCtx; // (TS)
boost::optional<resharding::RetryingCancelableOperationContextFactory>
- _retryingCancelableOpCtxFactory;
+ _retryingCancelableOpCtxFactory; // (TS)
- Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex");
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex");
- GlobalIndexClonerDoc _clonerState;
+ GlobalIndexClonerMutableState _mutableState; // (M)
// Canceled when there is an unrecoverable error or stepdown.
- boost::optional<CancellationSource> _abortSource;
+ boost::optional<CancellationSource> _abortSource; // (M)
- std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory;
- std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher;
- std::unique_ptr<GlobalIndexInserter> _inserter;
+ std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory; // (TS)
+ std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher; // (NC)
+ std::unique_ptr<GlobalIndexInserter> _inserter; // (NC)
// Keeps track of whether there is still a possibility that we have documents that need to be
// fetched from the source collection.
- bool _hasMoreToFetch{true};
+ bool _hasMoreToFetch{true}; // (NC)
- std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs;
+ std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs; // (NC)
- SharedPromise<void> _completionPromise;
- const std::unique_ptr<CloningExternalState> _externalState;
+ SharedPromise<void> _completionPromise; // (TS)
+ SharedPromise<void> _readyToCommitPromise; // (TS)
+ SharedPromise<void> _waitForCleanupPromise; // (M)
+ const std::unique_ptr<CloningExternalState> _externalState; // (TS)
};
} // namespace global_index
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
index 187271df7cf..ea56451168c 100644
--- a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
+++ b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
#include "mongo/db/s/global_index/global_index_cloning_service.h"
#include "mongo/db/s/global_index/global_index_util.h"
+#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
#include "mongo/db/session/logical_session_cache_noop.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
@@ -51,6 +52,14 @@ namespace mongo {
namespace global_index {
namespace {
+using StateTransitionController =
+ resharding_service_test_helpers::StateTransitionController<GlobalIndexClonerStateEnum>;
+using OpObserverForTest =
+ resharding_service_test_helpers::StateTransitionControllerOpObserver<GlobalIndexClonerStateEnum,
+ GlobalIndexClonerDoc>;
+using PauseDuringStateTransitions =
+ resharding_service_test_helpers::PauseDuringStateTransitions<GlobalIndexClonerStateEnum>;
+
const ShardId kRecipientShardId{"myShardId"};
const NamespaceString kSourceNss{"sourcedb", "sourcecollection"};
constexpr auto kSourceShardKey = "key"_sd;
@@ -168,72 +177,6 @@ private:
MockGlobalIndexClonerFetcher* _mockFetcher;
};
-class Blocker {
-public:
- ~Blocker() {
- stdx::unique_lock lk(_mutex);
- _shouldBlock = false;
- _cvBlocked.notify_all();
- }
-
- void blockIfActivated(OperationContext* opCtx) {
- stdx::unique_lock lk(_mutex);
- _blockedOnce = true;
- opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return !_shouldBlock; });
- }
-
- void waitUntilBlockedOccurred(OperationContext* opCtx) {
- stdx::unique_lock lk(_mutex);
- opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return _blockedOnce; });
- }
-
- void block() {
- stdx::unique_lock lk(_mutex);
- _shouldBlock = true;
- }
-
- void unblock() {
- stdx::unique_lock lk(_mutex);
- _shouldBlock = false;
- }
-
-private:
- Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningServiceTestBlocker::_mutex");
- stdx::condition_variable _cvBlocked;
- bool _shouldBlock{false};
- bool _blockedOnce{false};
-};
-
-class OpObserverForTest : public OpObserverNoop {
-public:
- OpObserverForTest(Blocker* insertBlocker, Blocker* deleteBlocker)
- : _insertBlocker(insertBlocker), _deleteBlocker(deleteBlocker) {}
-
- void onInserts(OperationContext* opCtx,
- const CollectionPtr& coll,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- bool fromMigrate) override {
- if (NamespaceString::kGlobalIndexClonerNamespace == coll->ns()) {
- _insertBlocker->blockIfActivated(opCtx);
- }
- }
-
- void onDelete(OperationContext* opCtx,
- const NamespaceString& nss,
- const UUID& uuid,
- StmtId stmtId,
- const OplogDeleteEntryArgs& args) override {
- if (NamespaceString::kGlobalIndexClonerNamespace == nss) {
- _deleteBlocker->blockIfActivated(opCtx);
- }
- }
-
-private:
- Blocker* _insertBlocker;
- Blocker* _deleteBlocker;
-};
-
GlobalIndexCloningService::InstanceID extractInstanceId(const GlobalIndexClonerDoc& doc) {
return BSON("_id" << doc.getIndexCollectionUUID());
}
@@ -257,8 +200,13 @@ public:
// so we should manually instantiate it to ensure it exists in our tests.
ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
+ _stateTransitionController = std::make_shared<StateTransitionController>();
_opObserverRegistry->addObserver(
- std::make_unique<OpObserverForTest>(&_stateDocInsertBlocker, &_stateDocDeleteBlocker));
+ std::make_unique<OpObserverForTest>(_stateTransitionController,
+ NamespaceString::kGlobalIndexClonerNamespace,
+ [](const GlobalIndexClonerDoc& stateDoc) {
+ return stateDoc.getMutableState().getState();
+ }));
// Create config.transactions collection
auto opCtx = serviceContext->makeOperationContext(Client::getCurrent());
@@ -331,12 +279,8 @@ public:
return false;
}
- Blocker* getStateDocInsertBlocker() {
- return &_stateDocInsertBlocker;
- }
-
- Blocker* getStateDocDeleteBlocker() {
- return &_stateDocDeleteBlocker;
+ StateTransitionController* stateTransitionController() {
+ return _stateTransitionController.get();
}
void replaceFetcherResultList(
@@ -377,8 +321,7 @@ private:
const BSONObj _indexSpec{BSON("key" << BSON(_indexKey << 1) << "unique" << true)};
ReadWriteConcernDefaultsLookupMock _lookupMock;
- Blocker _stateDocInsertBlocker;
- Blocker _stateDocDeleteBlocker;
+ std::shared_ptr<StateTransitionController> _stateTransitionController;
MockGlobalIndexClonerFetcher _mockFetcher;
MockGlobalIndexClonerFetcher _fetcherCopyForVerification;
@@ -391,6 +334,8 @@ TEST_F(GlobalIndexClonerServiceTest, CloneInsertsToGlobalIndexCollection) {
auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
auto future = cloner->getCompletionFuture();
+ cloner->getReadyToCommitFuture().get();
+ cloner->cleanup();
future.get();
ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
@@ -402,39 +347,57 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldBeSafeToRetryOnStepDown) {
auto opCtx = makeOperationContext();
auto rawOpCtx = opCtx.get();
- auto stateDocInsertBlocker = getStateDocInsertBlocker();
- stateDocInsertBlocker->block();
- auto stateDocDeleteBlocker = getStateDocDeleteBlocker();
- stateDocDeleteBlocker->block();
+ const std::vector<GlobalIndexClonerStateEnum> states{GlobalIndexClonerStateEnum::kCloning,
+ GlobalIndexClonerStateEnum::kReadyToCommit,
+ GlobalIndexClonerStateEnum::kDone};
+ PauseDuringStateTransitions stateTransitionsGuard{stateTransitionController(), states};
+
+ auto prevState = GlobalIndexClonerStateEnum::kUnused;
+ for (const auto& nextState : states) {
+ LOGV2(6870601,
+ "Testing next state",
+ "state"_attr = GlobalIndexClonerState_serializer(nextState));
+
+ auto cloner = ([&] {
+ if (nextState == GlobalIndexClonerStateEnum::kCloning ||
+ nextState == GlobalIndexClonerStateEnum::kReadyToCommit) {
+ return GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ }
- {
- auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
- stateDocInsertBlocker->waitUntilBlockedOccurred(rawOpCtx);
- stepDown();
+ return *GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc));
+ })();
- ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
- }
+ if (prevState != GlobalIndexClonerStateEnum::kUnused) {
+ stateTransitionsGuard.unset(prevState);
+ }
- stepUp(rawOpCtx);
+ auto readyToCommitFuture = cloner->getReadyToCommitFuture();
+
+ if (nextState == GlobalIndexClonerStateEnum::kDone) {
+ readyToCommitFuture.get();
+ cloner->cleanup();
+ }
- {
- auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
- stateDocInsertBlocker->unblock();
- stateDocDeleteBlocker->waitUntilBlockedOccurred(rawOpCtx);
+ stateTransitionsGuard.wait(nextState);
stepDown();
+ if (nextState != GlobalIndexClonerStateEnum::kDone) {
+ ASSERT_THROWS(readyToCommitFuture.get(), DBException);
+ }
+
+ // Note: can either throw InterruptDueToRepl or ShutdownInProgress (from executor).
ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
- }
- stepUp(rawOpCtx);
+ stepUp(rawOpCtx);
- // It is possible for the primary only service to run to completion and no longer exists.
- {
- auto cloner = GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc));
- stateDocDeleteBlocker->unblock();
- (*cloner)->getCompletionFuture().get();
+ prevState = nextState;
}
+ auto cloner = *GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc));
+ stateTransitionsGuard.unset(GlobalIndexClonerStateEnum::kDone);
+ cloner->cleanup();
+ cloner->getCompletionFuture().get();
+
checkIndexCollection(rawOpCtx);
}
@@ -458,6 +421,8 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldBeAbleToConsumeMultipleBatchesWorthof
auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
auto future = cloner->getCompletionFuture();
+ cloner->getReadyToCommitFuture().get();
+ cloner->cleanup();
future.get();
ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
@@ -473,6 +438,8 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldWorkWithEmptyCollection) {
auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
auto future = cloner->getCompletionFuture();
+ cloner->getReadyToCommitFuture().get();
+ cloner->cleanup();
future.get();
ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index a906c8f4f93..0c89a9aca53 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -64,23 +64,11 @@ namespace {
using CoordinatorStateTransitionController =
resharding_service_test_helpers::StateTransitionController<CoordinatorStateEnum>;
-using OpObserverForTest =
- resharding_service_test_helpers::OpObserverForTest<CoordinatorStateEnum,
- ReshardingCoordinatorDocument>;
+using OpObserverForTest = resharding_service_test_helpers::
+ StateTransitionControllerOpObserver<CoordinatorStateEnum, ReshardingCoordinatorDocument>;
using PauseDuringStateTransitions =
resharding_service_test_helpers::PauseDuringStateTransitions<CoordinatorStateEnum>;
-class CoordinatorOpObserverForTest : public OpObserverForTest {
-public:
- CoordinatorOpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller)
- : OpObserverForTest(std::move(controller),
- NamespaceString::kConfigReshardingOperationsNamespace) {}
-
- CoordinatorStateEnum getState(const ReshardingCoordinatorDocument& coordinatorDoc) override {
- return coordinatorDoc.getState();
- }
-};
-
class ExternalStateForTest : public ReshardingCoordinatorExternalState {
ParticipantShardsAndChunks calculateParticipantShardsAndChunks(
OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) override {
@@ -169,11 +157,10 @@ public:
invariant(_opObserverRegistry);
_opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
- _opObserverRegistry->addObserver(
- std::make_unique<CoordinatorOpObserverForTest>(_controller));
- _opObserverRegistry->addObserver(
- std::make_unique<repl::PrimaryOnlyServiceOpObserver>(getServiceContext()));
-
+ _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(
+ _controller,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ [](const ReshardingCoordinatorDocument& stateDoc) { return stateDoc.getState(); }));
_registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
auto service = makeService(getServiceContext());
auto serviceName = service->getServiceName();
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index 7ce393e5e9d..53c0044ae75 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -65,7 +65,8 @@ namespace {
using DonorStateTransitionController =
resharding_service_test_helpers::StateTransitionController<DonorStateEnum>;
using OpObserverForTest =
- resharding_service_test_helpers::OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>;
+ resharding_service_test_helpers::StateTransitionControllerOpObserver<DonorStateEnum,
+ ReshardingDonorDocument>;
using PauseDuringStateTransitions =
resharding_service_test_helpers::PauseDuringStateTransitions<DonorStateEnum>;
@@ -90,17 +91,6 @@ public:
const NamespaceString& tempReshardingNss) override {}
};
-class DonorOpObserverForTest : public OpObserverForTest {
-public:
- DonorOpObserverForTest(std::shared_ptr<DonorStateTransitionController> controller)
- : OpObserverForTest(std::move(controller),
- NamespaceString::kDonorReshardingOperationsNamespace) {}
-
- DonorStateEnum getState(const ReshardingDonorDocument& donorDoc) override {
- return donorDoc.getMutableState().getState();
- }
-};
-
class ReshardingDonorServiceForTest : public ReshardingDonorService {
public:
explicit ReshardingDonorServiceForTest(ServiceContext* serviceContext)
@@ -137,7 +127,12 @@ public:
repl::StorageInterface::set(serviceContext, std::move(storageMock));
_controller = std::make_shared<DonorStateTransitionController>();
- _opObserverRegistry->addObserver(std::make_unique<DonorOpObserverForTest>(_controller));
+ _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(
+ _controller,
+ NamespaceString::kDonorReshardingOperationsNamespace,
+ [](const ReshardingDonorDocument& donorDoc) {
+ return donorDoc.getMutableState().getState();
+ }));
}
DonorStateTransitionController* controller() {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 32eeadff416..1f21ea9342b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -63,9 +63,8 @@ using RecipientStateTransitionController =
resharding_service_test_helpers::StateTransitionController<RecipientStateEnum>;
using PauseDuringStateTransitions =
resharding_service_test_helpers::PauseDuringStateTransitions<RecipientStateEnum>;
-using OpObserverForTest =
- resharding_service_test_helpers::OpObserverForTest<RecipientStateEnum,
- ReshardingRecipientDocument>;
+using OpObserverForTest = resharding_service_test_helpers::
+ StateTransitionControllerOpObserver<RecipientStateEnum, ReshardingRecipientDocument>;
const ShardId recipientShardId{"myShardId"};
class ExternalStateForTest : public ReshardingRecipientService::RecipientStateMachineExternalState {
@@ -157,17 +156,6 @@ private:
const ShardId _someDonorId{"myDonorId"};
};
-class RecipientOpObserverForTest : public OpObserverForTest {
-public:
- RecipientOpObserverForTest(std::shared_ptr<RecipientStateTransitionController> controller)
- : OpObserverForTest(std::move(controller),
- NamespaceString::kRecipientReshardingOperationsNamespace) {}
-
- RecipientStateEnum getState(const ReshardingRecipientDocument& recipientDoc) override {
- return recipientDoc.getMutableState().getState();
- }
-};
-
class DataReplicationForTest : public ReshardingDataReplicationInterface {
public:
SemiFuture<void> runUntilStrictlyConsistent(
@@ -235,7 +223,12 @@ public:
repl::StorageInterface::set(serviceContext, std::move(storageMock));
_controller = std::make_shared<RecipientStateTransitionController>();
- _opObserverRegistry->addObserver(std::make_unique<RecipientOpObserverForTest>(_controller));
+ _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(
+ _controller,
+ NamespaceString::kRecipientReshardingOperationsNamespace,
+ [](const ReshardingRecipientDocument& stateDoc) {
+ return stateDoc.getMutableState().getState();
+ }));
}
RecipientStateTransitionController* controller() {
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
deleted file mode 100644
index 2fae639c187..00000000000
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
-
-#include "mongo/db/s/resharding/resharding_coordinator_service.h"
-#include "mongo/db/s/resharding/resharding_donor_service.h"
-#include "mongo/db/s/resharding/resharding_recipient_service.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-
-namespace mongo {
-namespace resharding_service_test_helpers {
-
-// -----------------------------------------------
-// StateTransitionController
-// -----------------------------------------------
-
-template <class StateEnum>
-void StateTransitionController<StateEnum>::waitUntilStateIsReached(StateEnum state) {
- stdx::unique_lock lk(_mutex);
- _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; });
-}
-
-template <class StateEnum>
-void StateTransitionController<StateEnum>::_setPauseDuringTransition(StateEnum state) {
- stdx::lock_guard lk(_mutex);
- _pauseDuringTransition.insert(state);
-}
-
-template <class StateEnum>
-void StateTransitionController<StateEnum>::_unsetPauseDuringTransition(StateEnum state) {
- stdx::lock_guard lk(_mutex);
- _pauseDuringTransition.erase(state);
- _pauseDuringTransitionCond.notify_all();
-}
-
-template <class StateEnum>
-void StateTransitionController<StateEnum>::_notifyNewStateAndWaitUntilUnpaused(
- OperationContext* opCtx, StateEnum newState) {
- stdx::unique_lock lk(_mutex);
- ScopeGuard guard([this, prevState = _state] { _state = prevState; });
- _state = newState;
- _waitUntilUnpausedCond.notify_all();
- opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
- return _pauseDuringTransition.count(newState) == 0;
- });
- guard.dismiss();
-}
-
-template <class StateEnum>
-void StateTransitionController<StateEnum>::_resetReachedState() {
- stdx::lock_guard lk(_mutex);
- _state = StateEnum::kUnused;
-}
-
-// -----------------------------------------------
-// PauseDuringStateTransitions
-// -----------------------------------------------
-
-template <class StateEnum>
-PauseDuringStateTransitions<StateEnum>::PauseDuringStateTransitions(
- StateTransitionController<StateEnum>* controller, StateEnum state)
- : PauseDuringStateTransitions<StateEnum>(controller, std::vector<StateEnum>{state}) {}
-
-template <class StateEnum>
-PauseDuringStateTransitions<StateEnum>::PauseDuringStateTransitions(
- StateTransitionController<StateEnum>* controller, std::vector<StateEnum> states)
- : _controller{controller}, _states{std::move(states)} {
- _controller->_resetReachedState();
- for (auto state : _states) {
- _controller->_setPauseDuringTransition(state);
- }
-}
-
-template <class StateEnum>
-PauseDuringStateTransitions<StateEnum>::~PauseDuringStateTransitions() {
- for (auto state : _states) {
- _controller->_unsetPauseDuringTransition(state);
- }
-}
-
-template <class StateEnum>
-void PauseDuringStateTransitions<StateEnum>::wait(StateEnum state) {
- _controller->waitUntilStateIsReached(state);
-}
-
-template <class StateEnum>
-void PauseDuringStateTransitions<StateEnum>::unset(StateEnum state) {
- _controller->_unsetPauseDuringTransition(state);
-}
-
-// -----------------------------------------------
-// OpObserverForTest
-// -----------------------------------------------
-
-template <class StateEnum, class ReshardingDocument>
-OpObserverForTest<StateEnum, ReshardingDocument>::OpObserverForTest(
- std::shared_ptr<StateTransitionController<StateEnum>> controller,
- NamespaceString reshardingDocumentNss)
- : _controller{std::move(controller)},
- _reshardingDocumentNss{std::move(reshardingDocumentNss)} {}
-
-template <class StateEnum, class ReshardingDocument>
-void OpObserverForTest<StateEnum, ReshardingDocument>::onUpdate(OperationContext* opCtx,
- const OplogUpdateEntryArgs& args) {
- if (args.nss != _reshardingDocumentNss) {
- return;
- }
-
- auto doc = ReshardingDocument::parse(IDLParserContext{"OpObserverForTest"},
- args.updateArgs->updatedDoc);
- _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, getState(doc));
-}
-
-template class StateTransitionController<DonorStateEnum>;
-template class StateTransitionController<RecipientStateEnum>;
-template class StateTransitionController<CoordinatorStateEnum>;
-
-template class PauseDuringStateTransitions<DonorStateEnum>;
-template class PauseDuringStateTransitions<RecipientStateEnum>;
-template class PauseDuringStateTransitions<CoordinatorStateEnum>;
-
-template class OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>;
-template class OpObserverForTest<RecipientStateEnum, ReshardingRecipientDocument>;
-template class OpObserverForTest<CoordinatorStateEnum, ReshardingCoordinatorDocument>;
-
-} // namespace resharding_service_test_helpers
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.h b/src/mongo/db/s/resharding/resharding_service_test_helpers.h
index 7d123dde7ff..70fe8786fc7 100644
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.h
+++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.h
@@ -32,39 +32,61 @@
#include "mongo/platform/basic.h"
#include "mongo/db/op_observer/op_observer_noop.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
namespace mongo {
namespace resharding_service_test_helpers {
-template <class StateEnum, class ReshardingDocument>
-class OpObserverForTest;
-
-template <class StateEnum>
-class PauseDuringStateTransitions;
-
-template <class StateEnum>
-class StateTransitionController;
+/**
+ * Contains the logic for pausing and unpausing a state transition when a given state is reached.
+ *
+ * Template parameter StateEnum must define kUnused (reset value) and kDone (reported on delete).
+ */
template <class StateEnum>
class StateTransitionController {
public:
StateTransitionController() = default;
- void waitUntilStateIsReached(StateEnum state);
+ void waitUntilStateIsReached(StateEnum state) {
+ stdx::unique_lock lk(_mutex);
+ _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; });
+ }
private:
template <class Enum, class ReshardingDocument>
- friend class OpObserverForTest;
+ friend class StateTransitionControllerOpObserver;
template <class Enum>
friend class PauseDuringStateTransitions;
- void _setPauseDuringTransition(StateEnum state);
-
- void _unsetPauseDuringTransition(StateEnum state);
-
- void _notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx, StateEnum newState);
-
- void _resetReachedState();
+ void _setPauseDuringTransition(StateEnum state) {
+ stdx::lock_guard lk(_mutex);
+ _pauseDuringTransition.insert(state);
+ }
+
+ void _unsetPauseDuringTransition(StateEnum state) {
+ stdx::lock_guard lk(_mutex);
+ _pauseDuringTransition.erase(state);
+ _pauseDuringTransitionCond.notify_all();
+ }
+
+ void _notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx, StateEnum newState) {
+ stdx::unique_lock lk(_mutex);
+ ScopeGuard guard([this, prevState = _state] { _state = prevState; });
+ _state = newState;
+ _waitUntilUnpausedCond.notify_all();
+ opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
+ return _pauseDuringTransition.count(newState) == 0;
+ });
+ guard.dismiss();
+ }
+
+ void _resetReachedState() {
+ stdx::lock_guard lk(_mutex);
+ _state = StateEnum::kUnused;
+ }
Mutex _mutex = MONGO_MAKE_LATCH("StateTransitionController::_mutex");
stdx::condition_variable _pauseDuringTransitionCond;
@@ -77,11 +99,23 @@ private:
template <class StateEnum>
class PauseDuringStateTransitions {
public:
- PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, StateEnum state);
- PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller,
- std::vector<StateEnum> states);
+ PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, StateEnum state)
+ : PauseDuringStateTransitions<StateEnum>(controller, std::vector<StateEnum>{state}) {}
- ~PauseDuringStateTransitions();
+ PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller,
+ std::vector<StateEnum> states)
+ : _controller{controller}, _states{std::move(states)} {
+ _controller->_resetReachedState();
+ for (auto state : _states) {
+ _controller->_setPauseDuringTransition(state);
+ }
+ }
+
+ ~PauseDuringStateTransitions() {
+ for (auto state : _states) {
+ _controller->_unsetPauseDuringTransition(state);
+ }
+ }
PauseDuringStateTransitions(const PauseDuringStateTransitions&) = delete;
PauseDuringStateTransitions& operator=(const PauseDuringStateTransitions&) = delete;
@@ -89,29 +123,77 @@ public:
PauseDuringStateTransitions(PauseDuringStateTransitions&&) = delete;
PauseDuringStateTransitions& operator=(PauseDuringStateTransitions&&) = delete;
- void wait(StateEnum state);
+ void wait(StateEnum state) {
+ _controller->waitUntilStateIsReached(state);
+ }
- void unset(StateEnum state);
+ void unset(StateEnum state) {
+ _controller->_unsetPauseDuringTransition(state);
+ }
private:
StateTransitionController<StateEnum>* const _controller;
const std::vector<StateEnum> _states;
};
-template <class StateEnum, class ReshardingDocument>
-class OpObserverForTest : public OpObserverNoop {
+template <class StateEnum, class StateDocument>
+class StateTransitionControllerOpObserver : public OpObserverNoop {
public:
- OpObserverForTest(std::shared_ptr<StateTransitionController<StateEnum>> controller,
- NamespaceString reshardingDocumentNss);
-
- void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override;
-
- virtual StateEnum getState(const ReshardingDocument& reshardingDoc) = 0;
+ using GetStateFunc = std::function<StateEnum(const StateDocument&)>;
+
+ StateTransitionControllerOpObserver(
+ std::shared_ptr<StateTransitionController<StateEnum>> controller,
+ NamespaceString stateDocumentNss,
+ GetStateFunc getStateFunc)
+ : _controller{std::move(controller)},
+ _stateDocumentNss{std::move(stateDocumentNss)},
+ _getState{std::move(getStateFunc)} {}
+
+ void onInserts(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override {
+ if (coll->ns() != _stateDocumentNss) {
+ return;
+ }
+
+ auto doc = StateDocument::parse(
+ IDLParserContext{"StateTransitionControllerOpObserver::onInserts"}, begin->doc);
+ _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, _getState(doc));
+ invariant(++begin == end); // No support for inserting more than one state document yet.
+ }
+
+ void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
+ if (args.nss != _stateDocumentNss) {
+ return;
+ }
+
+ auto doc =
+ StateDocument::parse(IDLParserContext{"StateTransitionControllerOpObserver::onUpdate"},
+ args.updateArgs->updatedDoc);
+ _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, _getState(doc));
+ }
+
+ void onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ StmtId stmtId,
+ const OplogDeleteEntryArgs& args) override {
+ if (nss != _stateDocumentNss) {
+ return;
+ }
+
+ _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, StateEnum::kDone);
+ }
private:
std::shared_ptr<StateTransitionController<StateEnum>> _controller;
- const NamespaceString _reshardingDocumentNss;
+ const NamespaceString _stateDocumentNss;
+ GetStateFunc _getState;
};
} // namespace resharding_service_test_helpers
} // namespace mongo
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT