diff options
author | Randolph Tan <randolph@10gen.com> | 2022-08-11 19:19:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-09 17:17:19 +0000 |
commit | c9e875975a43c819387095033bdabff82cc9cb96 (patch) | |
tree | 4b218e81a3c229444945c88d0f4f336868578d90 /src | |
parent | dab0694cd327eb0f7e540de5dee97c69f84ea45d (diff) | |
download | mongo-c9e875975a43c819387095033bdabff82cc9cb96.tar.gz |
SERVER-68706 Global Index Cloner should wait for abort/commit from coordinator
Diffstat (limited to 'src')
10 files changed, 405 insertions, 437 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 2a988b0879b..3565b4cd45f 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -552,7 +552,6 @@ env.Library( target='sharding_mongod_test_fixture', source=[ 'sharding_mongod_test_fixture.cpp', - 'resharding/resharding_service_test_helpers.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper', diff --git a/src/mongo/db/s/global_index/global_index_cloner.idl b/src/mongo/db/s/global_index/global_index_cloner.idl index 81ab433f171..cbfdb4935d0 100644 --- a/src/mongo/db/s/global_index/global_index_cloner.idl +++ b/src/mongo/db/s/global_index/global_index_cloner.idl @@ -42,8 +42,19 @@ enums: values: kUnused: "unused" kCloning: "cloning" + kReadyToCommit: "ready-to-commit" + # done is not a real state but only used in tests. + kDone: "done" structs: + GlobalIndexClonerMutableState: + description: "Contains the mutable state for global index cloning operation." + strict: false + fields: + state: + type: GlobalIndexClonerState + description: "The current state of cloner." + GlobalIndexClonerDoc: description: "Document containing the state and specs of global index cloning operation." strict: false @@ -69,6 +80,4 @@ structs: type: timestamp description: "The minimum timestamp to use for fetching documents from the source collection" - state: - type: GlobalIndexClonerState - description: "The current state of cloner." + mutableState: GlobalIndexClonerMutableState diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.cpp b/src/mongo/db/s/global_index/global_index_cloning_service.cpp index 0002d7ab032..fd3648bd6bc 100644 --- a/src/mongo/db/s/global_index/global_index_cloning_service.cpp +++ b/src/mongo/db/s/global_index/global_index_cloning_service.cpp @@ -36,6 +36,9 @@ #include "mongo/db/index/index_descriptor.h" #include "mongo/db/ops/delete.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/global_index/global_index_cloning_external_state.h" #include "mongo/db/s/global_index/global_index_server_parameters_gen.h" #include "mongo/db/s/global_index/global_index_util.h" @@ -88,6 +91,12 @@ GlobalIndexCloningService::CloningStateMachine::CloningStateMachine( GlobalIndexClonerDoc clonerDoc) : _serviceContext(serviceContext), _cloningService(cloningService), + _indexCollectionUUID(clonerDoc.getIndexCollectionUUID()), + _sourceNss(clonerDoc.getNss()), + _sourceCollUUID(clonerDoc.getCollectionUUID()), + _indexName(clonerDoc.getIndexName()), + _indexSpec(clonerDoc.getIndexSpec().getOwned()), + _minFetchTimestamp(clonerDoc.getMinFetchTimestamp()), _execForCancelableOpCtx(std::make_shared<ThreadPool>([] { ThreadPool::Options options; options.poolName = "GlobalIndexCloningServiceCancelableOpCtxPool"; @@ -95,7 +104,7 @@ GlobalIndexCloningService::CloningStateMachine::CloningStateMachine( options.maxThreads = 1; return options; }())), - _clonerState(std::move(clonerDoc)), + _mutableState(clonerDoc.getMutableState()), _fetcherFactory(std::move(fetcherFactory)), _externalState(std::move(externalState)) {} @@ -108,31 +117,34 @@ SemiFuture<void> GlobalIndexCloningService::CloningStateMachine::run( _init(executor); - return ExecutorFuture(**executor) - .then([this, executor, abortToken] { return _persistStateDocument(executor, abortToken); }) - .then([this, executor, abortToken] { return _runUntilDoneCloning(executor, abortToken); }) - // TODO: SERVER-68706 wait from coordinator to commit or abort. - .onCompletion([this, stepdownToken](const Status& status) { - _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx); - return status; - }) - .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); }) - .thenRunOn(_cloningService->getInstanceCleanupExecutor()) - .onError([](const Status& status) { - LOGV2( - 6755903, "Global index cloner encountered an error", "error"_attr = redact(status)); - return status; - }) - .onCompletion([this, self = shared_from_this()](const Status& status) { - if (!_completionPromise.getFuture().isReady()) { - if (status.isOK()) { - _completionPromise.emplaceValue(); - } else { - _completionPromise.setError(status); - } - } - }) - .semi(); + _readyToCommitPromise.setFrom(ExecutorFuture(**executor) + .then([this, executor, abortToken] { + return _persistStateDocument(executor, abortToken); + }) + .then([this, executor, abortToken] { + return _runUntilDoneCloning(executor, abortToken); + }) + .unsafeToInlineFuture()); + + _completionPromise.setFrom( + _readyToCommitPromise.getFuture() + .thenRunOn(**executor) + .then([this, stepdownToken] { + _retryingCancelableOpCtxFactory.emplace(stepdownToken, _execForCancelableOpCtx); + }) + .then([this, executor, stepdownToken] { + return future_util::withCancellation(_waitForCleanupPromise.getFuture(), + stepdownToken); + }) + .then([this, executor, stepdownToken] { return _cleanup(executor, stepdownToken); }) + .unsafeToInlineFuture() + .tapError([](const Status& status) { + LOGV2(6755903, + "Global index cloner encountered an error", + "error"_attr = redact(status)); + })); + + return _completionPromise.getFuture().semi(); } void GlobalIndexCloningService::CloningStateMachine::interrupt(Status status) {} @@ -152,7 +164,11 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup( return ExecutorFuture(**executor) .then([this, executor, stepdownToken, &cancelableFactory] { auto opCtx = cancelableFactory.makeOperationContext(&cc()); - _removeStateDocument(opCtx.get()); + PersistentTaskStore<GlobalIndexClonerDoc> store( + _cloningService->getStateDocumentsNS()); + store.remove(opCtx.get(), + BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName + << _indexCollectionUUID)); }); }) .onTransientError([](const auto& status) {}) @@ -163,33 +179,28 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_cleanup( void GlobalIndexCloningService::CloningStateMachine::_init( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - _inserter = std::make_unique<GlobalIndexInserter>(_clonerState.getNss(), - _clonerState.getIndexName(), - _clonerState.getIndexCollectionUUID(), - **executor); + _inserter = std::make_unique<GlobalIndexInserter>( + _sourceNss, _indexName, _indexCollectionUUID, **executor); auto client = _serviceContext->makeClient("globalIndexClonerServiceInit"); AlternativeClientRegion clientRegion(client); auto opCtx = _serviceContext->makeOperationContext(Client::getCurrent()); - auto routingInfo = - _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _clonerState.getNss()); + auto routingInfo = _externalState->getShardedCollectionRoutingInfo(opCtx.get(), _sourceNss); uassert(6755901, - str::stream() << "Cannot create global index on unsharded ns " - << _clonerState.getNss().ns(), + str::stream() << "Cannot create global index on unsharded ns " << _sourceNss.ns(), routingInfo.isSharded()); auto myShardId = _externalState->myShardId(_serviceContext); - auto indexKeyPattern = - _clonerState.getIndexSpec().getObjectField(IndexDescriptor::kKeyPatternFieldName); - _fetcher = _fetcherFactory->make(_clonerState.getNss(), - _clonerState.getCollectionUUID(), - _clonerState.getIndexCollectionUUID(), + auto indexKeyPattern = _indexSpec.getObjectField(IndexDescriptor::kKeyPatternFieldName); + _fetcher = _fetcherFactory->make(_sourceNss, + _sourceCollUUID, + _indexCollectionUUID, myShardId, - _clonerState.getMinFetchTimestamp(), + _minFetchTimestamp, routingInfo.getShardKeyPattern().getKeyPattern(), indexKeyPattern.getOwned()); } @@ -205,14 +216,17 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_runUntilDo }) .then([this, executor, cancelToken, cancelableFactory] { return _clone(executor, cancelToken, cancelableFactory); + }) + .then([this, executor, cancelToken, cancelableFactory] { + return _transitionToReadyToCommit(executor, cancelToken); + }) + .then([this, cancelToken](const repl::OpTime& readyToCommitOpTime) { + return WaitForMajorityService::get(_serviceContext) + .waitUntilMajority(readyToCommitOpTime, cancelToken); }); }) - .onTransientError([](const Status& status) { - - }) - .onUnrecoverableError([](const Status& status) { - - }) + .onTransientError([](const Status& status) {}) + .onUnrecoverableError([](const Status& status) {}) .until<Status>([](const Status& status) { return status.isOK(); }) .on(**executor, cancelToken); } @@ -230,11 +244,12 @@ void GlobalIndexCloningService::CloningStateMachine::checkIfOptionsConflict( const BSONObj& stateDoc) const { auto newCloning = GlobalIndexClonerDoc::parse(IDLParserContext("globalIndexCloningCheckConflict"), stateDoc); + uassert(6755900, - str::stream() << "new global index " << stateDoc << " is incompatible with ongoing " - << _clonerState.toBSON(), - newCloning.getNss() == _clonerState.getNss() && - newCloning.getCollectionUUID() == _clonerState.getCollectionUUID()); + str::stream() << "New global index " << stateDoc + << " is incompatible with ongoing global index build in namespace: " + << _sourceNss << ", uuid: " << _sourceCollUUID, + newCloning.getNss() == _sourceNss && newCloning.getCollectionUUID() == _sourceCollUUID); } CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSource( @@ -251,7 +266,7 @@ CancellationToken GlobalIndexCloningService::CloningStateMachine::_initAbortSour ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistStateDocument( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) { - if (_clonerState.getState() > GlobalIndexClonerStateEnum::kUnused) { + if (_getState() > GlobalIndexClonerStateEnum::kUnused) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -259,14 +274,17 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistSta ->withAutomaticRetry([this, executor](auto& cancelableFactory) { auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent()); - GlobalIndexClonerDoc newDoc(_clonerState); - newDoc.setState(GlobalIndexClonerStateEnum::kCloning); + auto newDoc = _makeClonerDoc(); + newDoc.getMutableState().setState(GlobalIndexClonerStateEnum::kCloning); PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS()); store.add(opCtx.get(), newDoc, kNoWaitWriteConcern); - std::swap(_clonerState, newDoc); - LOGV2(6755904, "Persisted global index state document"); + + { + stdx::unique_lock lk(_mutex); + _mutableState.setState(GlobalIndexClonerStateEnum::kCloning); + } }) .onTransientError([](const Status& status) {}) .onUnrecoverableError([](const Status& status) {}) @@ -274,33 +292,43 @@ ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_persistSta .on(**executor, cancelToken); } -void GlobalIndexCloningService::CloningStateMachine::_removeStateDocument(OperationContext* opCtx) { - const auto& nss = _cloningService->getStateDocumentsNS(); - writeConflictRetry( - opCtx, "GlobalIndexCloningStateMachine::removeStateDocument", nss.toString(), [&] { - AutoGetCollection coll(opCtx, nss, MODE_IX); +ExecutorFuture<repl::OpTime> +GlobalIndexCloningService::CloningStateMachine::_transitionToReadyToCommit( + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken) { + if (_getState() > GlobalIndexClonerStateEnum::kReadyToCommit) { + // If we recovered from disk, then primary only service would have already waited for + // majority, so just return an empty opTime. + return ExecutorFuture<repl::OpTime>(**executor, repl::OpTime()); + } - if (!coll) { - return; - } + return _retryingCancelableOpCtxFactory + ->withAutomaticRetry([this, executor](auto& cancelableFactory) { + auto opCtx = cancelableFactory.makeOperationContext(Client::getCurrent()); - WriteUnitOfWork wuow(opCtx); + PersistentTaskStore<GlobalIndexClonerDoc> store(_cloningService->getStateDocumentsNS()); + + auto mutableState = _getMutableState(); + mutableState.setState(GlobalIndexClonerStateEnum::kReadyToCommit); - // Set the promise when the delete commits, this is to ensure that any interruption that - // happens later won't result in setting an error on the completion promise. - opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) { - _completionPromise.emplaceValue(); - }); + BSONObj update(BSON("$set" << BSON(GlobalIndexClonerDoc::kMutableStateFieldName + << mutableState.toBSON()))); + store.update( + opCtx.get(), + BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName << _indexCollectionUUID), + update); - deleteObjects(opCtx, - *coll, - nss, - BSON(GlobalIndexClonerDoc::kIndexCollectionUUIDFieldName - << _clonerState.getIndexCollectionUUID()), - true /* justOne */); + { + stdx::unique_lock lk(_mutex); + _mutableState = mutableState; + } - wuow.commit(); - }); + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + }) + .onTransientError([](const Status& status) {}) + .onUnrecoverableError([](const Status& status) {}) + .until<StatusWith<repl::OpTime>>( + [](const StatusWith<repl::OpTime>& status) { return status.isOK(); }) + .on(**executor, cancelToken); } void GlobalIndexCloningService::CloningStateMachine::_initializeCollections( @@ -308,8 +336,7 @@ void GlobalIndexCloningService::CloningStateMachine::_initializeCollections( auto cancelableOpCtx = cancelableOpCtxFactory.makeOperationContext(Client::getCurrent()); auto opCtx = cancelableOpCtx.get(); - resharding::data_copy::ensureCollectionExists( - opCtx, skipIdNss(_clonerState.getNss(), _clonerState.getIndexName()), {}); + resharding::data_copy::ensureCollectionExists(opCtx, skipIdNss(_sourceNss, _indexName), {}); } ExecutorFuture<void> GlobalIndexCloningService::CloningStateMachine::_clone( @@ -389,5 +416,43 @@ void GlobalIndexCloningService::CloningStateMachine::_ensureCollection(Operation }); } +void GlobalIndexCloningService::CloningStateMachine::cleanup() { + stdx::unique_lock lk(_mutex); + + if (!_waitForCleanupPromise.getFuture().isReady()) { + _waitForCleanupPromise.emplaceValue(); + } + + // TODO: SERVER-67563 Implement abort +} + +GlobalIndexClonerStateEnum GlobalIndexCloningService::CloningStateMachine::_getState() const { + stdx::unique_lock lk(_mutex); + return _mutableState.getState(); +} + +GlobalIndexClonerMutableState GlobalIndexCloningService::CloningStateMachine::_getMutableState() + const { + stdx::unique_lock lk(_mutex); + return _mutableState; +} + +GlobalIndexClonerDoc GlobalIndexCloningService::CloningStateMachine::_makeClonerDoc() const { + GlobalIndexClonerDoc clonerDoc; + clonerDoc.setIndexCollectionUUID(_indexCollectionUUID); + clonerDoc.setNss(_sourceNss); + clonerDoc.setCollectionUUID(_sourceCollUUID); + clonerDoc.setIndexName(_indexName); + clonerDoc.setIndexSpec(_indexSpec); + clonerDoc.setMinFetchTimestamp(_minFetchTimestamp); + + { + stdx::unique_lock lk(_mutex); + clonerDoc.setMutableState(_mutableState); + } + + return clonerDoc; +} + } // namespace global_index } // namespace mongo diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.h b/src/mongo/db/s/global_index/global_index_cloning_service.h index 6342938e529..67d3ecdc456 100644 --- a/src/mongo/db/s/global_index/global_index_cloning_service.h +++ b/src/mongo/db/s/global_index/global_index_cloning_service.h @@ -94,6 +94,10 @@ public: void abort(); + SharedSemiFuture<void> getReadyToCommitFuture() const { + return _readyToCommitPromise.getFuture(); + } + /** * Returns a Future that will be resolved when all work associated with this Instance is done * making forward progress. @@ -111,6 +115,12 @@ public: */ void abort(bool isUserCancelled); + /** + * Tells this cloner to perform cleanup. This can cause this cloner to abort if it is still + * running. + */ + void cleanup(); + void checkIfOptionsConflict(const BSONObj& stateDoc) const final; private: @@ -137,10 +147,9 @@ private: std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& cancelToken); - /** - * Deletes the state document from storage. - */ - void _removeStateDocument(OperationContext* opCtx); + ExecutorFuture<repl::OpTime> _transitionToReadyToCommit( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& cancelToken); /** * Performs the entire cloning process. @@ -181,39 +190,60 @@ private: */ void _ensureCollection(OperationContext* opCtx, const NamespaceString& nss); - ServiceContext* const _serviceContext; + GlobalIndexClonerStateEnum _getState() const; + GlobalIndexClonerMutableState _getMutableState() const; + GlobalIndexClonerDoc _makeClonerDoc() const; + + /********************************************************************************** + * Thread safety legend + * + * (TS) - Thread safe. Object can be accessed concurrently without additional mutex. + * (NC) - No concurrent access pattern. So can be used without mutex. + * (M) - Mutex required. + */ + + ServiceContext* const _serviceContext; // (TS) // The primary-only service instance corresponding to the cloner instance. Not owned. - const GlobalIndexCloningService* const _cloningService; + const GlobalIndexCloningService* const _cloningService; // (TS) + + const UUID _indexCollectionUUID; + const NamespaceString _sourceNss; + const UUID _sourceCollUUID; + const std::string _indexName; + const BSONObj _indexSpec; + const Timestamp _minFetchTimestamp; // A separate executor different from the one supplied by the primary only service is needed // because the one from POS can be shut down during step down. This will ensure that the // operation context created from the cancelableOpCtxFactory can be interrupted when the cancel // token is aborted during step down. - const std::shared_ptr<ThreadPool> _execForCancelableOpCtx; + const std::shared_ptr<ThreadPool> _execForCancelableOpCtx; // (TS) boost::optional<resharding::RetryingCancelableOperationContextFactory> - _retryingCancelableOpCtxFactory; + _retryingCancelableOpCtxFactory; // (TS) - Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex"); + mutable Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningStateMachine::_mutex"); - GlobalIndexClonerDoc _clonerState; + GlobalIndexClonerMutableState _mutableState; // (NC) // Canceled when there is an unrecoverable error or stepdown. - boost::optional<CancellationSource> _abortSource; + boost::optional<CancellationSource> _abortSource; // (M) - std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory; - std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher; - std::unique_ptr<GlobalIndexInserter> _inserter; + std::unique_ptr<GlobalIndexClonerFetcherFactoryInterface> _fetcherFactory; // (TS) + std::unique_ptr<GlobalIndexClonerFetcherInterface> _fetcher; // (NC) + std::unique_ptr<GlobalIndexInserter> _inserter; // (NC) // Keeps track if there is still a posibility that we still have documents that needs to be // fetched from the source collection. - bool _hasMoreToFetch{true}; + bool _hasMoreToFetch{true}; // (NC) - std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs; + std::queue<GlobalIndexClonerFetcher::FetchedEntry> _fetchedDocs; // (NC) - SharedPromise<void> _completionPromise; - const std::unique_ptr<CloningExternalState> _externalState; + SharedPromise<void> _completionPromise; // (TS) + SharedPromise<void> _readyToCommitPromise; // (TS) + SharedPromise<void> _waitForCleanupPromise; // (M) + const std::unique_ptr<CloningExternalState> _externalState; // (TS) }; } // namespace global_index diff --git a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp index 187271df7cf..ea56451168c 100644 --- a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp +++ b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/s/global_index/global_index_cloning_external_state.h" #include "mongo/db/s/global_index/global_index_cloning_service.h" #include "mongo/db/s/global_index/global_index_util.h" +#include "mongo/db/s/resharding/resharding_service_test_helpers.h" #include "mongo/db/session/logical_session_cache_noop.h" #include "mongo/db/session/session_catalog_mongod.h" #include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h" @@ -51,6 +52,14 @@ namespace mongo { namespace global_index { namespace { +using StateTransitionController = + resharding_service_test_helpers::StateTransitionController<GlobalIndexClonerStateEnum>; +using OpObserverForTest = + resharding_service_test_helpers::StateTransitionControllerOpObserver<GlobalIndexClonerStateEnum, + GlobalIndexClonerDoc>; +using PauseDuringStateTransitions = + resharding_service_test_helpers::PauseDuringStateTransitions<GlobalIndexClonerStateEnum>; + const ShardId kRecipientShardId{"myShardId"}; const NamespaceString kSourceNss{"sourcedb", "sourcecollection"}; constexpr auto kSourceShardKey = "key"_sd; @@ -168,72 +177,6 @@ private: MockGlobalIndexClonerFetcher* _mockFetcher; }; -class Blocker { -public: - ~Blocker() { - stdx::unique_lock lk(_mutex); - _shouldBlock = false; - _cvBlocked.notify_all(); - } - - void blockIfActivated(OperationContext* opCtx) { - stdx::unique_lock lk(_mutex); - _blockedOnce = true; - opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return !_shouldBlock; }); - } - - void waitUntilBlockedOccurred(OperationContext* opCtx) { - stdx::unique_lock lk(_mutex); - opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return _blockedOnce; }); - } - - void block() { - stdx::unique_lock lk(_mutex); - _shouldBlock = true; - } - - void unblock() { - stdx::unique_lock lk(_mutex); - _shouldBlock = false; - } - -private: - Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningServiceTestBlocker::_mutex"); - stdx::condition_variable _cvBlocked; - bool _shouldBlock{false}; - bool _blockedOnce{false}; -}; - -class OpObserverForTest : public OpObserverNoop { -public: - OpObserverForTest(Blocker* insertBlocker, Blocker* deleteBlocker) - : _insertBlocker(insertBlocker), _deleteBlocker(deleteBlocker) {} - - void onInserts(OperationContext* opCtx, - const CollectionPtr& coll, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end, - bool fromMigrate) override { - if (NamespaceString::kGlobalIndexClonerNamespace == coll->ns()) { - _insertBlocker->blockIfActivated(opCtx); - } - } - - void onDelete(OperationContext* opCtx, - const NamespaceString& nss, - const UUID& uuid, - StmtId stmtId, - const OplogDeleteEntryArgs& args) override { - if (NamespaceString::kGlobalIndexClonerNamespace == nss) { - _deleteBlocker->blockIfActivated(opCtx); - } - } - -private: - Blocker* _insertBlocker; - Blocker* _deleteBlocker; -}; - GlobalIndexCloningService::InstanceID extractInstanceId(const GlobalIndexClonerDoc& doc) { return BSON("_id" << doc.getIndexCollectionUUID()); } @@ -257,8 +200,13 @@ public: // so we should manually instantiate it to ensure it exists in our tests. ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); + _stateTransitionController = std::make_shared<StateTransitionController>(); _opObserverRegistry->addObserver( - std::make_unique<OpObserverForTest>(&_stateDocInsertBlocker, &_stateDocDeleteBlocker)); + std::make_unique<OpObserverForTest>(_stateTransitionController, + NamespaceString::kGlobalIndexClonerNamespace, + [](const GlobalIndexClonerDoc& stateDoc) { + return stateDoc.getMutableState().getState(); + })); // Create config.transactions collection auto opCtx = serviceContext->makeOperationContext(Client::getCurrent()); @@ -331,12 +279,8 @@ public: return false; } - Blocker* getStateDocInsertBlocker() { - return &_stateDocInsertBlocker; - } - - Blocker* getStateDocDeleteBlocker() { - return &_stateDocDeleteBlocker; + StateTransitionController* stateTransitionController() { + return _stateTransitionController.get(); } void replaceFetcherResultList( @@ -377,8 +321,7 @@ private: const BSONObj _indexSpec{BSON("key" << BSON(_indexKey << 1) << "unique" << true)}; ReadWriteConcernDefaultsLookupMock _lookupMock; - Blocker _stateDocInsertBlocker; - Blocker _stateDocDeleteBlocker; + std::shared_ptr<StateTransitionController> _stateTransitionController; MockGlobalIndexClonerFetcher _mockFetcher; MockGlobalIndexClonerFetcher _fetcherCopyForVerification; @@ -391,6 +334,8 @@ TEST_F(GlobalIndexClonerServiceTest, CloneInsertsToGlobalIndexCollection) { auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); auto future = cloner->getCompletionFuture(); + cloner->getReadyToCommitFuture().get(); + cloner->cleanup(); future.get(); ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); @@ -402,39 +347,57 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldBeSafeToRetryOnStepDown) { auto opCtx = makeOperationContext(); auto rawOpCtx = opCtx.get(); - auto stateDocInsertBlocker = getStateDocInsertBlocker(); - stateDocInsertBlocker->block(); - auto stateDocDeleteBlocker = getStateDocDeleteBlocker(); - stateDocDeleteBlocker->block(); + const std::vector<GlobalIndexClonerStateEnum> states{GlobalIndexClonerStateEnum::kCloning, + GlobalIndexClonerStateEnum::kReadyToCommit, + GlobalIndexClonerStateEnum::kDone}; + PauseDuringStateTransitions stateTransitionsGuard{stateTransitionController(), states}; + + auto prevState = GlobalIndexClonerStateEnum::kUnused; + for (const auto& nextState : states) { + LOGV2(6870601, + "Testing next state", + "state"_attr = GlobalIndexClonerState_serializer(nextState)); + + auto cloner = ([&] { + if (nextState == GlobalIndexClonerStateEnum::kCloning || + nextState == GlobalIndexClonerStateEnum::kReadyToCommit) { + return GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + } - { - auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); - stateDocInsertBlocker->waitUntilBlockedOccurred(rawOpCtx); - stepDown(); + return *GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc)); + })(); - ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException); - } + if (prevState != GlobalIndexClonerStateEnum::kUnused) { + stateTransitionsGuard.unset(prevState); + } - stepUp(rawOpCtx); + auto readyToCommitFuture = cloner->getReadyToCommitFuture(); + + if (nextState == GlobalIndexClonerStateEnum::kDone) { + readyToCommitFuture.get(); + cloner->cleanup(); + } - { - auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); - stateDocInsertBlocker->unblock(); - stateDocDeleteBlocker->waitUntilBlockedOccurred(rawOpCtx); + stateTransitionsGuard.wait(nextState); stepDown(); + if (nextState != GlobalIndexClonerStateEnum::kDone) { + ASSERT_THROWS(readyToCommitFuture.get(), DBException); + } + + // Note: can either throw InterruptDueToRepl or ShutdownInProgress (from executor). ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException); - } - stepUp(rawOpCtx); + stepUp(rawOpCtx); - // It is possible for the primary only service to run to completion and no longer exists. - { - auto cloner = GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc)); - stateDocDeleteBlocker->unblock(); - (*cloner)->getCompletionFuture().get(); + prevState = nextState; } + auto cloner = *GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc)); + stateTransitionsGuard.unset(GlobalIndexClonerStateEnum::kDone); + cloner->cleanup(); + cloner->getCompletionFuture().get(); + checkIndexCollection(rawOpCtx); } @@ -458,6 +421,8 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldBeAbleToConsumeMultipleBatchesWorthof auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); auto future = cloner->getCompletionFuture(); + cloner->getReadyToCommitFuture().get(); + cloner->cleanup(); future.get(); ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); @@ -473,6 +438,8 @@ TEST_F(GlobalIndexClonerServiceTest, ShouldWorkWithEmptyCollection) { auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); auto future = cloner->getCompletionFuture(); + cloner->getReadyToCommitFuture().get(); + cloner->cleanup(); future.get(); ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName()))); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index a906c8f4f93..0c89a9aca53 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -64,23 +64,11 @@ namespace { using CoordinatorStateTransitionController = resharding_service_test_helpers::StateTransitionController<CoordinatorStateEnum>; -using OpObserverForTest = - resharding_service_test_helpers::OpObserverForTest<CoordinatorStateEnum, - ReshardingCoordinatorDocument>; +using OpObserverForTest = resharding_service_test_helpers:: + StateTransitionControllerOpObserver<CoordinatorStateEnum, ReshardingCoordinatorDocument>; using PauseDuringStateTransitions = resharding_service_test_helpers::PauseDuringStateTransitions<CoordinatorStateEnum>; -class CoordinatorOpObserverForTest : public OpObserverForTest { -public: - CoordinatorOpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller) - : OpObserverForTest(std::move(controller), - NamespaceString::kConfigReshardingOperationsNamespace) {} - - CoordinatorStateEnum getState(const ReshardingCoordinatorDocument& coordinatorDoc) override { - return coordinatorDoc.getState(); - } -}; - class ExternalStateForTest : public ReshardingCoordinatorExternalState { ParticipantShardsAndChunks calculateParticipantShardsAndChunks( OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) override { @@ -169,11 +157,10 @@ public: invariant(_opObserverRegistry); _opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>()); - _opObserverRegistry->addObserver( - std::make_unique<CoordinatorOpObserverForTest>(_controller)); - _opObserverRegistry->addObserver( - std::make_unique<repl::PrimaryOnlyServiceOpObserver>(getServiceContext())); - + _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>( + _controller, + NamespaceString::kConfigReshardingOperationsNamespace, + [](const ReshardingCoordinatorDocument& stateDoc) { return stateDoc.getState(); })); _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); auto service = makeService(getServiceContext()); auto serviceName = service->getServiceName(); diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 7ce393e5e9d..53c0044ae75 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -65,7 +65,8 @@ namespace { using DonorStateTransitionController = resharding_service_test_helpers::StateTransitionController<DonorStateEnum>; using OpObserverForTest = - resharding_service_test_helpers::OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>; + resharding_service_test_helpers::StateTransitionControllerOpObserver<DonorStateEnum, + ReshardingDonorDocument>; using PauseDuringStateTransitions = resharding_service_test_helpers::PauseDuringStateTransitions<DonorStateEnum>; @@ -90,17 +91,6 @@ public: const NamespaceString& tempReshardingNss) override {} }; -class DonorOpObserverForTest : public OpObserverForTest { -public: - DonorOpObserverForTest(std::shared_ptr<DonorStateTransitionController> controller) - : OpObserverForTest(std::move(controller), - NamespaceString::kDonorReshardingOperationsNamespace) {} - - DonorStateEnum getState(const ReshardingDonorDocument& donorDoc) override { - return donorDoc.getMutableState().getState(); - } -}; - class ReshardingDonorServiceForTest : public ReshardingDonorService { public: explicit ReshardingDonorServiceForTest(ServiceContext* serviceContext) @@ -137,7 +127,12 @@ public: repl::StorageInterface::set(serviceContext, std::move(storageMock)); _controller = std::make_shared<DonorStateTransitionController>(); - _opObserverRegistry->addObserver(std::make_unique<DonorOpObserverForTest>(_controller)); + _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>( + _controller, + NamespaceString::kDonorReshardingOperationsNamespace, + [](const ReshardingDonorDocument& donorDoc) { + return donorDoc.getMutableState().getState(); + })); } DonorStateTransitionController* controller() { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 32eeadff416..1f21ea9342b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -63,9 +63,8 @@ using RecipientStateTransitionController = resharding_service_test_helpers::StateTransitionController<RecipientStateEnum>; using PauseDuringStateTransitions = resharding_service_test_helpers::PauseDuringStateTransitions<RecipientStateEnum>; -using OpObserverForTest = - resharding_service_test_helpers::OpObserverForTest<RecipientStateEnum, - ReshardingRecipientDocument>; +using OpObserverForTest = resharding_service_test_helpers:: + StateTransitionControllerOpObserver<RecipientStateEnum, ReshardingRecipientDocument>; const ShardId recipientShardId{"myShardId"}; class ExternalStateForTest : public ReshardingRecipientService::RecipientStateMachineExternalState { @@ -157,17 +156,6 @@ private: const ShardId _someDonorId{"myDonorId"}; }; -class RecipientOpObserverForTest : public OpObserverForTest { -public: - RecipientOpObserverForTest(std::shared_ptr<RecipientStateTransitionController> controller) - : OpObserverForTest(std::move(controller), - NamespaceString::kRecipientReshardingOperationsNamespace) {} - - RecipientStateEnum getState(const ReshardingRecipientDocument& recipientDoc) override { - return recipientDoc.getMutableState().getState(); - } -}; - class DataReplicationForTest : public ReshardingDataReplicationInterface { public: SemiFuture<void> runUntilStrictlyConsistent( @@ -235,7 +223,12 @@ public: repl::StorageInterface::set(serviceContext, std::move(storageMock)); _controller = std::make_shared<RecipientStateTransitionController>(); - _opObserverRegistry->addObserver(std::make_unique<RecipientOpObserverForTest>(_controller)); + _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>( + _controller, + NamespaceString::kRecipientReshardingOperationsNamespace, + [](const ReshardingRecipientDocument& stateDoc) { + return stateDoc.getMutableState().getState(); + })); } RecipientStateTransitionController* controller() { diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp deleted file mode 100644 index 2fae639c187..00000000000 --- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/resharding/resharding_service_test_helpers.h" - -#include "mongo/db/s/resharding/resharding_coordinator_service.h" -#include "mongo/db/s/resharding/resharding_donor_service.h" -#include "mongo/db/s/resharding/resharding_recipient_service.h" - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - - -namespace mongo { -namespace resharding_service_test_helpers { - -// ----------------------------------------------- -// StateTransitionController -// ----------------------------------------------- - -template <class StateEnum> -void StateTransitionController<StateEnum>::waitUntilStateIsReached(StateEnum state) { - stdx::unique_lock lk(_mutex); - _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; }); -} - -template <class StateEnum> -void StateTransitionController<StateEnum>::_setPauseDuringTransition(StateEnum state) { - stdx::lock_guard lk(_mutex); - _pauseDuringTransition.insert(state); -} - -template <class StateEnum> -void StateTransitionController<StateEnum>::_unsetPauseDuringTransition(StateEnum state) { - stdx::lock_guard lk(_mutex); - _pauseDuringTransition.erase(state); - _pauseDuringTransitionCond.notify_all(); -} - -template <class StateEnum> -void StateTransitionController<StateEnum>::_notifyNewStateAndWaitUntilUnpaused( - OperationContext* opCtx, StateEnum newState) { - stdx::unique_lock lk(_mutex); - ScopeGuard guard([this, prevState = _state] { _state = prevState; }); - _state = newState; - _waitUntilUnpausedCond.notify_all(); - opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] { - return _pauseDuringTransition.count(newState) == 0; - }); - guard.dismiss(); -} - -template <class StateEnum> -void StateTransitionController<StateEnum>::_resetReachedState() { - stdx::lock_guard lk(_mutex); - _state = StateEnum::kUnused; -} - -// ----------------------------------------------- -// PauseDuringStateTransitions -// ----------------------------------------------- - -template <class StateEnum> -PauseDuringStateTransitions<StateEnum>::PauseDuringStateTransitions( - StateTransitionController<StateEnum>* controller, StateEnum state) - : PauseDuringStateTransitions<StateEnum>(controller, std::vector<StateEnum>{state}) {} - -template <class StateEnum> -PauseDuringStateTransitions<StateEnum>::PauseDuringStateTransitions( - StateTransitionController<StateEnum>* controller, std::vector<StateEnum> states) - : _controller{controller}, _states{std::move(states)} { - _controller->_resetReachedState(); - for (auto state : _states) { - _controller->_setPauseDuringTransition(state); - } -} - -template <class StateEnum> -PauseDuringStateTransitions<StateEnum>::~PauseDuringStateTransitions() { - for (auto state : _states) { - _controller->_unsetPauseDuringTransition(state); - } -} - -template <class StateEnum> -void PauseDuringStateTransitions<StateEnum>::wait(StateEnum state) { - _controller->waitUntilStateIsReached(state); -} - -template <class StateEnum> -void PauseDuringStateTransitions<StateEnum>::unset(StateEnum state) { - _controller->_unsetPauseDuringTransition(state); -} - -// ----------------------------------------------- -// OpObserverForTest -// ----------------------------------------------- - -template <class StateEnum, class ReshardingDocument> -OpObserverForTest<StateEnum, ReshardingDocument>::OpObserverForTest( - std::shared_ptr<StateTransitionController<StateEnum>> controller, - NamespaceString reshardingDocumentNss) - : _controller{std::move(controller)}, - _reshardingDocumentNss{std::move(reshardingDocumentNss)} {} - -template <class StateEnum, class ReshardingDocument> -void OpObserverForTest<StateEnum, ReshardingDocument>::onUpdate(OperationContext* opCtx, - const OplogUpdateEntryArgs& args) { - if (args.nss != _reshardingDocumentNss) { - return; - } - - auto doc = ReshardingDocument::parse(IDLParserContext{"OpObserverForTest"}, - args.updateArgs->updatedDoc); - _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, getState(doc)); -} - -template class StateTransitionController<DonorStateEnum>; -template class StateTransitionController<RecipientStateEnum>; -template class StateTransitionController<CoordinatorStateEnum>; - -template class PauseDuringStateTransitions<DonorStateEnum>; -template class PauseDuringStateTransitions<RecipientStateEnum>; -template class PauseDuringStateTransitions<CoordinatorStateEnum>; - -template class OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>; -template class OpObserverForTest<RecipientStateEnum, ReshardingRecipientDocument>; -template class OpObserverForTest<CoordinatorStateEnum, ReshardingCoordinatorDocument>; - -} // namespace resharding_service_test_helpers -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.h b/src/mongo/db/s/resharding/resharding_service_test_helpers.h index 7d123dde7ff..70fe8786fc7 100644 --- a/src/mongo/db/s/resharding/resharding_service_test_helpers.h +++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.h @@ -32,39 +32,61 @@ #include "mongo/platform/basic.h" #include "mongo/db/op_observer/op_observer_noop.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding namespace mongo { namespace resharding_service_test_helpers { -template <class StateEnum, class ReshardingDocument> -class OpObserverForTest; - -template <class StateEnum> -class PauseDuringStateTransitions; - -template <class StateEnum> -class StateTransitionController; +/** + * This contains the logic for pausing/unpausing when a state is reached. + * + * Template param StateEnum must have kUnused and kDone. + */ template <class StateEnum> class StateTransitionController { public: StateTransitionController() = default; - void waitUntilStateIsReached(StateEnum state); + void waitUntilStateIsReached(StateEnum state) { + stdx::unique_lock lk(_mutex); + _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; }); + } private: template <class Enum, class ReshardingDocument> - friend class OpObserverForTest; + friend class StateTransitionControllerOpObserver; template <class Enum> friend class PauseDuringStateTransitions; - void _setPauseDuringTransition(StateEnum state); - - void _unsetPauseDuringTransition(StateEnum state); - - void _notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx, StateEnum newState); - - void _resetReachedState(); + void _setPauseDuringTransition(StateEnum state) { + stdx::lock_guard lk(_mutex); + _pauseDuringTransition.insert(state); + } + + void _unsetPauseDuringTransition(StateEnum state) { + stdx::lock_guard lk(_mutex); + _pauseDuringTransition.erase(state); + _pauseDuringTransitionCond.notify_all(); + } + + void _notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx, StateEnum newState) { + stdx::unique_lock lk(_mutex); + ScopeGuard guard([this, prevState = _state] { _state = prevState; }); + _state = newState; + _waitUntilUnpausedCond.notify_all(); + opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] { + return _pauseDuringTransition.count(newState) == 0; + }); + guard.dismiss(); + } + + void _resetReachedState() { + stdx::lock_guard lk(_mutex); + _state = StateEnum::kUnused; + } Mutex _mutex = MONGO_MAKE_LATCH("StateTransitionController::_mutex"); stdx::condition_variable _pauseDuringTransitionCond; @@ -77,11 +99,23 @@ private: template <class StateEnum> class PauseDuringStateTransitions { public: - PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, StateEnum state); - PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, - std::vector<StateEnum> states); + PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, StateEnum state) + : PauseDuringStateTransitions<StateEnum>(controller, std::vector<StateEnum>{state}) {} - ~PauseDuringStateTransitions(); + PauseDuringStateTransitions(StateTransitionController<StateEnum>* controller, + std::vector<StateEnum> states) + : _controller{controller}, _states{std::move(states)} { + _controller->_resetReachedState(); + for (auto state : _states) { + _controller->_setPauseDuringTransition(state); + } + } + + ~PauseDuringStateTransitions() { + for (auto state : _states) { + _controller->_unsetPauseDuringTransition(state); + } + } PauseDuringStateTransitions(const PauseDuringStateTransitions&) = delete; PauseDuringStateTransitions& operator=(const PauseDuringStateTransitions&) = delete; @@ -89,29 +123,77 @@ public: PauseDuringStateTransitions(PauseDuringStateTransitions&&) = delete; PauseDuringStateTransitions& operator=(PauseDuringStateTransitions&&) = delete; - void wait(StateEnum state); + void wait(StateEnum state) { + _controller->waitUntilStateIsReached(state); + } - void unset(StateEnum state); + void unset(StateEnum state) { + _controller->_unsetPauseDuringTransition(state); + } private: StateTransitionController<StateEnum>* const _controller; const std::vector<StateEnum> _states; }; -template <class StateEnum, class ReshardingDocument> -class OpObserverForTest : public OpObserverNoop { +template <class StateEnum, class StateDocument> +class StateTransitionControllerOpObserver : public OpObserverNoop { public: - OpObserverForTest(std::shared_ptr<StateTransitionController<StateEnum>> controller, - NamespaceString reshardingDocumentNss); - - void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override; - - virtual StateEnum getState(const ReshardingDocument& reshardingDoc) = 0; + using GetStateFunc = std::function<StateEnum(const StateDocument&)>; + + StateTransitionControllerOpObserver( + std::shared_ptr<StateTransitionController<StateEnum>> controller, + NamespaceString stateDocumentNss, + GetStateFunc getStateFunc) + : _controller{std::move(controller)}, + _stateDocumentNss{std::move(stateDocumentNss)}, + _getState{std::move(getStateFunc)} {} + + void onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override { + if (coll->ns() != _stateDocumentNss) { + return; + } + + auto doc = StateDocument::parse( + IDLParserContext{"StateTransitionControllerOpObserver::onInserts"}, begin->doc); + _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, _getState(doc)); + invariant(++begin == end); // No support for inserting more than one state document yet. + } + + void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override { + if (args.nss != _stateDocumentNss) { + return; + } + + auto doc = + StateDocument::parse(IDLParserContext{"StateTransitionControllerOpObserver::onUpdate"}, + args.updateArgs->updatedDoc); + _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, _getState(doc)); + } + + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + StmtId stmtId, + const OplogDeleteEntryArgs& args) override { + if (nss != _stateDocumentNss) { + return; + } + + _controller->_notifyNewStateAndWaitUntilUnpaused(opCtx, StateEnum::kDone); + } private: std::shared_ptr<StateTransitionController<StateEnum>> _controller; - const NamespaceString _reshardingDocumentNss; + const NamespaceString _stateDocumentNss; + GetStateFunc _getState; }; } // namespace resharding_service_test_helpers } // namespace mongo + +#undef MONGO_LOGV2_DEFAULT_COMPONENT |