Diffstat (limited to 'src/mongo/db/repl/replication_executor.cpp')
-rw-r--r--  src/mongo/db/repl/replication_executor.cpp | 1016
1 file changed, 484 insertions, 532 deletions
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index a944e76751f..f6f04e51a8d 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -46,563 +46,515 @@ namespace mongo {
namespace repl {
namespace {
- stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn);
+stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn);
} // namespace
- using executor::NetworkInterface;
-
- ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface,
- StorageInterface* storageInterface,
- int64_t prngSeed) :
- _random(prngSeed),
- _networkInterface(netInterface),
- _storageInterface(storageInterface),
- _totalEventWaiters(0),
- _inShutdown(false),
- _dblockWorkers(OldThreadPool::DoNotStartThreadsTag(),
- 3,
- "replExecDBWorker-"),
- _dblockTaskRunner(
- &_dblockWorkers,
- stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
- _dblockExclusiveLockTaskRunner(
- &_dblockWorkers,
- stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
- _nextId(0) {
- }
-
- ReplicationExecutor::~ReplicationExecutor() {}
-
- std::string ReplicationExecutor::getDiagnosticString() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _getDiagnosticString_inlock();
- }
-
- std::string ReplicationExecutor::_getDiagnosticString_inlock() const {
- str::stream output;
- output << "ReplicationExecutor";
- output << " networkInProgress:" << _networkInProgressQueue.size();
- output << " dbWorkInProgress:" << _dbWorkInProgressQueue.size();
- output << " exclusiveInProgress:" << _exclusiveLockInProgressQueue.size();
- output << " sleeperQueue:" << _sleepersQueue.size();
- output << " ready:" << _readyQueue.size();
- output << " free:" << _freeQueue.size();
- output << " unsignaledEvents:" << _unsignaledEvents.size();
- output << " eventWaiters:" << _totalEventWaiters;
- output << " shuttingDown:" << _inShutdown;
- output << " networkInterface:" << _networkInterface->getDiagnosticString();
- return output;
- }
-
- Date_t ReplicationExecutor::now() {
- return _networkInterface->now();
- }
-
- void ReplicationExecutor::run() {
- setThreadName("ReplicationExecutor");
- _networkInterface->startup();
- _dblockWorkers.startThreads();
- std::pair<WorkItem, CallbackHandle> work;
- while ((work = getWork()).first.callback.isValid()) {
- {
- stdx::lock_guard<stdx::mutex> lk(_terribleExLockSyncMutex);
- const Callback* callback = _getCallbackFromHandle(work.first.callback);
- const Status inStatus = callback->_isCanceled ?
- Status(ErrorCodes::CallbackCanceled, "Callback canceled") :
- Status::OK();
- makeNoExcept(stdx::bind(callback->_callbackFn,
- CallbackArgs(this, work.second, inStatus)))();
- }
- signalEvent(work.first.finishedEvent);
- }
- finishShutdown();
- _networkInterface->shutdown();
- }
-
- void ReplicationExecutor::shutdown() {
- // Correct shutdown needs to:
- // * Disable future work queueing.
- // * drain all of the unsignaled events, sleepers, and ready queue, by running those
- // callbacks with a "shutdown" or "canceled" status.
- // * Signal all threads blocked in waitForEvent, and wait for them to return from that method.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inShutdown = true;
-
- _readyQueue.splice(_readyQueue.end(), _dbWorkInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _exclusiveLockInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue);
- _readyQueue.splice(_readyQueue.end(), _sleepersQueue);
- for (auto event : _unsignaledEvents) {
- _readyQueue.splice(_readyQueue.end(), _getEventFromHandle(event)->_waiters);
- }
- for (auto readyWork : _readyQueue) {
- _getCallbackFromHandle(readyWork.callback)->_isCanceled = true;
- }
- _networkInterface->signalWorkAvailable();
- }
-
- void ReplicationExecutor::finishShutdown() {
- _dblockExclusiveLockTaskRunner.cancel();
- _dblockTaskRunner.cancel();
- _dblockWorkers.join();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_inShutdown);
- invariant(_dbWorkInProgressQueue.empty());
- invariant(_exclusiveLockInProgressQueue.empty());
- invariant(_readyQueue.empty());
- invariant(_sleepersQueue.empty());
-
- while (!_unsignaledEvents.empty()) {
- EventList::iterator eventIter = _unsignaledEvents.begin();
- invariant(_getEventFromHandle(*eventIter)->_waiters.empty());
- signalEvent_inlock(*eventIter);
- }
-
- while (_totalEventWaiters > 0)
- _noMoreWaitingThreads.wait(lk);
-
- invariant(_dbWorkInProgressQueue.empty());
- invariant(_exclusiveLockInProgressQueue.empty());
- invariant(_readyQueue.empty());
- invariant(_sleepersQueue.empty());
- invariant(_unsignaledEvents.empty());
- }
-
- void ReplicationExecutor::maybeNotifyShutdownComplete_inlock() {
- if (_totalEventWaiters == 0)
- _noMoreWaitingThreads.notify_all();
- }
-
- StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return makeEvent_inlock();
- }
-
- StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent_inlock() {
- if (_inShutdown)
- return StatusWith<EventHandle>(ErrorCodes::ShutdownInProgress, "Shutdown in progress");
-
- _unsignaledEvents.emplace_back();
- auto event = std::make_shared<Event>(this, --_unsignaledEvents.end());
- setEventForHandle(&_unsignaledEvents.back(), std::move(event));
- return _unsignaledEvents.back();
- }
-
- void ReplicationExecutor::signalEvent(const EventHandle& eventHandle) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- signalEvent_inlock(eventHandle);
- }
-
- void ReplicationExecutor::signalEvent_inlock(const EventHandle& eventHandle) {
- Event* event = _getEventFromHandle(eventHandle);
- event->_signal_inlock();
- _unsignaledEvents.erase(event->_iter);
- }
-
- void ReplicationExecutor::waitForEvent(const EventHandle& event) {
- _getEventFromHandle(event)->waitUntilSignaled();
- }
-
- void ReplicationExecutor::cancel(const CallbackHandle& cbHandle) {
- _getCallbackFromHandle(cbHandle)->cancel();
- };
-
- void ReplicationExecutor::wait(const CallbackHandle& cbHandle) {
- _getCallbackFromHandle(cbHandle)->waitForCompletion();
- };
-
- StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent(
- const EventHandle& eventHandle,
- const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- WorkQueue* queue = &_readyQueue;
- Event* event = _getEventFromHandle(eventHandle);
- if (!event->_isSignaled) {
- queue = &event->_waiters;
- }
- else {
- queue = &_readyQueue;
- }
- return enqueueWork_inlock(queue, work);
- }
-
- static void remoteCommandFinished(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::RemoteCommandCallbackFn& cb,
- const RemoteCommandRequest& request,
- const ResponseStatus& response) {
-
- if (cbData.status.isOK()) {
- cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor, cbData.myHandle, request, response));
- }
- else {
- cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor,
- cbData.myHandle,
- request,
- ResponseStatus(cbData.status)));
+using executor::NetworkInterface;
+
+ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface,
+ StorageInterface* storageInterface,
+ int64_t prngSeed)
+ : _random(prngSeed),
+ _networkInterface(netInterface),
+ _storageInterface(storageInterface),
+ _totalEventWaiters(0),
+ _inShutdown(false),
+ _dblockWorkers(OldThreadPool::DoNotStartThreadsTag(), 3, "replExecDBWorker-"),
+ _dblockTaskRunner(&_dblockWorkers,
+ stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
+ _dblockExclusiveLockTaskRunner(
+ &_dblockWorkers, stdx::bind(&StorageInterface::createOperationContext, storageInterface)),
+ _nextId(0) {}
+
+ReplicationExecutor::~ReplicationExecutor() {}
+
+std::string ReplicationExecutor::getDiagnosticString() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _getDiagnosticString_inlock();
+}
+
+std::string ReplicationExecutor::_getDiagnosticString_inlock() const {
+ str::stream output;
+ output << "ReplicationExecutor";
+ output << " networkInProgress:" << _networkInProgressQueue.size();
+ output << " dbWorkInProgress:" << _dbWorkInProgressQueue.size();
+ output << " exclusiveInProgress:" << _exclusiveLockInProgressQueue.size();
+ output << " sleeperQueue:" << _sleepersQueue.size();
+ output << " ready:" << _readyQueue.size();
+ output << " free:" << _freeQueue.size();
+ output << " unsignaledEvents:" << _unsignaledEvents.size();
+ output << " eventWaiters:" << _totalEventWaiters;
+ output << " shuttingDown:" << _inShutdown;
+ output << " networkInterface:" << _networkInterface->getDiagnosticString();
+ return output;
+}
+
+Date_t ReplicationExecutor::now() {
+ return _networkInterface->now();
+}
+
+void ReplicationExecutor::run() {
+ setThreadName("ReplicationExecutor");
+ _networkInterface->startup();
+ _dblockWorkers.startThreads();
+ std::pair<WorkItem, CallbackHandle> work;
+ while ((work = getWork()).first.callback.isValid()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_terribleExLockSyncMutex);
+ const Callback* callback = _getCallbackFromHandle(work.first.callback);
+ const Status inStatus = callback->_isCanceled
+ ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
+ : Status::OK();
+ makeNoExcept(
+ stdx::bind(callback->_callbackFn, CallbackArgs(this, work.second, inStatus)))();
}
- }
-
- static void remoteCommandFailedEarly(
- const ReplicationExecutor::CallbackArgs& cbData,
- const ReplicationExecutor::RemoteCommandCallbackFn& cb,
- const RemoteCommandRequest& request) {
-
- invariant(!cbData.status.isOK());
+ signalEvent(work.first.finishedEvent);
+ }
+ finishShutdown();
+ _networkInterface->shutdown();
+}
+
+void ReplicationExecutor::shutdown() {
+ // Correct shutdown needs to:
+ // * Disable future work queueing.
+ // * drain all of the unsignaled events, sleepers, and ready queue, by running those
+ // callbacks with a "shutdown" or "canceled" status.
+ // * Signal all threads blocked in waitForEvent, and wait for them to return from that method.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+
+ _readyQueue.splice(_readyQueue.end(), _dbWorkInProgressQueue);
+ _readyQueue.splice(_readyQueue.end(), _exclusiveLockInProgressQueue);
+ _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue);
+ _readyQueue.splice(_readyQueue.end(), _sleepersQueue);
+ for (auto event : _unsignaledEvents) {
+ _readyQueue.splice(_readyQueue.end(), _getEventFromHandle(event)->_waiters);
+ }
+ for (auto readyWork : _readyQueue) {
+ _getCallbackFromHandle(readyWork.callback)->_isCanceled = true;
+ }
+ _networkInterface->signalWorkAvailable();
+}
+
+void ReplicationExecutor::finishShutdown() {
+ _dblockExclusiveLockTaskRunner.cancel();
+ _dblockTaskRunner.cancel();
+ _dblockWorkers.join();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_inShutdown);
+ invariant(_dbWorkInProgressQueue.empty());
+ invariant(_exclusiveLockInProgressQueue.empty());
+ invariant(_readyQueue.empty());
+ invariant(_sleepersQueue.empty());
+
+ while (!_unsignaledEvents.empty()) {
+ EventList::iterator eventIter = _unsignaledEvents.begin();
+ invariant(_getEventFromHandle(*eventIter)->_waiters.empty());
+ signalEvent_inlock(*eventIter);
+ }
+
+ while (_totalEventWaiters > 0)
+ _noMoreWaitingThreads.wait(lk);
+
+ invariant(_dbWorkInProgressQueue.empty());
+ invariant(_exclusiveLockInProgressQueue.empty());
+ invariant(_readyQueue.empty());
+ invariant(_sleepersQueue.empty());
+ invariant(_unsignaledEvents.empty());
+}
+
+void ReplicationExecutor::maybeNotifyShutdownComplete_inlock() {
+ if (_totalEventWaiters == 0)
+ _noMoreWaitingThreads.notify_all();
+}
+
+StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return makeEvent_inlock();
+}
+
+StatusWith<ReplicationExecutor::EventHandle> ReplicationExecutor::makeEvent_inlock() {
+ if (_inShutdown)
+ return StatusWith<EventHandle>(ErrorCodes::ShutdownInProgress, "Shutdown in progress");
+
+ _unsignaledEvents.emplace_back();
+ auto event = std::make_shared<Event>(this, --_unsignaledEvents.end());
+ setEventForHandle(&_unsignaledEvents.back(), std::move(event));
+ return _unsignaledEvents.back();
+}
+
+void ReplicationExecutor::signalEvent(const EventHandle& eventHandle) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ signalEvent_inlock(eventHandle);
+}
+
+void ReplicationExecutor::signalEvent_inlock(const EventHandle& eventHandle) {
+ Event* event = _getEventFromHandle(eventHandle);
+ event->_signal_inlock();
+ _unsignaledEvents.erase(event->_iter);
+}
+
+void ReplicationExecutor::waitForEvent(const EventHandle& event) {
+ _getEventFromHandle(event)->waitUntilSignaled();
+}
+
+void ReplicationExecutor::cancel(const CallbackHandle& cbHandle) {
+ _getCallbackFromHandle(cbHandle)->cancel();
+};
+
+void ReplicationExecutor::wait(const CallbackHandle& cbHandle) {
+ _getCallbackFromHandle(cbHandle)->waitForCompletion();
+};
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent(
+ const EventHandle& eventHandle, const CallbackFn& work) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ WorkQueue* queue = &_readyQueue;
+ Event* event = _getEventFromHandle(eventHandle);
+ if (!event->_isSignaled) {
+ queue = &event->_waiters;
+ } else {
+ queue = &_readyQueue;
+ }
+ return enqueueWork_inlock(queue, work);
+}
+
+static void remoteCommandFinished(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicationExecutor::RemoteCommandCallbackFn& cb,
+ const RemoteCommandRequest& request,
+ const ResponseStatus& response) {
+ if (cbData.status.isOK()) {
cb(ReplicationExecutor::RemoteCommandCallbackArgs(
- cbData.executor,
- cbData.myHandle,
- request,
- ResponseStatus(cbData.status)));
- }
-
- void ReplicationExecutor::_finishRemoteCommand(
- const RemoteCommandRequest& request,
- const ResponseStatus& response,
- const CallbackHandle& cbHandle,
- const uint64_t expectedHandleGeneration,
- const RemoteCommandCallbackFn& cb) {
-
- Callback* callback = _getCallbackFromHandle(cbHandle);
- const WorkQueue::iterator iter = callback->_iter;
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
- return;
- }
-
- if (expectedHandleGeneration != iter->generation) {
- return;
- }
-
- LOG(4) << "Received remote response: "
- << (response.isOK() ? response.getValue().toString() :
- response.getStatus().toString());
-
- callback->_callbackFn = stdx::bind(remoteCommandFinished,
- stdx::placeholders::_1,
- cb,
- request,
- response);
- _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue, iter);
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleRemoteCommand(
- const RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) {
- RemoteCommandRequest scheduledRequest = request;
- if (request.timeout == RemoteCommandRequest::kNoTimeout) {
- scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
- }
- else {
- scheduledRequest.expirationDate = _networkInterface->now() + scheduledRequest.timeout;
- }
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(
- &_networkInProgressQueue,
- stdx::bind(remoteCommandFailedEarly,
- stdx::placeholders::_1,
- cb,
- scheduledRequest));
- if (handle.isOK()) {
- _getCallbackFromHandle(handle.getValue())->_iter->isNetworkOperation = true;
-
- LOG(4) << "Scheduling remote request: " << request.toString();
-
- _networkInterface->startCommand(
- handle.getValue(),
- scheduledRequest,
- stdx::bind(&ReplicationExecutor::_finishRemoteCommand,
+ cbData.executor, cbData.myHandle, request, response));
+ } else {
+ cb(ReplicationExecutor::RemoteCommandCallbackArgs(
+ cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
+ }
+}
+
+static void remoteCommandFailedEarly(const ReplicationExecutor::CallbackArgs& cbData,
+ const ReplicationExecutor::RemoteCommandCallbackFn& cb,
+ const RemoteCommandRequest& request) {
+ invariant(!cbData.status.isOK());
+ cb(ReplicationExecutor::RemoteCommandCallbackArgs(
+ cbData.executor, cbData.myHandle, request, ResponseStatus(cbData.status)));
+}
+
+void ReplicationExecutor::_finishRemoteCommand(const RemoteCommandRequest& request,
+ const ResponseStatus& response,
+ const CallbackHandle& cbHandle,
+ const uint64_t expectedHandleGeneration,
+ const RemoteCommandCallbackFn& cb) {
+ Callback* callback = _getCallbackFromHandle(cbHandle);
+ const WorkQueue::iterator iter = callback->_iter;
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_inShutdown) {
+ return;
+ }
+
+ if (expectedHandleGeneration != iter->generation) {
+ return;
+ }
+
+ LOG(4) << "Received remote response: " << (response.isOK() ? response.getValue().toString()
+ : response.getStatus().toString());
+
+ callback->_callbackFn =
+ stdx::bind(remoteCommandFinished, stdx::placeholders::_1, cb, request, response);
+ _readyQueue.splice(_readyQueue.end(), _networkInProgressQueue, iter);
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleRemoteCommand(
+ const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ RemoteCommandRequest scheduledRequest = request;
+ if (request.timeout == RemoteCommandRequest::kNoTimeout) {
+ scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
+ } else {
+ scheduledRequest.expirationDate = _networkInterface->now() + scheduledRequest.timeout;
+ }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ StatusWith<CallbackHandle> handle = enqueueWork_inlock(
+ &_networkInProgressQueue,
+ stdx::bind(remoteCommandFailedEarly, stdx::placeholders::_1, cb, scheduledRequest));
+ if (handle.isOK()) {
+ _getCallbackFromHandle(handle.getValue())->_iter->isNetworkOperation = true;
+
+ LOG(4) << "Scheduling remote request: " << request.toString();
+
+ _networkInterface->startCommand(
+ handle.getValue(),
+ scheduledRequest,
+ stdx::bind(&ReplicationExecutor::_finishRemoteCommand,
+ this,
+ scheduledRequest,
+ stdx::placeholders::_1,
+ handle.getValue(),
+ _getCallbackFromHandle(handle.getValue())->_iter->generation,
+ cb));
+ }
+ return handle;
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWork(
+ const CallbackFn& work) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _networkInterface->signalWorkAvailable();
+ return enqueueWork_inlock(&_readyQueue, work);
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWorkAt(
+ Date_t when, const CallbackFn& work) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ WorkQueue temp;
+ StatusWith<CallbackHandle> cbHandle = enqueueWork_inlock(&temp, work);
+ if (!cbHandle.isOK())
+ return cbHandle;
+ _getCallbackFromHandle(cbHandle.getValue())->_iter->readyDate = when;
+ WorkQueue::iterator insertBefore = _sleepersQueue.begin();
+ while (insertBefore != _sleepersQueue.end() && insertBefore->readyDate <= when)
+ ++insertBefore;
+ _sleepersQueue.splice(insertBefore, temp, temp.begin());
+ return cbHandle;
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork(
+ const CallbackFn& work) {
+ return scheduleDBWork(work, NamespaceString(), MODE_NONE);
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleDBWork(
+ const CallbackFn& work, const NamespaceString& nss, LockMode mode) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_dbWorkInProgressQueue, work);
+ if (handle.isOK()) {
+ auto doOp = stdx::bind(&ReplicationExecutor::_doOperation,
this,
- scheduledRequest,
stdx::placeholders::_1,
+ stdx::placeholders::_2,
handle.getValue(),
- _getCallbackFromHandle(handle.getValue())->_iter->generation,
- cb));
- }
- return handle;
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWork(
- const CallbackFn& work) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _networkInterface->signalWorkAvailable();
- return enqueueWork_inlock(&_readyQueue, work);
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWorkAt(
- Date_t when,
- const CallbackFn& work) {
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- WorkQueue temp;
- StatusWith<CallbackHandle> cbHandle = enqueueWork_inlock(&temp, work);
- if (!cbHandle.isOK())
- return cbHandle;
- _getCallbackFromHandle(cbHandle.getValue())->_iter->readyDate = when;
- WorkQueue::iterator insertBefore = _sleepersQueue.begin();
- while (insertBefore != _sleepersQueue.end() && insertBefore->readyDate <= when)
- ++insertBefore;
- _sleepersQueue.splice(insertBefore, temp, temp.begin());
- return cbHandle;
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle>
- ReplicationExecutor::scheduleDBWork(const CallbackFn& work) {
- return scheduleDBWork(work, NamespaceString(), MODE_NONE);
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle>
- ReplicationExecutor::scheduleDBWork(const CallbackFn& work,
- const NamespaceString& nss,
- LockMode mode) {
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_dbWorkInProgressQueue,
- work);
- if (handle.isOK()) {
- auto doOp = stdx::bind(
- &ReplicationExecutor::_doOperation,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- handle.getValue(),
- &_dbWorkInProgressQueue,
- nullptr);
- auto task = [doOp](OperationContext* txn, const Status& status) {
- makeNoExcept(stdx::bind(doOp, txn, status))();
- return TaskRunner::NextAction::kDisposeOperationContext;
- };
- if (mode == MODE_NONE && nss.ns().empty()) {
- _dblockTaskRunner.schedule(task);
- }
- else {
- _dblockTaskRunner.schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode));
- }
- }
- return handle;
- }
-
- void ReplicationExecutor::_doOperation(OperationContext* txn,
- const Status& taskRunnerStatus,
- const CallbackHandle& cbHandle,
- WorkQueue* workQueue,
- stdx::mutex* terribleExLockSyncMutex) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown)
- return;
- Callback* callback = _getCallbackFromHandle(cbHandle);
- const WorkQueue::iterator iter = callback->_iter;
- iter->callback = CallbackHandle();
- _freeQueue.splice(_freeQueue.begin(), *workQueue, iter);
- lk.unlock();
- {
- std::unique_ptr<stdx::lock_guard<stdx::mutex> > terribleLock(
- terribleExLockSyncMutex ?
- new stdx::lock_guard<stdx::mutex>(*terribleExLockSyncMutex) :
- nullptr);
- // Only possible task runner error status is CallbackCanceled.
- callback->_callbackFn(CallbackArgs(this,
- cbHandle,
- (callback->_isCanceled || !taskRunnerStatus.isOK() ?
- Status(ErrorCodes::CallbackCanceled,
- "Callback canceled") :
- Status::OK()),
- txn));
+ &_dbWorkInProgressQueue,
+ nullptr);
+ auto task = [doOp](OperationContext* txn, const Status& status) {
+ makeNoExcept(stdx::bind(doOp, txn, status))();
+ return TaskRunner::NextAction::kDisposeOperationContext;
+ };
+ if (mode == MODE_NONE && nss.ns().empty()) {
+ _dblockTaskRunner.schedule(task);
+ } else {
+ _dblockTaskRunner.schedule(DatabaseTask::makeCollectionLockTask(task, nss, mode));
}
- lk.lock();
- signalEvent_inlock(callback->_finishedEvent);
}
-
- StatusWith<ReplicationExecutor::CallbackHandle>
- ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(
- const CallbackFn& work) {
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_exclusiveLockInProgressQueue,
- work);
- if (handle.isOK()) {
- auto doOp = stdx::bind(
- &ReplicationExecutor::_doOperation,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- handle.getValue(),
- &_exclusiveLockInProgressQueue,
- &_terribleExLockSyncMutex);
- _dblockExclusiveLockTaskRunner.schedule(
- DatabaseTask::makeGlobalExclusiveLockTask(
- [doOp](OperationContext* txn, const Status& status) {
+ return handle;
+}
+
+void ReplicationExecutor::_doOperation(OperationContext* txn,
+ const Status& taskRunnerStatus,
+ const CallbackHandle& cbHandle,
+ WorkQueue* workQueue,
+ stdx::mutex* terribleExLockSyncMutex) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_inShutdown)
+ return;
+ Callback* callback = _getCallbackFromHandle(cbHandle);
+ const WorkQueue::iterator iter = callback->_iter;
+ iter->callback = CallbackHandle();
+ _freeQueue.splice(_freeQueue.begin(), *workQueue, iter);
+ lk.unlock();
+ {
+ std::unique_ptr<stdx::lock_guard<stdx::mutex>> terribleLock(
+ terribleExLockSyncMutex ? new stdx::lock_guard<stdx::mutex>(*terribleExLockSyncMutex)
+ : nullptr);
+ // Only possible task runner error status is CallbackCanceled.
+ callback->_callbackFn(
+ CallbackArgs(this,
+ cbHandle,
+ (callback->_isCanceled || !taskRunnerStatus.isOK()
+ ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
+ : Status::OK()),
+ txn));
+ }
+ lk.lock();
+ signalEvent_inlock(callback->_finishedEvent);
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle>
+ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(const CallbackFn& work) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ StatusWith<CallbackHandle> handle = enqueueWork_inlock(&_exclusiveLockInProgressQueue, work);
+ if (handle.isOK()) {
+ auto doOp = stdx::bind(&ReplicationExecutor::_doOperation,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ handle.getValue(),
+ &_exclusiveLockInProgressQueue,
+ &_terribleExLockSyncMutex);
+ _dblockExclusiveLockTaskRunner.schedule(DatabaseTask::makeGlobalExclusiveLockTask(
+ [doOp](OperationContext* txn, const Status& status) {
makeNoExcept(stdx::bind(doOp, txn, status))();
return TaskRunner::NextAction::kDisposeOperationContext;
}));
- }
- return handle;
- }
-
- std::pair<ReplicationExecutor::WorkItem, ReplicationExecutor::CallbackHandle>
- ReplicationExecutor::getWork() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (true) {
- const Date_t now = _networkInterface->now();
- Date_t nextWakeupDate = scheduleReadySleepers_inlock(now);
- if (!_readyQueue.empty()) {
- break;
- }
- else if (_inShutdown) {
- return std::make_pair(WorkItem(), CallbackHandle());
- }
- lk.unlock();
- if (nextWakeupDate == Date_t::max()) {
- _networkInterface->waitForWork();
- }
- else {
- _networkInterface->waitForWorkUntil(nextWakeupDate);
- }
- lk.lock();
- }
- const WorkItem work = *_readyQueue.begin();
- const CallbackHandle cbHandle = work.callback;
- _readyQueue.begin()->callback = CallbackHandle();
- _freeQueue.splice(_freeQueue.begin(), _readyQueue, _readyQueue.begin());
- return std::make_pair(work, cbHandle);
- }
-
- int64_t ReplicationExecutor::nextRandomInt64(int64_t limit) {
- return _random.nextInt64(limit);
- }
-
- Date_t ReplicationExecutor::scheduleReadySleepers_inlock(const Date_t now) {
- WorkQueue::iterator iter = _sleepersQueue.begin();
- while ((iter != _sleepersQueue.end()) && (iter->readyDate <= now)) {
- ++iter;
- }
- _readyQueue.splice(_readyQueue.end(), _sleepersQueue, _sleepersQueue.begin(), iter);
- if (iter == _sleepersQueue.end()) {
- // indicate no sleeper to wait for
- return Date_t::max();
- }
- return iter->readyDate;
- }
-
- StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::enqueueWork_inlock(
- WorkQueue* queue, const CallbackFn& callbackFn) {
-
- invariant(callbackFn);
- StatusWith<EventHandle> event = makeEvent_inlock();
- if (!event.isOK())
- return StatusWith<CallbackHandle>(event.getStatus());
-
- if (_freeQueue.empty())
- _freeQueue.push_front(WorkItem());
- const WorkQueue::iterator iter = _freeQueue.begin();
- WorkItem& work = *iter;
-
- invariant(!work.callback.isValid());
- setCallbackForHandle(&work.callback, std::shared_ptr<executor::TaskExecutor::CallbackState>(
- new Callback(this, callbackFn, iter, event.getValue())));
-
- work.generation++;
- work.finishedEvent = event.getValue();
- work.readyDate = Date_t();
- queue->splice(queue->end(), _freeQueue, iter);
- return StatusWith<CallbackHandle>(work.callback);
}
-
- ReplicationExecutor::WorkItem::WorkItem() : generation(0U),
- isNetworkOperation(false) {}
-
- ReplicationExecutor::Event::Event(ReplicationExecutor* executor,
- const EventList::iterator& iter) :
- executor::TaskExecutor::EventState(), _executor(executor), _isSignaled(false), _iter(iter) {}
-
- ReplicationExecutor::Event::~Event() {}
-
- void ReplicationExecutor::Event::signal() {
- // Must go through executor to signal so that this can be removed from the _unsignaledEvents
- // EventList.
- _executor->signalEvent(*_iter);
- }
-
- void ReplicationExecutor::Event::_signal_inlock() {
- invariant(!_isSignaled);
- _isSignaled = true;
-
- if (!_waiters.empty()) {
- _executor->_readyQueue.splice(_executor->_readyQueue.end(), _waiters);
- _executor->_networkInterface->signalWorkAvailable();
+ return handle;
+}
+
+std::pair<ReplicationExecutor::WorkItem, ReplicationExecutor::CallbackHandle>
+ReplicationExecutor::getWork() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (true) {
+ const Date_t now = _networkInterface->now();
+ Date_t nextWakeupDate = scheduleReadySleepers_inlock(now);
+ if (!_readyQueue.empty()) {
+ break;
+ } else if (_inShutdown) {
+ return std::make_pair(WorkItem(), CallbackHandle());
}
-
- _isSignaledCondition.notify_all();
- }
-
- void ReplicationExecutor::Event::waitUntilSignaled() {
- stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
- ++_executor->_totalEventWaiters;
- while (!_isSignaled) {
- _isSignaledCondition.wait(lk);
+ lk.unlock();
+ if (nextWakeupDate == Date_t::max()) {
+ _networkInterface->waitForWork();
+ } else {
+ _networkInterface->waitForWorkUntil(nextWakeupDate);
}
- --_executor->_totalEventWaiters;
- _executor->maybeNotifyShutdownComplete_inlock();
- }
-
- bool ReplicationExecutor::Event::isSignaled() {
- stdx::lock_guard<stdx::mutex> lk(_executor->_mutex);
- return _isSignaled;
+ lk.lock();
}
-
- ReplicationExecutor::Callback::Callback(ReplicationExecutor* executor,
- const CallbackFn callbackFn,
- const WorkQueue::iterator& iter,
- const EventHandle& finishedEvent) :
- executor::TaskExecutor::CallbackState(),
- _executor(executor),
- _callbackFn(callbackFn),
- _isCanceled(false),
- _iter(iter),
- _finishedEvent(finishedEvent) {}
-
- ReplicationExecutor::Callback::~Callback() {}
-
- void ReplicationExecutor::Callback::cancel() {
- stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
- _isCanceled = true;
- if (_iter->isNetworkOperation) {
- lk.unlock();
- _executor->_networkInterface->cancelCommand(_iter->callback);
- }
+ const WorkItem work = *_readyQueue.begin();
+ const CallbackHandle cbHandle = work.callback;
+ _readyQueue.begin()->callback = CallbackHandle();
+ _freeQueue.splice(_freeQueue.begin(), _readyQueue, _readyQueue.begin());
+ return std::make_pair(work, cbHandle);
+}
+
+int64_t ReplicationExecutor::nextRandomInt64(int64_t limit) {
+ return _random.nextInt64(limit);
+}
+
+Date_t ReplicationExecutor::scheduleReadySleepers_inlock(const Date_t now) {
+ WorkQueue::iterator iter = _sleepersQueue.begin();
+ while ((iter != _sleepersQueue.end()) && (iter->readyDate <= now)) {
+ ++iter;
+ }
+ _readyQueue.splice(_readyQueue.end(), _sleepersQueue, _sleepersQueue.begin(), iter);
+ if (iter == _sleepersQueue.end()) {
+ // indicate no sleeper to wait for
+ return Date_t::max();
+ }
+ return iter->readyDate;
+}
+
+StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::enqueueWork_inlock(
+ WorkQueue* queue, const CallbackFn& callbackFn) {
+ invariant(callbackFn);
+ StatusWith<EventHandle> event = makeEvent_inlock();
+ if (!event.isOK())
+ return StatusWith<CallbackHandle>(event.getStatus());
+
+ if (_freeQueue.empty())
+ _freeQueue.push_front(WorkItem());
+ const WorkQueue::iterator iter = _freeQueue.begin();
+ WorkItem& work = *iter;
+
+ invariant(!work.callback.isValid());
+ setCallbackForHandle(&work.callback,
+ std::shared_ptr<executor::TaskExecutor::CallbackState>(
+ new Callback(this, callbackFn, iter, event.getValue())));
+
+ work.generation++;
+ work.finishedEvent = event.getValue();
+ work.readyDate = Date_t();
+ queue->splice(queue->end(), _freeQueue, iter);
+ return StatusWith<CallbackHandle>(work.callback);
+}
+
+ReplicationExecutor::WorkItem::WorkItem() : generation(0U), isNetworkOperation(false) {}
+
+ReplicationExecutor::Event::Event(ReplicationExecutor* executor, const EventList::iterator& iter)
+ : executor::TaskExecutor::EventState(), _executor(executor), _isSignaled(false), _iter(iter) {}
+
+ReplicationExecutor::Event::~Event() {}
+
+void ReplicationExecutor::Event::signal() {
+ // Must go through executor to signal so that this can be removed from the _unsignaledEvents
+ // EventList.
+ _executor->signalEvent(*_iter);
+}
+
+void ReplicationExecutor::Event::_signal_inlock() {
+ invariant(!_isSignaled);
+ _isSignaled = true;
+
+ if (!_waiters.empty()) {
+ _executor->_readyQueue.splice(_executor->_readyQueue.end(), _waiters);
+ _executor->_networkInterface->signalWorkAvailable();
+ }
+
+ _isSignaledCondition.notify_all();
+}
+
+void ReplicationExecutor::Event::waitUntilSignaled() {
+ stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
+ ++_executor->_totalEventWaiters;
+ while (!_isSignaled) {
+ _isSignaledCondition.wait(lk);
+ }
+ --_executor->_totalEventWaiters;
+ _executor->maybeNotifyShutdownComplete_inlock();
+}
+
+bool ReplicationExecutor::Event::isSignaled() {
+ stdx::lock_guard<stdx::mutex> lk(_executor->_mutex);
+ return _isSignaled;
+}
+
+ReplicationExecutor::Callback::Callback(ReplicationExecutor* executor,
+ const CallbackFn callbackFn,
+ const WorkQueue::iterator& iter,
+ const EventHandle& finishedEvent)
+ : executor::TaskExecutor::CallbackState(),
+ _executor(executor),
+ _callbackFn(callbackFn),
+ _isCanceled(false),
+ _iter(iter),
+ _finishedEvent(finishedEvent) {}
+
+ReplicationExecutor::Callback::~Callback() {}
+
+void ReplicationExecutor::Callback::cancel() {
+ stdx::unique_lock<stdx::mutex> lk(_executor->_mutex);
+ _isCanceled = true;
+ if (_iter->isNetworkOperation) {
+ lk.unlock();
+ _executor->_networkInterface->cancelCommand(_iter->callback);
}
+}
- void ReplicationExecutor::Callback::waitForCompletion() {
- _executor->waitForEvent(_finishedEvent);
- }
+void ReplicationExecutor::Callback::waitForCompletion() {
+ _executor->waitForEvent(_finishedEvent);
+}
- ReplicationExecutor::Event* ReplicationExecutor::_getEventFromHandle(
- const EventHandle& eventHandle) {
- return static_cast<Event*>(getEventFromHandle(eventHandle));
- }
+ReplicationExecutor::Event* ReplicationExecutor::_getEventFromHandle(
+ const EventHandle& eventHandle) {
+ return static_cast<Event*>(getEventFromHandle(eventHandle));
+}
- ReplicationExecutor::Callback* ReplicationExecutor::_getCallbackFromHandle(
- const CallbackHandle& callbackHandle) {
- return static_cast<Callback*>(getCallbackFromHandle(callbackHandle));
- }
+ReplicationExecutor::Callback* ReplicationExecutor::_getCallbackFromHandle(
+ const CallbackHandle& callbackHandle) {
+ return static_cast<Callback*>(getCallbackFromHandle(callbackHandle));
+}
namespace {
- void callNoExcept(const stdx::function<void ()>& fn) {
- try {
- fn();
- }
- catch (...) {
- std::terminate();
- }
+void callNoExcept(const stdx::function<void()>& fn) {
+ try {
+ fn();
+ } catch (...) {
+ std::terminate();
}
+}
- stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn) {
- return stdx::bind(callNoExcept, fn);
- }
+stdx::function<void()> makeNoExcept(const stdx::function<void()>& fn) {
+ return stdx::bind(callNoExcept, fn);
+}
} // namespace