/**
* Copyright 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/repl/scatter_gather_runner.h"
#include
#include "mongo/base/status_with.h"
#include "mongo/db/repl/scatter_gather_algorithm.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm) :
_algorithm(algorithm),
_started(false) {
}
ScatterGatherRunner::~ScatterGatherRunner() {
}
static void startTrampoline(const ReplicationExecutor::CallbackArgs& cbData,
ScatterGatherRunner* runner,
StatusWith* result) {
// TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
// TaskExecutor.
ReplicationExecutor* executor = static_cast(cbData.executor);
*result = runner->start(executor);
}
Status ScatterGatherRunner::run(ReplicationExecutor* executor) {
StatusWith finishEvh(ErrorCodes::InternalError,
"Not set");
StatusWith startCBH = executor->scheduleWork(
stdx::bind(startTrampoline, stdx::placeholders::_1, this, &finishEvh));
if (!startCBH.isOK()) {
return startCBH.getStatus();
}
executor->wait(startCBH.getValue());
if (!finishEvh.isOK()) {
return finishEvh.getStatus();
}
executor->waitForEvent(finishEvh.getValue());
return Status::OK();
}
StatusWith ScatterGatherRunner::start(
ReplicationExecutor* executor,
const stdx::function& onCompletion) {
invariant(!_started);
_started = true;
_actualResponses = 0;
_onCompletion = onCompletion;
StatusWith evh = executor->makeEvent();
if (!evh.isOK()) {
return evh;
}
_sufficientResponsesReceived = evh.getValue();
ScopeGuard earlyReturnGuard = MakeGuard(
&ScatterGatherRunner::_signalSufficientResponsesReceived,
this,
executor);
const ReplicationExecutor::RemoteCommandCallbackFn cb = stdx::bind(
&ScatterGatherRunner::_processResponse,
stdx::placeholders::_1,
this);
std::vector requests = _algorithm->getRequests();
for (size_t i = 0; i < requests.size(); ++i) {
const StatusWith cbh =
executor->scheduleRemoteCommand(requests[i], cb);
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return StatusWith(cbh.getStatus());
}
fassert(18743, cbh.getStatus());
_callbacks.push_back(cbh.getValue());
}
if (_callbacks.empty() || _algorithm->hasReceivedSufficientResponses()) {
invariant(_algorithm->hasReceivedSufficientResponses());
_signalSufficientResponsesReceived(executor);
}
earlyReturnGuard.Dismiss();
return evh;
}
void ScatterGatherRunner::cancel(ReplicationExecutor* executor) {
invariant(_started);
_signalSufficientResponsesReceived(executor);
}
void ScatterGatherRunner::_processResponse(
const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
ScatterGatherRunner* runner) {
// It is possible that the ScatterGatherRunner has already gone out of scope, if the
// response indicates the callback was canceled. In that case, do not access any members
// of "runner" and return immediately.
if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
return;
}
++runner->_actualResponses;
runner->_algorithm->processResponse(cbData.request, cbData.response);
if (runner->_algorithm->hasReceivedSufficientResponses()) {
// TODO: remove static cast once ScatterGatherRunner is designed to work with a generic
// TaskExecutor.
ReplicationExecutor* executor = static_cast(cbData.executor);
runner->_signalSufficientResponsesReceived(executor);
}
else {
invariant(runner->_actualResponses < runner->_callbacks.size());
}
}
void ScatterGatherRunner::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
if (_sufficientResponsesReceived.isValid()) {
std::for_each(_callbacks.begin(),
_callbacks.end(),
stdx::bind(&ReplicationExecutor::cancel,
executor,
stdx::placeholders::_1));
const ReplicationExecutor::EventHandle h = _sufficientResponsesReceived;
_sufficientResponsesReceived = ReplicationExecutor::EventHandle();
if (_onCompletion) {
_onCompletion();
}
executor->signalEvent(h);
}
}
} // namespace repl
} // namespace mongo