/**
* Copyright 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"

#include "mongo/db/repl/scatter_gather_runner.h"

#include <algorithm>

#include "mongo/base/status_with.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
using executor::RemoteCommandRequest;

// Short local names for the lock guard and the executor's nested types used throughout this file.
// NOTE(review): LockGuard assumes RunnerImpl::_mutex is a stdx::mutex -- confirm against the
// class declaration in scatter_gather_runner.h.
using LockGuard = stdx::lock_guard<stdx::mutex>;
using CallbackHandle = executor::TaskExecutor::CallbackHandle;
using EventHandle = executor::TaskExecutor::EventHandle;
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
/**
 * Builds a runner that drives 'algorithm' using 'executor'.
 *
 * The actual state lives in a RunnerImpl held through a shared_ptr, so that the response
 * callback created in start() can hold its own reference to it.
 */
ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
                                         executor::TaskExecutor* executor)
    : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {}
/**
 * Synchronous entry point: calls start() and then waits on the event it returns.
 *
 * Returns the error from start() if it failed; otherwise returns Status::OK() once
 * waitForEvent() on the returned event handle has come back.
 */
Status ScatterGatherRunner::run() {
    const auto swFinishedEvent = start();
    if (swFinishedEvent.isOK()) {
        _executor->waitForEvent(swFinishedEvent.getValue());
        return Status::OK();
    }
    // start() could not get going; hand its error straight back to the caller.
    return swFinishedEvent.getStatus();
}
/**
 * Asynchronous entry point: hands RunnerImpl::start() a response callback that forwards each
 * response to RunnerImpl::processResponse().
 *
 * Returns whatever RunnerImpl::start() returns: on success, the handle of the event that is
 * signaled once sufficient responses have been received; otherwise an error status.
 */
StatusWith<EventHandle> ScatterGatherRunner::start() {
    // Callback has a shared pointer to the RunnerImpl, so it's always safe to
    // access the RunnerImpl.
    // Note: this creates a cycle of shared_ptr:
    //     RunnerImpl -> Callback in _callbacks -> RunnerImpl
    // We must remove callbacks after using them, to break this cycle.
    std::shared_ptr<RunnerImpl>& impl = _impl;
    // 'impl' is captured by value, so the lambda owns its own reference to the RunnerImpl.
    auto cb = [impl](const RemoteCommandCallbackArgs& cbData) { impl->processResponse(cbData); };
    return _impl->start(cb);
}
/**
 * Cancels the run by forwarding to RunnerImpl::cancel().
 */
void ScatterGatherRunner::cancel() {
_impl->cancel();
}
/**
* Scatter gather runner implementation.
*/
// Records the executor and the algorithm for later use; nothing is scheduled here.
ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm,
executor::TaskExecutor* executor)
: _executor(executor), _algorithm(algorithm) {}
/**
 * Schedules every request produced by the algorithm on the executor, registering
 * 'processResponseCB' as the completion callback for each of them.
 *
 * May only be called once per RunnerImpl (enforced by invariant).
 *
 * Returns:
 *  - the error from makeEvent() if the completion event could not be created;
 *  - the scheduling error if scheduling a request fails with ShutdownInProgress (the completion
 *    event is signaled and already-scheduled callbacks are canceled before returning);
 *  - otherwise the handle of the event that is signaled once sufficient responses have arrived.
 *
 * Any scheduling failure other than ShutdownInProgress is fatal (fassert 18743).
 */
StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start(
    const RemoteCommandCallbackFn processResponseCB) {
    LockGuard lk(_mutex);

    invariant(!_started);
    _started = true;

    StatusWith<EventHandle> evh = _executor->makeEvent();
    if (!evh.isOK()) {
        return evh;
    }
    _sufficientResponsesReceived = evh.getValue();

    // Until dismissed below, any early return signals the event and cancels whatever has been
    // scheduled so far. The guard is declared after 'lk', so it fires while the mutex is held.
    ScopeGuard earlyReturnGuard = MakeGuard(&RunnerImpl::_signalSufficientResponsesReceived, this);

    std::vector<RemoteCommandRequest> requests = _algorithm->getRequests();
    for (size_t i = 0; i < requests.size(); ++i) {
        const StatusWith<CallbackHandle> cbh =
            _executor->scheduleRemoteCommand(requests[i], processResponseCB);
        if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
            return StatusWith<EventHandle>(cbh.getStatus());
        }
        fassert(18743, cbh.getStatus());
        _callbacks.push_back(cbh.getValue());
    }

    // With nothing scheduled, no response will ever arrive to signal the event, so the algorithm
    // must already be satisfied; signal right away in that case (or if it is satisfied anyway).
    if (_callbacks.empty() || _algorithm->hasReceivedSufficientResponses()) {
        invariant(_algorithm->hasReceivedSufficientResponses());
        _signalSufficientResponsesReceived();
    }

    earlyReturnGuard.Dismiss();
    return evh;
}
/**
 * Ends the run early: under the mutex, performs the same completion step that runs when
 * sufficient responses have been received (_signalSufficientResponsesReceived()).
 *
 * start() must have been called first (enforced by invariant).
 */
void ScatterGatherRunner::RunnerImpl::cancel() {
LockGuard lk(_mutex);
invariant(_started);
_signalSufficientResponsesReceived();
}
/**
 * Completion callback for one scheduled remote command.
 *
 * Under the mutex: drops the callback's handle from _callbacks, forwards the response to the
 * algorithm unless the callback was canceled, and signals completion once the algorithm reports
 * that it has received sufficient responses.
 */
void ScatterGatherRunner::RunnerImpl::processResponse(
    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
    LockGuard lk(_mutex);

    // We've received sufficient responses and it's not safe to access the algorithm any more.
    if (!_sufficientResponsesReceived.isValid()) {
        return;
    }

    // Remove the callback from our vector to break the cycle of shared_ptr. Order within
    // _callbacks is irrelevant, so swap the entry to the back and pop it.
    const auto finished = std::find(_callbacks.begin(), _callbacks.end(), cbData.myHandle);
    invariant(finished != _callbacks.end());
    std::swap(*finished, _callbacks.back());
    _callbacks.pop_back();

    // A canceled callback is not forwarded to the algorithm.
    if (cbData.response.status == ErrorCodes::CallbackCanceled) {
        return;
    }

    _algorithm->processResponse(cbData.request, cbData.response);

    if (!_algorithm->hasReceivedSufficientResponses()) {
        // Still waiting, so there must be at least one callback left outstanding.
        invariant(!_callbacks.empty());
        return;
    }
    _signalSufficientResponsesReceived();
}
/**
 * Finishes the run: cancels every callback still listed in _callbacks, signals the completion
 * event, and resets the event handle. Later calls are no-ops because the handle is then invalid.
 *
 * Every caller in this file holds _mutex when invoking this.
 */
void ScatterGatherRunner::RunnerImpl::_signalSufficientResponsesReceived() {
    // An invalid handle means completion was already signaled (or the event was never created).
    if (!_sufficientResponsesReceived.isValid()) {
        return;
    }

    for (auto& outstanding : _callbacks) {
        _executor->cancel(outstanding);
    }
    // Clear _callbacks to break the cycle of shared_ptr.
    _callbacks.clear();

    _executor->signalEvent(_sufficientResponsesReceived);
    _sufficientResponsesReceived = EventHandle();
}
} // namespace repl
} // namespace mongo