diff options
author | Benety Goh <benety@mongodb.com> | 2015-05-21 13:53:55 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-05-21 19:43:45 -0400 |
commit | fb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c (patch) | |
tree | 240c30e35f82709c606c58a656d1c18e23313d79 | |
parent | 0ea58cb3bc5ad4d0dbf06c0e8b12e2a1701fb56d (diff) | |
download | mongo-fb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c.tar.gz |
SERVER-18035 fixed fetcher wait() to block until fetcher is inactive
-rw-r--r-- | src/mongo/db/repl/fetcher.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/fetcher.h | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/fetcher_test.cpp | 20 |
3 files changed, 63 insertions, 40 deletions
diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp index 6ae2419d50d..d89a761cb41 100644 --- a/src/mongo/db/repl/fetcher.cpp +++ b/src/mongo/db/repl/fetcher.cpp @@ -30,8 +30,6 @@ #include "mongo/db/repl/fetcher.h" -#include <boost/thread/lock_guard.hpp> - #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_executor.h" @@ -161,10 +159,15 @@ namespace { uassert(ErrorCodes::BadValue, "callback function cannot be null", work); } - Fetcher::~Fetcher() { } + Fetcher::~Fetcher() { + DESTRUCTOR_GUARD( + cancel(); + wait(); + ); + } std::string Fetcher::getDiagnosticString() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; output << "Fetcher"; output << " executor: " << _executor->getDiagnosticString(); @@ -176,19 +179,22 @@ namespace { } bool Fetcher::isActive() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _active; } Status Fetcher::schedule() { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_active) { + return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); + } return _schedule_inlock(_cmdObj, kFirstBatchFieldName); } void Fetcher::cancel() { ReplicationExecutor::CallbackHandle remoteCommandCallbackHandle; { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -202,26 +208,11 @@ namespace { } void Fetcher::wait() { - ReplicationExecutor::CallbackHandle remoteCommandCallbackHandle; - { - boost::lock_guard<boost::mutex> lk(_mutex); - - if (!_active) { - return; - } - - remoteCommandCallbackHandle = _remoteCommandCallbackHandle; - } - - invariant(remoteCommandCallbackHandle.isValid()); - _executor->wait(remoteCommandCallbackHandle); + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); } Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { - if (_active) { - return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); - } - StatusWith<ReplicationExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( RemoteCommandRequest(_source, _dbname, cmdObj), @@ -238,13 +229,11 @@ namespace { void Fetcher::_callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd, const char* batchFieldName) { - boost::lock_guard<boost::mutex> lk(_mutex); - _active = false; - NextAction nextAction = NextAction::kNoAction; if (!rcbd.response.isOK()) { _work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), &nextAction); + _finishCallback(); return; } @@ -252,6 +241,7 @@ namespace { Status status = getStatusFromCommandResult(cursorReponseObj); if (!status.isOK()) { _work(StatusWith<Fetcher::BatchData>(status), &nextAction); + _finishCallback(); return; } @@ -261,6 +251,7 @@ namespace { status = parseCursorResponse(cursorReponseObj, batchFieldName, &batchData, &nss); if (!status.isOK()) { _work(StatusWith<Fetcher::BatchData>(status), &nextAction); + _finishCallback(); return; } @@ -272,21 +263,29 @@ namespace { // Callback function _work may modify nextAction to request the fetcher // not to schedule a getMore command. - if (nextAction != NextAction::kContinue) { + if (!batchData.cursorId || nextAction != NextAction::kContinue) { + _finishCallback(); return; } - nextAction = NextAction::kNoAction; - - if (batchData.cursorId) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); BSONObj getMoreCmdObj = BSON("getMore" << batchData.cursorId << "collection" << nss.coll()); status = _schedule_inlock(getMoreCmdObj, kNextBatchFieldName); - if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), &nextAction); - return; - } } + if (!status.isOK()) { + nextAction = NextAction::kNoAction; + _work(StatusWith<Fetcher::BatchData>(status), &nextAction); + _finishCallback(); + return; + } + } + + void Fetcher::_finishCallback() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); } } // namespace repl diff --git a/src/mongo/db/repl/fetcher.h b/src/mongo/db/repl/fetcher.h index 8057004681f..33eba1b25e4 100644 --- a/src/mongo/db/repl/fetcher.h +++ b/src/mongo/db/repl/fetcher.h @@ -28,7 +28,6 @@ #pragma once -#include <boost/thread/mutex.hpp> #include <string> #include <vector> @@ -39,6 +38,8 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -138,7 +139,7 @@ namespace repl { void cancel(); /** - * Waits for active remote command request to complete. + * Waits for remote command requests to complete. * Returns immediately if fetcher is not active. */ void wait(); @@ -156,6 +157,11 @@ namespace repl { void _callback(const ReplicationExecutor::RemoteCommandCallbackData& rcbd, const char* batchFieldName); + /** + * Sets fetcher state to inactive and notifies waiters. + */ + void _finishCallback(); + // Not owned by us. ReplicationExecutor* _executor; @@ -165,7 +171,9 @@ namespace repl { CallbackFn _work; // Protects member data of this Fetcher. - mutable boost::mutex _mutex; + mutable stdx::mutex _mutex; + + mutable stdx::condition_variable _condition; // _active is true when Fetcher is scheduled to be run by the executor. bool _active; diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp index ed60337afab..4cccf217311 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/db/repl/fetcher_test.cpp @@ -138,7 +138,6 @@ namespace { ASSERT_TRUE(fetcher->isActive()); getNet()->runReadyNetworkOperations(); ASSERT_FALSE(getNet()->hasReadyRequests()); - fetcher->wait(); ASSERT_FALSE(fetcher->isActive()); } @@ -405,6 +404,24 @@ namespace { ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); } + TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) { + ASSERT_OK(fetcher->schedule()); + const BSONObj doc = BSON("_id" << 1); + callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult, + Fetcher::NextAction* nextAction) { + *nextAction = Fetcher::NextAction::kContinue; + }; + processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << + "ns" << "db.coll" << + "firstBatch" << BSON_ARRAY(doc)) << + "ok" << 1)); + ASSERT_OK(status); + ASSERT_EQUALS(0, cursorId); + ASSERT_EQUALS(1U, documents.size()); + ASSERT_EQUALS(doc, documents.front()); + ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); + } + TEST_F(FetcherTest, FetchMultipleBatches) { ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); @@ -446,7 +463,6 @@ namespace { ASSERT_FALSE(fetcher->isActive()); ASSERT_FALSE(getNet()->hasReadyRequests()); - ASSERT_FALSE(fetcher->isActive()); } TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { |