diff options
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.cpp | 87 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.h | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor_test.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_mock.h | 3 |
9 files changed, 129 insertions, 112 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index d5d79a15824..e76e5381b47 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -136,7 +136,6 @@ namespace repl { stdx::bind(&TopologyCoordinator::prepareHeartbeatResponse, _topCoord.get(), stdx::placeholders::_1, - stdx::placeholders::_2, Date_t(curTimeMillis64()), cmdObj, resultObj, diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index b48c1c0c63f..ae63827cb91 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -51,16 +51,17 @@ namespace { ReplicationExecutor::~ReplicationExecutor() {} void ReplicationExecutor::run() { - WorkItem work; - while ((work = getWork()).callback) { + std::pair<WorkItem, CallbackHandle> work; + while ((work = getWork()).first.callback) { { boost::lock_guard<boost::mutex> lk(_terribleExLockSyncMutex); - const Status inStatus = work.isCanceled ? + const Status inStatus = work.first.isCanceled ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") : Status::OK(); - makeNoExcept(stdx::bind(work.callback, this, inStatus))(); + makeNoExcept(stdx::bind(work.first.callback, + CallbackData(this, work.second, inStatus)))(); } - signalEvent(work.finishedEvent); + signalEvent(work.first.finishedEvent); } finishShutdown(); } @@ -184,28 +185,35 @@ namespace { } static void remoteCommandFinished( - ReplicationExecutor* executor, + const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::RemoteCommandCallbackFn& cb, const ReplicationExecutor::RemoteCommandRequest& request, - const StatusWith<BSONObj>& response, - const Status& status) { + const StatusWith<BSONObj>& response) { - if (status.isOK()) { - cb(executor, request, response); + if (cbData.status.isOK()) { + cb(ReplicationExecutor::RemoteCommandCallbackData( + cbData.executor, cbData.myHandle, request, response)); } else { - cb(executor, request, StatusWith<BSONObj>(status)); + cb(ReplicationExecutor::RemoteCommandCallbackData( + cbData.executor, + cbData.myHandle, + request, + StatusWith<BSONObj>(cbData.status))); } } static void remoteCommandFailedEarly( - ReplicationExecutor* executor, + const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::RemoteCommandCallbackFn& cb, - const ReplicationExecutor::RemoteCommandRequest& request, - const Status& status) { - - invariant(!status.isOK()); - cb(executor, request, StatusWith<BSONObj>(status)); + const ReplicationExecutor::RemoteCommandRequest& request) { + + invariant(!cbData.status.isOK()); + cb(ReplicationExecutor::RemoteCommandCallbackData( + cbData.executor, + cbData.myHandle, + request, + StatusWith<BSONObj>(cbData.status))); } void ReplicationExecutor::doRemoteCommand( @@ -245,8 +253,7 @@ namespace { stdx::placeholders::_1, cb, request, - cmdResult, - stdx::placeholders::_2); + cmdResult); _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue, iter); _workAvailable.notify_all(); } @@ -261,8 +268,7 @@ namespace { stdx::bind(remoteCommandFailedEarly, stdx::placeholders::_1, cb, - request, - stdx::placeholders::_2)); + request)); if (handle.isOK()) { _networkWorkers.schedule(makeNoExcept(stdx::bind(&ReplicationExecutor::doRemoteCommand, this, @@ -310,11 +316,11 @@ namespace { lk.unlock(); { boost::lock_guard<boost::mutex> terribleLock(_terribleExLockSyncMutex); - work.callback( - this, - (work.isCanceled ? - Status(ErrorCodes::CallbackCanceled, "Callback canceled") : - Status::OK())); + work.callback(CallbackData(this, + cbHandle, + (work.isCanceled ? + Status(ErrorCodes::CallbackCanceled, "Callback canceled") : + Status::OK()))); } lk.lock(); signalEvent_inlock(work.finishedEvent); @@ -352,7 +358,8 @@ namespace { waitForEvent(cbHandle._finishedEvent); } - ReplicationExecutor::WorkItem ReplicationExecutor::getWork() { + std::pair<ReplicationExecutor::WorkItem, ReplicationExecutor::CallbackHandle> + ReplicationExecutor::getWork() { boost::unique_lock<boost::mutex> lk(_mutex); while (true) { Milliseconds waitFor = scheduleReadySleepers_inlock(); @@ -360,14 +367,15 @@ namespace { break; } else if (_inShutdown) { - return WorkItem(); + return std::make_pair(WorkItem(), CallbackHandle()); } _workAvailable.timed_wait(lk, waitFor); } - const WorkItem result = _readyQueue.front(); + const CallbackHandle cbHandle(_readyQueue.begin()); + const WorkItem work = *cbHandle._iter; _readyQueue.begin()->callback = CallbackFn(); _freeQueue.splice(_freeQueue.begin(), _readyQueue, _readyQueue.begin()); - return result; + return std::make_pair(work, cbHandle); } ReplicationExecutor::Milliseconds ReplicationExecutor::scheduleReadySleepers_inlock() { @@ -416,6 +424,14 @@ namespace { _finishedEvent(iter->finishedEvent) { } + ReplicationExecutor::CallbackData::CallbackData(ReplicationExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus) : + executor(theExecutor), + myHandle(theHandle), + status(theStatus) { + } + ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest() {} ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest( const HostAndPort& theTarget, @@ -426,6 +442,17 @@ namespace { cmdObj(theCmdObj) { } + ReplicationExecutor::RemoteCommandCallbackData::RemoteCommandCallbackData( + ReplicationExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const StatusWith<BSONObj>& theResponse) : + executor(theExecutor), + myHandle(theHandle), + request(theRequest), + response(theResponse) { + } + ReplicationExecutor::WorkItem::WorkItem() : generation(0U), isCanceled(false) {} ReplicationExecutor::Event::Event() : diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index a5fcf7c67a6..4860a484978 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -104,9 +104,11 @@ namespace repl { MONGO_DISALLOW_COPYING(ReplicationExecutor); public: typedef boost::posix_time::milliseconds Milliseconds; + struct CallbackData; class CallbackHandle; class EventHandle; class NetworkInterface; + struct RemoteCommandCallbackData; struct RemoteCommandRequest; /** @@ -116,8 +118,7 @@ namespace repl { * the callback was canceled for any reason (including shutdown). Otherwise, it should have * Status::OK(). */ - typedef stdx::function< - void (ReplicationExecutor*, const Status&)> CallbackFn; + typedef stdx::function<void (const CallbackData&)> CallbackFn; /** * Type of a callback from a request to run a command on a remote MongoDB node. @@ -128,10 +129,7 @@ namespace repl { * the BSONObj returned by the command, with the "ok" field indicating the success of the * command in the usual way. */ - typedef stdx::function< - void (ReplicationExecutor*, - const RemoteCommandRequest&, - const StatusWith<BSONObj>&)> RemoteCommandCallbackFn; + typedef stdx::function<void (const RemoteCommandCallbackData&)> RemoteCommandCallbackFn; /** * Constructs a new executor. @@ -294,7 +292,7 @@ namespace repl { * If the "callback" member of the returned WorkItem is falsey, that is a signal * to the run loop to wait for shutdown. */ - WorkItem getWork(); + std::pair<WorkItem, CallbackHandle> getWork(); /** * Marks as runnable any sleepers whose ready date has passed and returns 0ms; or if there @@ -395,6 +393,16 @@ namespace repl { EventHandle _finishedEvent; }; + struct ReplicationExecutor::CallbackData { + CallbackData(ReplicationExecutor* theExecutor, + const CallbackHandle& theHandle, + const Status& theStatus); + + ReplicationExecutor* executor; + CallbackHandle myHandle; + Status status; + }; + /** * Type of object describing a command to execute against a remote MongoDB node. */ @@ -409,6 +417,18 @@ namespace repl { BSONObj cmdObj; }; + struct ReplicationExecutor::RemoteCommandCallbackData { + RemoteCommandCallbackData(ReplicationExecutor* theExecutor, + const CallbackHandle& theHandle, + const RemoteCommandRequest& theRequest, + const StatusWith<BSONObj>& theResponse); + + ReplicationExecutor* executor; + CallbackHandle myHandle; + RemoteCommandRequest request; + StatusWith<BSONObj> response; + }; + /** * Interface to networking and lock manager. */ diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index 1653d83a958..dbc4f15569d 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -61,40 +61,36 @@ namespace { return !(lhs == rhs); } - void setStatus(ReplicationExecutor* executor, const Status& source, Status* target) { - *target = source; + void setStatus(const ReplicationExecutor::CallbackData& cbData, Status* target) { + *target = cbData.status; } - void setStatusAndShutdown(ReplicationExecutor* executor, - const Status& source, + void setStatusAndShutdown(const ReplicationExecutor::CallbackData& cbData, Status* target) { - setStatus(executor, source, target); - if (source != ErrorCodes::CallbackCanceled) - executor->shutdown(); + setStatus(cbData, target); + if (cbData.status != ErrorCodes::CallbackCanceled) + cbData.executor->shutdown(); } - void setStatusAndTriggerEvent(ReplicationExecutor* executor, - const Status& inStatus, + void setStatusAndTriggerEvent(const ReplicationExecutor::CallbackData& cbData, Status* outStatus, ReplicationExecutor::EventHandle event) { - *outStatus = inStatus; - if (!inStatus.isOK()) + *outStatus = cbData.status; + if (!cbData.status.isOK()) return; - executor->signalEvent(event); + cbData.executor->signalEvent(event); } - void scheduleSetStatusAndShutdown(ReplicationExecutor* executor, - const Status& inStatus, + void scheduleSetStatusAndShutdown(const ReplicationExecutor::CallbackData& cbData, Status* outStatus1, Status* outStatus2) { - if (!inStatus.isOK()) { - *outStatus1 = inStatus; + if (!cbData.status.isOK()) { + *outStatus1 = cbData.status; return; } - *outStatus1= executor->scheduleWork(stdx::bind(setStatusAndShutdown, - stdx::placeholders::_1, - stdx::placeholders::_2, - outStatus2)).getStatus(); + *outStatus1= cbData.executor->scheduleWork(stdx::bind(setStatusAndShutdown, + stdx::placeholders::_1, + outStatus2)).getStatus(); } TEST(ReplicationExecutor, RunOne) { @@ -102,7 +98,6 @@ namespace { Status status(ErrorCodes::InternalError, "Not mutated"); ASSERT_OK(executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status)).getStatus()); executor.run(); ASSERT_OK(status); @@ -113,7 +108,6 @@ namespace { Status status(ErrorCodes::InternalError, "Not mutated"); ASSERT_OK(executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status)).getStatus()); executor.shutdown(); executor.run(); @@ -127,12 +121,10 @@ namespace { ReplicationExecutor::CallbackHandle cb = ASSERT_GET( executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status1))); executor.cancel(cb); ASSERT_OK(executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status2)).getStatus()); executor.run(); ASSERT_EQUALS(status1, ErrorCodes::CallbackCanceled); @@ -145,7 +137,6 @@ namespace { Status status2(ErrorCodes::InternalError, "Not mutated"); ASSERT_OK(executor.scheduleWork(stdx::bind(scheduleSetStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status1, &status2)).getStatus()); executor.run(); @@ -159,9 +150,8 @@ namespace { EventChainAndWaitingTest(); void run(); private: - void onGo(ReplicationExecutor* executor, const Status& status); - void onGoAfterTriggered(ReplicationExecutor* executor, - const Status& status); + void onGo(const ReplicationExecutor::CallbackData& cbData); + void onGoAfterTriggered(const ReplicationExecutor::CallbackData& cbData); ReplicationExecutor executor; boost::thread executorThread; @@ -196,12 +186,10 @@ namespace { triggered2 = stdx::bind(setStatusAndTriggerEvent, stdx::placeholders::_1, - stdx::placeholders::_2, &status2, event2); triggered3 = stdx::bind(setStatusAndTriggerEvent, stdx::placeholders::_1, - stdx::placeholders::_2, &status3, event3); } @@ -210,8 +198,7 @@ namespace { executor.onEvent(goEvent, stdx::bind(&EventChainAndWaitingTest::onGo, this, - stdx::placeholders::_1, - stdx::placeholders::_2)); + stdx::placeholders::_1)); executor.signalEvent(goEvent); executor.waitForEvent(goEvent); executor.waitForEvent(event2); @@ -224,7 +211,6 @@ namespace { ReplicationExecutor::CallbackHandle shutdownCallback = ASSERT_GET( executor.scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status5))); executor.wait(shutdownCallback); neverSignaledWaiter.join(); @@ -236,12 +222,12 @@ namespace { ASSERT_OK(status5); } - void EventChainAndWaitingTest::onGo(ReplicationExecutor* executor, - const Status& status) { - if (!status.isOK()) { - status1 = status; + void EventChainAndWaitingTest::onGo(const ReplicationExecutor::CallbackData& cbData) { + if (!cbData.status.isOK()) { + status1 = cbData.status; return; } + ReplicationExecutor* executor = cbData.executor; StatusWith<ReplicationExecutor::EventHandle> errorOrTriggerEvent = executor->makeEvent(); if (!errorOrTriggerEvent.isOK()) { status1 = errorOrTriggerEvent.getStatus(); @@ -267,8 +253,7 @@ namespace { goEvent, stdx::bind(&EventChainAndWaitingTest::onGoAfterTriggered, this, - stdx::placeholders::_1, - stdx::placeholders::_2)); + stdx::placeholders::_1)); if (!cbHandle.isOK()) { status1 = cbHandle.getStatus(); executor->shutdown(); @@ -277,13 +262,13 @@ namespace { status1 = Status::OK(); } - void EventChainAndWaitingTest::onGoAfterTriggered(ReplicationExecutor* executor, - const Status& status) { - status4 = status; - if (!status.isOK()) { + void EventChainAndWaitingTest::onGoAfterTriggered( + const ReplicationExecutor::CallbackData& cbData) { + status4 = cbData.status; + if (!cbData.status.isOK()) { return; } - executor->signalEvent(triggerEvent); + cbData.executor->signalEvent(triggerEvent); } TEST(ReplicationExecutor, ScheduleWorkAt) { @@ -296,17 +281,14 @@ namespace { ASSERT_GET(executor.scheduleWorkAt(Date_t(now.millis + 100), stdx::bind(setStatus, stdx::placeholders::_1, - stdx::placeholders::_2, &status1))); ASSERT_GET(executor.scheduleWorkAt(Date_t(now.millis + 5000), stdx::bind(setStatus, stdx::placeholders::_1, - stdx::placeholders::_2, &status3))); ASSERT_GET(executor.scheduleWorkAt(Date_t(now.millis + 200), stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status2))); executor.run(); ASSERT_OK(status1); @@ -320,21 +302,19 @@ namespace { } static void setStatusOnRemoteCommandCompletion( - ReplicationExecutor* executor, - const ReplicationExecutor::RemoteCommandRequest& actualRequest, - const StatusWith<BSONObj>& actualResponse, + const ReplicationExecutor::RemoteCommandCallbackData& cbData, const ReplicationExecutor::RemoteCommandRequest& expectedRequest, Status* outStatus) { - if (actualRequest != expectedRequest) { + if (cbData.request != expectedRequest) { *outStatus = Status( ErrorCodes::BadValue, mongoutils::str::stream() << "Actual request: " << - getRequestDescription(actualRequest) << "; expected: " << + getRequestDescription(cbData.request) << "; expected: " << getRequestDescription(expectedRequest)); return; } - *outStatus = actualResponse.getStatus(); + *outStatus = cbData.response.getStatus(); } TEST(ReplicationExecutor, ScheduleRemoteCommand) { @@ -350,8 +330,6 @@ namespace { request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3, request, &status1))); boost::thread executorThread(stdx::bind(&ReplicationExecutor::run, &executor)); @@ -374,8 +352,6 @@ namespace { request, stdx::bind(setStatusOnRemoteCommandCompletion, stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3, request, &status1))); executor.cancel(cbHandle); @@ -392,7 +368,6 @@ namespace { ASSERT_OK(executor.scheduleWorkWithGlobalExclusiveLock( stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, - stdx::placeholders::_2, &status1)).getStatus()); executor.run(); ASSERT_OK(status1); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index d37a79fc2cc..765d0a833d8 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -31,6 +31,7 @@ #include <string> #include "mongo/base/disallow_copying.h" +#include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -42,7 +43,6 @@ namespace mongo { namespace repl { - class ReplicationExecutor; class HeartbeatInfo; class Member; struct MemberState; @@ -144,8 +144,7 @@ namespace repl { virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result) = 0; // produce a reply to a heartbeat - virtual void prepareHeartbeatResponse(ReplicationExecutor* executor, - const Status& inStatus, + virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, BSONObjBuilder* resultObj, diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 2425205ee21..3e22a6cf3ed 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -248,13 +248,13 @@ namespace repl { } // produce a reply to a heartbeat - void TopologyCoordinatorImpl::prepareHeartbeatResponse(ReplicationExecutor* executor, - const Status& inStatus, - Date_t now, - const BSONObj& cmdObj, - BSONObjBuilder* resultObj, - Status* result) { - if (inStatus == ErrorCodes::CallbackCanceled) { + void TopologyCoordinatorImpl::prepareHeartbeatResponse( + const ReplicationExecutor::CallbackData& data, + Date_t now, + const BSONObj& cmdObj, + BSONObjBuilder* resultObj, + Status* result) { + if (data.status == ErrorCodes::CallbackCanceled) { *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); return; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 2eaaf7e6965..2a844f8cc14 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -80,8 +80,7 @@ namespace repl { virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result); // produce a reply to a heartbeat - virtual void prepareHeartbeatResponse(ReplicationExecutor* executor, - const Status& inStatus, + virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, BSONObjBuilder* resultObj, diff --git a/src/mongo/db/repl/topology_coordinator_mock.cpp b/src/mongo/db/repl/topology_coordinator_mock.cpp index 467744c4f38..b8a0713a59a 100644 --- a/src/mongo/db/repl/topology_coordinator_mock.cpp +++ b/src/mongo/db/repl/topology_coordinator_mock.cpp @@ -64,8 +64,7 @@ namespace repl { void TopologyCoordinatorMock::prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result) {} - void TopologyCoordinatorMock::prepareHeartbeatResponse(ReplicationExecutor* executor, - const Status& inStatus, + void TopologyCoordinatorMock::prepareHeartbeatResponse(const ReplicationExecutor::CallbackData&, Date_t now, const BSONObj& cmdObj, BSONObjBuilder* resultObj, diff --git a/src/mongo/db/repl/topology_coordinator_mock.h b/src/mongo/db/repl/topology_coordinator_mock.h index 7b7a8fddf84..5f52547e4a6 100644 --- a/src/mongo/db/repl/topology_coordinator_mock.h +++ b/src/mongo/db/repl/topology_coordinator_mock.h @@ -63,8 +63,7 @@ namespace repl { virtual void prepareElectCmdResponse(const BSONObj& cmdObj, BSONObjBuilder& result); - virtual void prepareHeartbeatResponse(ReplicationExecutor* executor, - const Status& inStatus, + virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, BSONObjBuilder* resultObj, |