diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-08 17:14:44 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-07-11 10:44:20 -0400 |
commit | 00d3b20bf51315a1bda183fcad7b4737da789a48 (patch) | |
tree | 65f45721f571bff62c1a3a68cc622b3b35fd0a9f /src | |
parent | bcafb88541f91de118c5c5981c7040422c0694ea (diff) | |
download | mongo-00d3b20bf51315a1bda183fcad7b4737da789a48.tar.gz |
SERVER-23750: wait outside of the DataReplicator mutex
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 64 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 1 |
5 files changed, 66 insertions, 16 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index b042a6dc0e9..4f6e831a501 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -255,6 +255,7 @@ void Fetcher::cancel() { } handle = _getMoreCallbackHandle; + _inShutdown = true; } _executor->cancel(handle); @@ -288,6 +289,17 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch return; } + bool inShutdown = false; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + inShutdown = _inShutdown; + } + if (inShutdown) { + _work(Status(ErrorCodes::ShutdownInProgress, "fetcher shutting down"), nullptr, nullptr); + _finishCallback(); + return; + } + const BSONObj& queryResponseObj = rcbd.response.getValue().data; Status status = getStatusFromCommandResult(queryResponseObj); if (!status.isOK()) { diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 51130535fca..b3330eb1467 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -228,6 +228,9 @@ private: // Using boolean instead of a counter to avoid issues with wrap around. bool _first = true; + // _inShutdown is true after cancel() is called. + bool _inShutdown = false; + // Callback handle to the scheduled getMore command. executor::TaskExecutor::CallbackHandle _getMoreCallbackHandle; diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index fe873860615..5fe04c81f66 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -260,8 +260,9 @@ DataReplicator::DataReplicator( DataReplicator::~DataReplicator() { DESTRUCTOR_GUARD({ + UniqueLock lk(_mutex); _cancelAllHandles_inlock(); - _waitOnAll_inlock(); + _waitOnAndResetAll(lk); }); } @@ -696,9 +697,14 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn) // Reset state. if (_oplogFetcher) { _oplogFetcher->shutdown(); - // TODO: cleanup fetcher, and make work with networkMock/tests. - // _fetcher->join(); - // _fetcher.reset(); + std::unique_ptr<OplogFetcher> oplogFetcher; + _oplogFetcher.swap(oplogFetcher); + lk.unlock(); + + oplogFetcher->join(); + invariant(!oplogFetcher->isActive()); + + lk.lock(); // TODO: clear buffer // _clearFetcherBuffer(); } @@ -722,7 +728,7 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn) // Success, cleanup _cancelAllHandles_inlock(); - _waitOnAll_inlock(); + _waitOnAndResetAll(lk); invariant(!_anyActiveHandles_inlock()); _reporterPaused = false; @@ -804,7 +810,8 @@ void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult bool DataReplicator::_anyActiveHandles_inlock() const { return _applierActive || (_oplogFetcher && _oplogFetcher->isActive()) || - (_initialSyncState && _initialSyncState->dbsCloner->isActive()) || + (_initialSyncState && _initialSyncState->dbsCloner && + _initialSyncState->dbsCloner->isActive()) || (_reporter && _reporter->isActive()); } @@ -819,15 +826,42 @@ void DataReplicator::_cancelAllHandles_inlock() { _initialSyncState->dbsCloner->shutdown(); } -void DataReplicator::_waitOnAll_inlock() { - if (_oplogFetcher) - _oplogFetcher->join(); - if (_applier) - _applier->wait(); - if (_reporter) - _reporter->join(); - if (_initialSyncState) - _initialSyncState->dbsCloner->join(); +void DataReplicator::_waitOnAndResetAll(UniqueLock& lk) { + if (_lastOplogEntryFetcher) { + std::unique_ptr<Fetcher> oldFetcher; + oldFetcher.swap(_lastOplogEntryFetcher); + lk.unlock(); + oldFetcher->wait(); + lk.lock(); + } + if (_oplogFetcher) { + std::unique_ptr<OplogFetcher> oldFetcher; + oldFetcher.swap(_oplogFetcher); + lk.unlock(); + oldFetcher->join(); + lk.lock(); + } + if (_applier) { + std::unique_ptr<MultiApplier> oldApplier; + oldApplier.swap(_applier); + lk.unlock(); + oldApplier->wait(); + lk.lock(); + } + if (_reporter) { + std::unique_ptr<Reporter> oldReporter; + oldReporter.swap(_reporter); + lk.unlock(); + oldReporter->join(); + lk.lock(); + } + if (_initialSyncState) { + std::unique_ptr<DatabasesCloner> cloner; + cloner.swap(_initialSyncState->dbsCloner); + lk.unlock(); + cloner->join(); + lk.lock(); + } } void DataReplicator::_doNextActions() { diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index dc472193161..17ea41b2a27 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -306,7 +306,7 @@ private: Status _scheduleReport(); void _cancelAllHandles_inlock(); - void _waitOnAll_inlock(); + void _waitOnAndResetAll(UniqueLock& lk); bool _anyActiveHandles_inlock() const; Status _shutdown(OperationContext* txn); diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 29abe7504b5..b8379d02279 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -534,6 +534,7 @@ protected: NetworkGuard guard(net); if (!net->hasReadyRequests() && processedRequests < expectedResponses) { + net->runReadyNetworkOperations(); guard.dismiss(); continue; } |