summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/SConscript2
-rw-r--r--src/mongo/db/repl/data_replicator.cpp16
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp21
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp9
-rw-r--r--src/mongo/db/repl/reporter.cpp365
-rw-r--r--src/mongo/db/repl/reporter.h131
-rw-r--r--src/mongo/db/repl/reporter_test.cpp720
-rw-r--r--src/mongo/unittest/SConscript10
-rw-r--r--src/mongo/unittest/task_executor_proxy.cpp113
-rw-r--r--src/mongo/unittest/task_executor_proxy.h77
10 files changed, 1213 insertions, 251 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c6a5250be46..5d38684ffdf 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -517,6 +517,7 @@ env.Library(
'reporter.cpp',
],
LIBDEPS=[
+ 'replica_set_messages',
'replication_executor',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
@@ -530,6 +531,7 @@ env.CppUnitTest(
LIBDEPS=[
'reporter',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
)
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 3b8580fbdaa..a01ac6e1b5f 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -719,9 +719,9 @@ TimestampStatus DataReplicator::initialSync() {
_setState_inlock(DataReplicatorState::InitialSync);
- // The reporter is paused for the duration of the initial sync, so cancel just in case.
+ // The reporter is paused for the duration of the initial sync, so shut down just in case.
if (_reporter) {
- _reporter->cancel();
+ _reporter->shutdown();
}
_reporterPaused = true;
_applierPaused = true;
@@ -892,7 +892,7 @@ void DataReplicator::_cancelAllHandles_inlock() {
if (_applier)
_applier->cancel();
if (_reporter)
- _reporter->cancel();
+ _reporter->shutdown();
if (_initialSyncState && _initialSyncState->dbsCloner.isActive())
_initialSyncState->dbsCloner.cancel();
}
@@ -903,7 +903,7 @@ void DataReplicator::_waitOnAll_inlock() {
if (_applier)
_applier->wait();
if (_reporter)
- _reporter->wait();
+ _reporter->join();
if (_initialSyncState)
_initialSyncState->dbsCloner.wait();
}
@@ -1017,9 +1017,11 @@ void DataReplicator::_doNextActions_Steady_inlock() {
_scheduleApplyBatch_inlock();
}
- if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) {
- _reporter.reset(
- new Reporter(_exec, _opts.prepareReplSetUpdatePositionCommandFn, _syncSource));
+ // TODO(benety): Initialize from replica set config election timeout / 2.
+ Milliseconds keepAliveInterval(1000);
+ if (!_reporterPaused && (!_reporter || !_reporter->isActive()) && !_syncSource.empty()) {
+ _reporter.reset(new Reporter(
+ _exec, _opts.prepareReplSetUpdatePositionCommandFn, _syncSource, keepAliveInterval));
}
}
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index f4d2cc61a9d..e0a4005acd5 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -190,9 +190,9 @@ protected:
return _rollbackFn(txn, lastOpTimeWritten, syncSource);
};
- options.prepareReplSetUpdatePositionCommandFn = []() -> StatusWith<BSONObj> {
- return BSON(UpdatePositionArgs::kCommandFieldName << 1);
- };
+ options.prepareReplSetUpdatePositionCommandFn =
+ [](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); };
options.getMyLastOptime = [this]() { return _myLastOpTime; };
options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
options.setFollowerMode = [this](const MemberState& state) {
@@ -1006,15 +1006,20 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
// Ensure that we send position information upstream after completing batch.
net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- {
+ bool found = false;
+ while (net->hasReadyRequests()) {
auto networkRequest = net->getNextReadyRequest();
auto commandRequest = networkRequest->getRequest();
- ASSERT_EQUALS("admin", commandRequest.dbname);
const auto& cmdObj = commandRequest.cmdObj;
- ASSERT_EQUALS(std::string(UpdatePositionArgs::kCommandFieldName),
- cmdObj.firstElementFieldName());
+ if (str::equals(cmdObj.firstElementFieldName(), UpdatePositionArgs::kCommandFieldName) &&
+ commandRequest.dbname == "admin") {
+ found = true;
+ break;
+ } else {
+ net->blackHole(networkRequest);
+ }
}
+ ASSERT_TRUE(found);
}
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index addf7fb36b7..aa64f41ac7e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -184,10 +184,11 @@ DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCo
options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
options.rollbackFn =
[](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); };
- options.prepareReplSetUpdatePositionCommandFn = [replCoord]() -> StatusWith<BSONObj> {
- return replCoord->prepareReplSetUpdatePositionCommand(
- ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle);
- };
+ options.prepareReplSetUpdatePositionCommandFn =
+ [replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> {
+ return replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
+ };
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
options.setMyLastOptime =
[replCoord](const OpTime& opTime) { replCoord->setMyLastAppliedOpTime(opTime); };
diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp
index 4a0160d6211..b9e9d9cb8e8 100644
--- a/src/mongo/db/repl/reporter.cpp
+++ b/src/mongo/db/repl/reporter.cpp
@@ -31,132 +31,373 @@
#include "mongo/platform/basic.h"
#include "mongo/db/repl/reporter.h"
+
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/repl/old_update_position_args.h"
+#include "mongo/db/repl/update_position_args.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
-using executor::RemoteCommandRequest;
+namespace {
+
+const char kConfigVersionFieldName[] = "configVersion";
+
+/**
+ * Returns configuration version in update command object.
+ * Returns -1 on failure.
+ */
+template <typename UpdatePositionArgsType>
+long long _parseCommandRequestConfigVersion(const BSONObj& commandRequest) {
+ UpdatePositionArgsType args;
+ if (!args.initialize(commandRequest).isOK()) {
+ return -1;
+ }
+ if (args.updatesBegin() == args.updatesEnd()) {
+ return -1;
+ }
+ return args.updatesBegin()->cfgver;
+}
+
+/**
+ * Returns true if config version in replSetUpdatePosition response is higher than config version in
+ * locally generated update command request object.
+ * Returns false if config version is missing in either document.
+ */
+bool _isTargetConfigNewerThanRequest(
+ const BSONObj& commandResult,
+ const BSONObj& commandRequest,
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) {
+ long long targetConfigVersion;
+ if (!bsonExtractIntegerField(commandResult, kConfigVersionFieldName, &targetConfigVersion)
+ .isOK()) {
+ return false;
+ }
+
+ const long long localConfigVersion =
+ commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle
+ ? _parseCommandRequestConfigVersion<UpdatePositionArgs>(commandRequest)
+ : _parseCommandRequestConfigVersion<OldUpdatePositionArgs>(commandRequest);
+ if (localConfigVersion == -1) {
+ return false;
+ }
+
+ return targetConfigVersion > localConfigVersion;
+}
+
+} // namespace
Reporter::Reporter(executor::TaskExecutor* executor,
PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn,
- const HostAndPort& target)
+ const HostAndPort& target,
+ Milliseconds keepAliveInterval)
: _executor(executor),
_prepareReplSetUpdatePositionCommandFn(prepareReplSetUpdatePositionCommandFn),
_target(target),
- _status(Status::OK()),
- _willRunAgain(false),
- _active(false) {
+ _keepAliveInterval(keepAliveInterval) {
uassert(ErrorCodes::BadValue, "null task executor", executor);
uassert(ErrorCodes::BadValue,
"null function to create replSetUpdatePosition command object",
prepareReplSetUpdatePositionCommandFn);
uassert(ErrorCodes::BadValue, "target name cannot be empty", !target.empty());
+ uassert(ErrorCodes::BadValue,
+ "keep alive interval must be positive",
+ keepAliveInterval > Milliseconds(0));
}
Reporter::~Reporter() {
- DESTRUCTOR_GUARD(cancel(););
+ DESTRUCTOR_GUARD(shutdown(); join(););
}
-void Reporter::cancel() {
+HostAndPort Reporter::getTarget() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _target;
+}
+
+Milliseconds Reporter::getKeepAliveInterval() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _keepAliveInterval;
+}
+
+void Reporter::shutdown() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid");
- if (!_active) {
+ if (!_isActive_inlock()) {
return;
}
- _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid");
- _willRunAgain = false;
- invariant(_remoteCommandCallbackHandle.isValid());
- _executor->cancel(_remoteCommandCallbackHandle);
-}
+ _isWaitingToSendReporter = false;
-void Reporter::wait() {
executor::TaskExecutor::CallbackHandle handle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!_active) {
- return;
- }
- if (!_remoteCommandCallbackHandle.isValid()) {
- return;
- }
+ if (_remoteCommandCallbackHandle.isValid()) {
+ invariant(!_prepareAndSendCommandCallbackHandle.isValid());
handle = _remoteCommandCallbackHandle;
+ } else {
+ invariant(!_remoteCommandCallbackHandle.isValid());
+ invariant(_prepareAndSendCommandCallbackHandle.isValid());
+ handle = _prepareAndSendCommandCallbackHandle;
}
- _executor->wait(handle);
+
+ _executor->cancel(handle);
+}
+
+Status Reporter::join() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+ return _status;
}
Status Reporter::trigger() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _schedule_inlock();
-}
-Status Reporter::_schedule_inlock() {
+ // If these was a previous error then the reporter is dead and return that error.
if (!_status.isOK()) {
return _status;
}
- if (_active) {
- _willRunAgain = true;
+ if (_keepAliveTimeoutWhen != Date_t()) {
+ // Reset keep alive expiration to signal handler that it was canceled internally.
+ invariant(_prepareAndSendCommandCallbackHandle.isValid());
+ _keepAliveTimeoutWhen = Date_t();
+ _executor->cancel(_prepareAndSendCommandCallbackHandle);
+ return Status::OK();
+ } else if (_isActive_inlock()) {
+ _isWaitingToSendReporter = true;
+ return Status::OK();
+ }
+
+ auto scheduleResult = _executor->scheduleWork(
+ stdx::bind(&Reporter::_prepareAndSendCommandCallback, this, stdx::placeholders::_1, true));
+
+ _status = scheduleResult.getStatus();
+ if (!_status.isOK()) {
+ LOG(2) << "Reporter failed to schedule callback to prepare and send update command: "
+ << _status;
return _status;
}
- LOG(2) << "Reporter scheduling report to : " << _target;
+ _prepareAndSendCommandCallbackHandle = scheduleResult.getValue();
- auto prepareResult = _prepareReplSetUpdatePositionCommandFn();
+ return _status;
+}
- if (!prepareResult.isOK()) {
- // Returning NodeNotFound because currently this is the only way
- // prepareReplSetUpdatePositionCommand() can fail in production.
- return Status(ErrorCodes::NodeNotFound,
- "Reporter failed to create replSetUpdatePositionCommand command.");
- }
- auto cmdObj = prepareResult.getValue();
- StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
- _executor->scheduleRemoteCommand(
- RemoteCommandRequest(_target, "admin", cmdObj),
- stdx::bind(&Reporter::_callback, this, stdx::placeholders::_1));
-
- if (!scheduleResult.isOK()) {
- _status = scheduleResult.getStatus();
- LOG(2) << "Reporter failed to schedule with status: " << _status;
+StatusWith<BSONObj> Reporter::_prepareCommand() {
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle =
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_isActive_inlock());
+ if (!_status.isOK()) {
+ return _status;
+ }
+
+ commandStyle = _commandStyle;
+ }
+
+ auto prepareResult = _prepareReplSetUpdatePositionCommandFn(commandStyle);
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Reporter could have been canceled while preparing the command.
+ if (!_status.isOK()) {
+ return _status;
+ }
+ // If there was an error in preparing the command, abort and return that error.
+ if (!prepareResult.isOK()) {
+ LOG(2) << "Reporter failed to prepare update command with status: "
+ << prepareResult.getStatus();
+ _status = prepareResult.getStatus();
return _status;
}
- _active = true;
- _willRunAgain = false;
+ return prepareResult.getValue();
+}
+
+void Reporter::_sendCommand_inlock(BSONObj commandRequest) {
+ LOG(2) << "Reporter sending slave oplog progress to upstream updater " << _target << ": "
+ << commandRequest;
+
+ auto scheduleResult = _executor->scheduleRemoteCommand(
+ executor::RemoteCommandRequest(_target, "admin", commandRequest),
+ stdx::bind(&Reporter::_processResponseCallback, this, stdx::placeholders::_1));
+
+ _status = scheduleResult.getStatus();
+ if (!_status.isOK()) {
+ LOG(2) << "Reporter failed to schedule with status: " << _status;
+ if (_status != ErrorCodes::ShutdownInProgress) {
+ fassert(34434, _status);
+ }
+ return;
+ }
+
_remoteCommandCallbackHandle = scheduleResult.getValue();
- return Status::OK();
}
-void Reporter::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void Reporter::_processResponseCallback(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- _status = rcbd.response.getStatus();
- _active = false;
+ // If the reporter was shut down before this callback is invoked,
+ // return the canceled "_status".
+ if (!_status.isOK()) {
+ invariant(_status == ErrorCodes::CallbackCanceled);
+ _onShutdown_inlock();
+ return;
+ }
- LOG(2) << "Reporter ended with status: " << _status << " after reporting to " << _target;
- if (_status.isOK() && _willRunAgain) {
- _schedule_inlock();
- } else {
- _willRunAgain = false;
+ _status = rcbd.response.getStatus();
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ // Override _status with the one embedded in the command result.
+ const auto& commandResult = rcbd.response.getValue().data;
+ _status = getStatusFromCommandResult(commandResult);
+
+ // Some error types are OK and should not cause the reporter to stop sending updates to the
+ // sync target.
+ if (_status == ErrorCodes::BadValue &&
+ _commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) {
+ LOG(1) << "Reporter falling back to old style UpdatePosition command for sync source: "
+ << _target;
+ _status = Status::OK();
+ _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle;
+ _isWaitingToSendReporter = true;
+ } else if (_status == ErrorCodes::InvalidReplicaSetConfig &&
+ _isTargetConfigNewerThanRequest(
+ commandResult, rcbd.request.cmdObj, _commandStyle)) {
+ LOG(1) << "Reporter found newer configuration on sync source: " << _target
+ << ". Retrying.";
+ _status = Status::OK();
+ // Do not resend update command immediately.
+ _isWaitingToSendReporter = false;
+ } else if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ if (!_isWaitingToSendReporter) {
+ // Since we are also on a timer, schedule a report for that interval, or until
+ // triggered.
+ auto when = _executor->now() + _keepAliveInterval;
+ bool fromTrigger = false;
+ auto scheduleResult =
+ _executor->scheduleWorkAt(when,
+ stdx::bind(&Reporter::_prepareAndSendCommandCallback,
+ this,
+ stdx::placeholders::_1,
+ fromTrigger));
+
+ _status = scheduleResult.getStatus();
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ _prepareAndSendCommandCallbackHandle = scheduleResult.getValue();
+ _keepAliveTimeoutWhen = when;
+
+ _remoteCommandCallbackHandle = executor::TaskExecutor::CallbackHandle();
+ return;
+ }
+ }
+
+ // Must call without holding the lock.
+ auto prepareResult = _prepareCommand();
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ _sendCommand_inlock(prepareResult.getValue());
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
}
+
+ invariant(_remoteCommandCallbackHandle.isValid());
+ _isWaitingToSendReporter = false;
}
-Status Reporter::getStatus() const {
+void Reporter::_prepareAndSendCommandCallback(const executor::TaskExecutor::CallbackArgs& args,
+ bool fromTrigger) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ _status = args.status;
+
+ // Ignore CallbackCanceled status if keep alive was canceled by triggered.
+ if (!fromTrigger && _status == ErrorCodes::CallbackCanceled &&
+ _keepAliveTimeoutWhen == Date_t()) {
+ _status = Status::OK();
+ }
+
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+ }
+
+ // Must call without holding the lock.
+ auto prepareResult = _prepareCommand();
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _status;
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ _sendCommand_inlock(prepareResult.getValue());
+ if (!_status.isOK()) {
+ _onShutdown_inlock();
+ return;
+ }
+
+ invariant(_remoteCommandCallbackHandle.isValid());
+ _prepareAndSendCommandCallbackHandle = executor::TaskExecutor::CallbackHandle();
+ _keepAliveTimeoutWhen = Date_t();
+}
+
+void Reporter::_onShutdown_inlock() {
+ _isWaitingToSendReporter = false;
+ _remoteCommandCallbackHandle = executor::TaskExecutor::CallbackHandle();
+ _prepareAndSendCommandCallbackHandle = executor::TaskExecutor::CallbackHandle();
+ _keepAliveTimeoutWhen = Date_t();
+ _condition.notify_all();
}
bool Reporter::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _isActive_inlock();
+}
+
+bool Reporter::_isActive_inlock() const {
+ return _remoteCommandCallbackHandle.isValid() || _prepareAndSendCommandCallbackHandle.isValid();
+}
+
+bool Reporter::isWaitingToSendReport() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _isWaitingToSendReporter;
}
-bool Reporter::willRunAgain() const {
+Date_t Reporter::getKeepAliveTimeoutWhen_forTest() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _willRunAgain;
+ return _keepAliveTimeoutWhen;
}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h
index e9cc9c06196..6082cd8b70c 100644
--- a/src/mongo/db/repl/reporter.h
+++ b/src/mongo/db/repl/reporter.h
@@ -31,12 +31,36 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/time_support.h"
namespace mongo {
namespace repl {
+/**
+ * Once scheduled, the reporter will periodically send the current replication progress, obtained
+ * by invoking "_prepareReplSetUpdatePositionCommandFn", to its sync source until it encounters
+ * an error.
+ *
+ * If the sync source cannot accept the current format (used by server versions 3.2.4 and above) of
+ * the "replSetUpdatePosition" command, the reporter will not abort and instead downgrade the format
+ * of the command it will send upstream.
+ *
+ * While the reporter is active, it will be in one of three states:
+ * 1) triggered and waiting to send command to sync source as soon as possible.
+ * 2) waiting for command response from sync source.
+ * 3) waiting for at least "_keepAliveInterval" ms before sending command to sync source.
+ *
+ * Calling trigger() while the reporter is in state 1 or 2 will cause the reporter to immediately
+ * send a new command upon receiving a successful command response.
+ *
+ * Calling trigger() while it is in state 3 sends a command upstream and cancels the current
+ * keep alive timeout, resetting the keep alive schedule.
+ */
class Reporter {
MONGO_DISALLOW_COPYING(Reporter);
@@ -47,36 +71,51 @@ public:
*
* The returned status indicates whether or not the command was created.
*/
- using PrepareReplSetUpdatePositionCommandFn = stdx::function<StatusWith<BSONObj>()>;
+ using PrepareReplSetUpdatePositionCommandFn = stdx::function<StatusWith<BSONObj>(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle)>;
Reporter(executor::TaskExecutor* executor,
PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn,
- const HostAndPort& target);
+ const HostAndPort& target,
+ Milliseconds keepAliveInterval);
+
virtual ~Reporter();
/**
+ * Returns sync target.
+ */
+ HostAndPort getTarget() const;
+
+ /**
+ * Returns keep alive interval.
+ * Reporter will periodically send replication status to sync source every "_keepAliveInterval"
+ * until an error occurs.
+ */
+ Milliseconds getKeepAliveInterval() const;
+
+ /**
* Returns true if a remote command has been scheduled (but not completed)
* with the executor.
*/
bool isActive() const;
/**
- * Returns true if a remote command should be scheduled once the current one returns
- * from the executor.
+ * Returns true if new data is available while a remote command is in progress.
+ * The reporter will schedule a subsequent remote update immediately upon successful
+ * completion of the previous command instead of when the keep alive callback runs.
*/
- bool willRunAgain() const;
+ bool isWaitingToSendReport() const;
/**
- * Cancels remote command request.
+ * Cancels both scheduled and active remote command requests.
* Returns immediately if the Reporter is not active.
*/
- void cancel();
+ void shutdown();
/**
- * Waits for last/current executor handle to finish.
- * Returns immediately if the handle is invalid.
+ * Waits until Reporter is inactive and returns reporter status.
*/
- void wait();
+ Status join();
/**
* Signals to the Reporter that there is new information to be sent to the "_target" server.
@@ -84,46 +123,84 @@ public:
*/
Status trigger();
+ // ================== Test support API ===================
+
/**
- * Returns the previous return status so that the owner can decide whether the Reporter
- * needs a new target to whom it can report.
+ * Returns scheduled time of keep alive timeout handler.
*/
- Status getStatus() const;
+ Date_t getKeepAliveTimeoutWhen_forTest() const;
private:
/**
- * Schedules remote command to be run by the executor
+ * Returns true if reporter is active.
*/
- Status _schedule_inlock();
+ bool _isActive_inlock() const;
/**
- * Callback for remote command.
+ * Prepares remote command to be run by the executor.
*/
- void _callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd);
+ StatusWith<BSONObj> _prepareCommand();
+
+ /**
+ * Schedules remote command to be run by the executor.
+ */
+ void _sendCommand_inlock(BSONObj commandRequest);
+
+ /**
+ * Callback for processing response from remote command.
+ */
+ void _processResponseCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd);
+
+ /**
+ * Callback for preparing and sending remote command.
+ */
+ void _prepareAndSendCommandCallback(const executor::TaskExecutor::CallbackArgs& args,
+ bool fromTrigger);
+
+ /**
+ * Signals end of Reporter work and notifies waiters.
+ */
+ void _onShutdown_inlock();
// Not owned by us.
- executor::TaskExecutor* _executor;
+ executor::TaskExecutor* const _executor;
// Prepares update command object.
- PrepareReplSetUpdatePositionCommandFn _prepareReplSetUpdatePositionCommandFn;
+ const PrepareReplSetUpdatePositionCommandFn _prepareReplSetUpdatePositionCommandFn;
// Host to whom the Reporter sends updates.
- HostAndPort _target;
+ const HostAndPort _target;
- // Protects member data of this Reporter.
+ // Reporter will send updates every "_keepAliveInterval" ms until the reporter is canceled or
+ // encounters an error.
+ const Milliseconds _keepAliveInterval;
+
+ // Protects member data of this Reporter declared below.
mutable stdx::mutex _mutex;
+ mutable stdx::condition_variable _condition;
+
// Stores the most recent Status returned from the executor.
- Status _status;
+ Status _status = Status::OK();
- // _willRunAgain is true when Reporter is scheduled to be run by the executor and subsequent
- // updates have come in.
- bool _willRunAgain;
- // _active is true when Reporter is scheduled to be run by the executor.
- bool _active;
+ // Stores style of the most recent update command object.
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle =
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle;
+
+ // _isWaitingToSendReporter is true when Reporter is scheduled to be run by the executor and
+ // subsequent updates have come in.
+ bool _isWaitingToSendReporter = false;
// Callback handle to the scheduled remote command.
executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle;
+
+ // Callback handle to the scheduled task for preparing and sending the remote command.
+ executor::TaskExecutor::CallbackHandle _prepareAndSendCommandCallbackHandle;
+
+ // Keep alive timeout callback will not run before this time.
+ // If this date is Date_t(), the callback is either unscheduled or canceled.
+ // Used for testing only.
+ Date_t _keepAliveTimeoutWhen;
};
} // namespace repl
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index 448e70472b9..3d6b8b81b43 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -28,11 +28,14 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/repl/old_update_position_args.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/reporter.h"
+#include "mongo/db/repl/update_position_args.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/executor/network_interface_mock.h"
-
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
namespace {
@@ -46,27 +49,49 @@ using executor::RemoteCommandResponse;
class MockProgressManager {
public:
void updateMap(int memberId, const OpTime& lastDurableOpTime, const OpTime& lastAppliedOpTime) {
- progressMap[memberId] = ProgressInfo(lastDurableOpTime, lastAppliedOpTime);
+ _progressMap[memberId] = ProgressInfo(lastDurableOpTime, lastAppliedOpTime);
}
- void setResult(bool newResult) {
- _result = newResult;
+ void clear() {
+ _progressMap.clear();
}
- bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) {
- if (!_result) {
- return _result;
- }
- cmdBuilder->append("replSetUpdatePosition", 1);
- BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes"));
- for (auto&& itr : progressMap) {
+ long long getConfigVersion() const {
+ return _configVersion;
+ }
+
+ void setConfigVersion(long long configVersion) {
+ _configVersion = configVersion;
+ }
+
+ StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
+ BSONArrayBuilder arrayBuilder(
+ cmdBuilder.subarrayStart(UpdatePositionArgs::kUpdateArrayFieldName));
+ for (auto&& itr : _progressMap) {
BSONObjBuilder entry(arrayBuilder.subobjStart());
- itr.second.lastDurableOpTime.append(&entry, "durableOpTime");
- itr.second.lastAppliedOpTime.append(&entry, "appliedOpTime");
- entry.append("memberId", itr.first);
- entry.append("cfgver", 1);
+ switch (commandStyle) {
+ case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle:
+ itr.second.lastDurableOpTime.append(
+ &entry, UpdatePositionArgs::kDurableOpTimeFieldName);
+ itr.second.lastAppliedOpTime.append(
+ &entry, UpdatePositionArgs::kAppliedOpTimeFieldName);
+ break;
+ case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle:
+ // Assume protocol version 1.
+ itr.second.lastDurableOpTime.append(&entry,
+ OldUpdatePositionArgs::kOpTimeFieldName);
+ break;
+ }
+ entry.append(UpdatePositionArgs::kMemberIdFieldName, itr.first);
+ if (_configVersion != -1) {
+ entry.append(UpdatePositionArgs::kConfigVersionFieldName, _configVersion);
+ }
}
- return true;
+ arrayBuilder.done();
+ return cmdBuilder.obj();
}
private:
@@ -81,266 +106,675 @@ private:
OpTime lastAppliedOpTime;
};
- std::map<int, ProgressInfo> progressMap;
- bool _result = true;
+ std::map<int, ProgressInfo> _progressMap;
+ long long _configVersion = 1;
};
class ReporterTest : public executor::ThreadPoolExecutorTest {
public:
ReporterTest();
- void scheduleNetworkResponse(const BSONObj& obj);
- void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
+
+ /**
+ * Schedules response to the current network request.
+ * Returns command object in the network request.
+ */
+ BSONObj scheduleNetworkResponse(const BSONObj& obj);
+ BSONObj scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
+
+ /**
+ * Schedules network response and instructs network interface to process response.
+ * Returns command object in the network request.
+ */
+ BSONObj processNetworkResponse(const BSONObj& obj,
+ bool expectReadyRequestsAfterProcessing = false);
+ BSONObj processNetworkResponse(ErrorCodes::Error code,
+ const std::string& reason,
+ bool expectReadyRequestsAfterProcessing = false);
+
+ void runUntil(Date_t when, bool expectReadyRequestsAfterAdvancingClock = false);
+
+ void runReadyScheduledTasks();
+
+ void assertReporterDone();
protected:
void setUp() override;
void tearDown() override;
- std::unique_ptr<Reporter> reporter;
+private:
+ virtual bool triggerAtSetUp() const;
+
+protected:
+ std::unique_ptr<unittest::TaskExecutorProxy> _executorProxy;
std::unique_ptr<MockProgressManager> posUpdater;
Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn;
+ std::unique_ptr<Reporter> reporter;
+};
+
+class ReporterTestNoTriggerAtSetUp : public ReporterTest {
+private:
+ virtual bool triggerAtSetUp() const override;
};
ReporterTest::ReporterTest() {}
void ReporterTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
- posUpdater.reset(new MockProgressManager());
- prepareReplSetUpdatePositionCommandFn = [this]() -> StatusWith<BSONObj> {
- BSONObjBuilder bob;
- if (posUpdater->prepareReplSetUpdatePositionCommand(&bob)) {
- return bob.obj();
- }
- return Status(ErrorCodes::OperationFailed,
- "unable to prepare replSetUpdatePosition command object");
- };
- reporter.reset(new Reporter(&getExecutor(),
- [this]() { return prepareReplSetUpdatePositionCommandFn(); },
- HostAndPort("h1")));
+
+ _executorProxy = stdx::make_unique<unittest::TaskExecutorProxy>(&getExecutor());
+
+ posUpdater = stdx::make_unique<MockProgressManager>();
+ posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+
+ prepareReplSetUpdatePositionCommandFn =
+ stdx::bind(&MockProgressManager::prepareReplSetUpdatePositionCommand,
+ posUpdater.get(),
+ stdx::placeholders::_1);
+
+ reporter = stdx::make_unique<Reporter>(
+ _executorProxy.get(),
+ [this](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) {
+ return prepareReplSetUpdatePositionCommandFn(commandStyle);
+ },
+ HostAndPort("h1"),
+ Milliseconds(1000));
launchExecutorThread();
+
+ if (triggerAtSetUp()) {
+ ASSERT_OK(reporter->trigger());
+ ASSERT_TRUE(reporter->isActive());
+ } else {
+ ASSERT_FALSE(reporter->isActive());
+ }
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
}
void ReporterTest::tearDown() {
executor::ThreadPoolExecutorTest::tearDown();
// Executor may still invoke reporter's callback before shutting down.
- posUpdater.reset();
reporter.reset();
+ posUpdater.reset();
+ prepareReplSetUpdatePositionCommandFn = Reporter::PrepareReplSetUpdatePositionCommandFn();
+ _executorProxy.reset();
}
-void ReporterTest::scheduleNetworkResponse(const BSONObj& obj) {
+bool ReporterTest::triggerAtSetUp() const {
+ return true;
+}
+
+bool ReporterTestNoTriggerAtSetUp::triggerAtSetUp() const {
+ return false;
+}
+
+BSONObj ReporterTest::scheduleNetworkResponse(const BSONObj& obj) {
auto net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
Milliseconds millis(0);
RemoteCommandResponse response(obj, BSONObj(), millis);
executor::TaskExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
+ auto noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi, net->now(), responseStatus);
+ return noi->getRequest().cmdObj;
}
-void ReporterTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
+BSONObj ReporterTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
auto net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
executor::TaskExecutor::ResponseStatus responseStatus(code, reason);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
+ auto noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi, net->now(), responseStatus);
+ return noi->getRequest().cmdObj;
+}
+
+BSONObj ReporterTest::processNetworkResponse(const BSONObj& obj,
+ bool expectReadyRequestsAfterProcessing) {
+ auto net = getNet();
+ net->enterNetwork();
+ auto cmdObj = scheduleNetworkResponse(obj);
+ net->runReadyNetworkOperations();
+ ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
+ net->exitNetwork();
+ return cmdObj;
+}
+
+BSONObj ReporterTest::processNetworkResponse(ErrorCodes::Error code,
+ const std::string& reason,
+ bool expectReadyRequestsAfterProcessing) {
+ auto net = getNet();
+ net->enterNetwork();
+ auto cmdObj = scheduleNetworkResponse(code, reason);
+ net->runReadyNetworkOperations();
+ ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests());
+ net->exitNetwork();
+ return cmdObj;
}
-TEST_F(ReporterTest, InvalidConstruction) {
+void ReporterTest::runUntil(Date_t until, bool expectReadyRequestsAfterAdvancingClock) {
+ auto net = getNet();
+ net->enterNetwork();
+ ASSERT_EQUALS(until, net->runUntil(until));
+ ASSERT_EQUALS(expectReadyRequestsAfterAdvancingClock, net->hasReadyRequests());
+ net->exitNetwork();
+}
+
+void ReporterTest::runReadyScheduledTasks() {
+ auto net = getNet();
+ net->enterNetwork();
+ net->exitNetwork();
+}
+
+void ReporterTest::assertReporterDone() {
+ ASSERT_FALSE(reporter->isActive());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
+ ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_EQUALS(reporter->join(), reporter->trigger());
+}
+
+TEST_F(ReporterTestNoTriggerAtSetUp, InvalidConstruction) {
// null PrepareReplSetUpdatePositionCommandFn
ASSERT_THROWS(Reporter(&getExecutor(),
Reporter::PrepareReplSetUpdatePositionCommandFn(),
- HostAndPort("h1")),
+ HostAndPort("h1"),
+ Milliseconds(1000)),
UserException);
// null TaskExecutor
- ASSERT_THROWS(Reporter(nullptr, prepareReplSetUpdatePositionCommandFn, HostAndPort("h1")),
- UserException);
+ ASSERT_THROWS_WHAT(
+ Reporter(
+ nullptr, prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), Milliseconds(1000)),
+ UserException,
+ "null task executor");
+
+ // null PrepareReplSetUpdatePositionCommandFn
+ ASSERT_THROWS_WHAT(Reporter(&getExecutor(),
+ Reporter::PrepareReplSetUpdatePositionCommandFn(),
+ HostAndPort("h1"),
+ Milliseconds(1000)),
+ UserException,
+ "null function to create replSetUpdatePosition command object");
// empty HostAndPort
- ASSERT_THROWS(Reporter(&getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort()),
- UserException);
+ ASSERT_THROWS_WHAT(Reporter(&getExecutor(),
+ prepareReplSetUpdatePositionCommandFn,
+ HostAndPort(),
+ Milliseconds(1000)),
+ UserException,
+ "target name cannot be empty");
+
+ // zero keep alive interval.
+ ASSERT_THROWS_WHAT(
+ Reporter(
+ &getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), Seconds(-1)),
+ UserException,
+ "keep alive interval must be positive");
+
+ // negative keep alive interval.
+ ASSERT_THROWS_WHAT(
+ Reporter(
+ &getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), Seconds(-1)),
+ UserException,
+ "keep alive interval must be positive");
+}
+
+TEST_F(ReporterTestNoTriggerAtSetUp, GetTarget) {
+ ASSERT_EQUALS(HostAndPort("h1"), reporter->getTarget());
}
-TEST_F(ReporterTest, IsActiveOnceScheduled) {
+TEST_F(ReporterTestNoTriggerAtSetUp, IsActiveOnceScheduled) {
ASSERT_FALSE(reporter->isActive());
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
}
-TEST_F(ReporterTest, CancelWithoutScheduled) {
- ASSERT_FALSE(reporter->isActive());
- reporter->cancel();
+TEST_F(ReporterTestNoTriggerAtSetUp, ShutdownWithoutScheduledStopsTheReporter) {
ASSERT_FALSE(reporter->isActive());
+ reporter->shutdown();
+ Status expectedStatus(ErrorCodes::CallbackCanceled, "Reporter no longer valid");
+ ASSERT_EQUALS(expectedStatus, reporter->join());
+ assertReporterDone();
}
-TEST_F(ReporterTest, ShutdownBeforeSchedule) {
+TEST_F(ReporterTestNoTriggerAtSetUp,
+ ShuttingExecutorDownBeforeActivatingReporterPreventsTheReporterFromStarting) {
getExecutor().shutdown();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, reporter->trigger());
- ASSERT_FALSE(reporter->isActive());
+ assertReporterDone();
}
-// If an error is returned, it should be recorded in the Reporter and be returned when triggered
-TEST_F(ReporterTest, ErrorsAreStoredInTheReporter) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+// If an error is returned, it should be recorded in the Reporter and not run again.
+TEST_F(ReporterTest, TaskExecutorAndNetworkErrorsStopTheReporter) {
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
- net->runReadyNetworkOperations();
- ASSERT_FALSE(net->hasReadyRequests());
- net->exitNetwork();
+ processNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus());
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->trigger());
- ASSERT_FALSE(reporter->isActive());
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->join());
+ assertReporterDone();
}
-// If an error is returned, it should be recorded in the Reporter and not run again.
-TEST_F(ReporterTest, ErrorsStopTheReporter) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+TEST_F(ReporterTest, UnsuccessfulCommandResponseStopsTheReporter) {
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::UnknownError) << "errmsg"
+ << "unknown error"));
+
+ ASSERT_EQUALS(Status(ErrorCodes::UnknownError, "unknown error"), reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTestNoTriggerAtSetUp,
+ InvalidReplicaSetResponseToARequestWithoutConfigVersionStopsTheReporter) {
+ posUpdater->setConfigVersion(-1);
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig)
+ << "errmsg"
+ << "newer config"
+ << "configVersion" << 100));
+
+ ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, InvalidReplicaSetResponseWithoutConfigVersionOnSyncTargetStopsTheReporter) {
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig)
+ << "errmsg"
+ << "invalid config"));
+
+ ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, InvalidReplicaSetResponseWithSameConfigVersionOnSyncTargetStopsTheReporter) {
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig)
+ << "errmsg"
+ << "invalid config"
+ << "configVersion" << posUpdater->getConfigVersion()));
+
+ ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(
+ ReporterTest,
+ InvalidReplicaSetResponseWithNewerConfigVersionOnSyncTargetToAnNewCommandStyleRequestDoesNotStopTheReporter) {
+ // Reporter should not retry update command on sync source immediately after seeing newer
+ // configuration.
ASSERT_OK(reporter->trigger());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
+
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig)
+ << "errmsg"
+ << "newer config"
+ << "configVersion" << posUpdater->getConfigVersion() + 1));
+
ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
+}
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah");
- net->runReadyNetworkOperations();
- ASSERT_FALSE(net->hasReadyRequests());
- net->exitNetwork();
+TEST_F(
+ ReporterTest,
+ InvalidReplicaSetResponseWithNewerConfigVersionOnSyncTargetToAnOldCommandStyleRequestDoesNotStopTheReporter) {
+ auto expectedNewStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus());
- ASSERT_FALSE(reporter->willRunAgain());
- ASSERT_FALSE(reporter->isActive());
+ auto commandRequest =
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::BadValue) << "errmsg"
+ << "Unexpected field durableOpTime in UpdateInfoArgs"),
+ true);
+ ASSERT_EQUALS(expectedNewStyleCommandRequest, commandRequest);
+
+ // Update command object should match old style (pre-3.2.4).
+ auto expectedOldStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle));
+
+ commandRequest = processNetworkResponse(
+ BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) << "errmsg"
+ << "newer config"
+ << "configVersion" << posUpdater->getConfigVersion() + 1));
+ ASSERT_EQUALS(expectedOldStyleCommandRequest, commandRequest);
+
+ ASSERT_TRUE(reporter->isActive());
}
-// Schedule while we are already scheduled, it should set willRunAgain, then automatically
+// Schedule while we are already scheduled, it should set "isWaitingToSendReport", then
+// automatically
// schedule itself after finishing.
-TEST_F(ReporterTest, DoubleScheduleShouldCauseRescheduleImmediatelyAfterRespondedTo) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+TEST_F(
+ ReporterTest,
+ TriggeringReporterOnceWhileFirstCommandRequestIsInProgressCausesSecondCommandRequestToBeSentImmediatelyAfterFirstResponseReturns) {
+ // Second trigger (first time in setUp).
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
- ASSERT_OK(reporter->trigger());
- ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
- net->runReadyNetworkOperations();
- ASSERT_TRUE(net->hasReadyRequests());
- net->exitNetwork();
+ processNetworkResponse(BSON("ok" << 1), true);
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
- net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
- net->runReadyNetworkOperations();
- ASSERT_FALSE(net->hasReadyRequests());
- net->exitNetwork();
+ processNetworkResponse(BSON("ok" << 1));
- ASSERT_FALSE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isActive());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
}
-// Schedule multiple times while we are already scheduled, it should set willRunAgain,
+// Schedule multiple times while we are already scheduled, it should set "isWaitingToSendReport",
// then automatically schedule itself after finishing, but not a third time since the latter
// two will contain the same batch of updates.
-TEST_F(ReporterTest, TripleScheduleShouldCauseRescheduleImmediatelyAfterRespondedToOnlyOnce) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+TEST_F(
+ ReporterTest,
+ TriggeringReporterTwiceWhileFirstCommandRequestIsInProgressCausesSecondCommandRequestToBeSentImmediatelyAfterFirstResponseReturns) {
+ // Second trigger (first time in setUp).
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
+
+ // Third trigger.
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
+
+ processNetworkResponse(BSON("ok" << 1), true);
+
+ ASSERT_TRUE(reporter->isActive());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
+
+ processNetworkResponse(BSON("ok" << 1));
+
+ ASSERT_TRUE(reporter->isActive());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
+}
+
+TEST_F(ReporterTest, ShuttingReporterDownWhileFirstCommandRequestIsInProgressStopsTheReporter) {
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
+
+ reporter->shutdown();
auto net = getNet();
net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
net->runReadyNetworkOperations();
- ASSERT_TRUE(net->hasReadyRequests());
net->exitNetwork();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, ShuttingReporterDownWhileSecondCommandRequestIsInProgressStopsTheReporter) {
+ ASSERT_OK(reporter->trigger());
+ ASSERT_TRUE(reporter->isActive());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
+
+ processNetworkResponse(BSON("ok" << 1), true);
+
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ ASSERT_FALSE(reporter->isWaitingToSendReport());
+ reporter->shutdown();
+ auto net = getNet();
net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
net->runReadyNetworkOperations();
ASSERT_FALSE(net->hasReadyRequests());
net->exitNetwork();
- ASSERT_FALSE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
+ assertReporterDone();
}
-TEST_F(ReporterTest, CancelWhileScheduled) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+TEST_F(ReporterTestNoTriggerAtSetUp, CommandPreparationFailureStopsTheReporter) {
+ Status expectedStatus(ErrorCodes::UnknownError, "unknown error");
+ prepareReplSetUpdatePositionCommandFn =
+ [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> { return expectedStatus; };
ASSERT_OK(reporter->trigger());
- ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+
+ ASSERT_EQUALS(expectedStatus, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, CommandPreparationFailureDuringRescheduleStopsTheReporter) {
ASSERT_OK(reporter->trigger());
ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
+ ASSERT_TRUE(reporter->isWaitingToSendReport());
- reporter->cancel();
+ runReadyScheduledTasks();
- auto net = getNet();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- ASSERT_FALSE(net->hasReadyRequests());
- net->exitNetwork();
+ // This will cause command preparation to fail for the subsequent request.
+ Status expectedStatus(ErrorCodes::UnknownError, "unknown error");
+ prepareReplSetUpdatePositionCommandFn =
+ [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> { return expectedStatus; };
+
+ processNetworkResponse(BSON("ok" << 1));
+ ASSERT_EQUALS(expectedStatus, reporter->join());
+ assertReporterDone();
+}
+
+// If a remote server (most likely running with version before 3.2.4) returns ErrorCodes::BadValue
+// on a new style replSetUpdateCommand command object, we should regenerate the command with
+// pre-3.2.4 style arguments and retry the remote command.
+TEST_F(ReporterTest,
+ BadValueErrorOnNewStyleCommandShouldCauseRescheduleImmediatelyWithOldStyleCommand) {
+ runReadyScheduledTasks();
+
+ auto expectedNewStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
+
+ auto commandRequest =
+ processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::BadValue) << "errmsg"
+ << "Unexpected field durableOpTime in UpdateInfoArgs"),
+ true);
+ ASSERT_EQUALS(expectedNewStyleCommandRequest, commandRequest);
+
+ auto expectedOldStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle));
+
+ commandRequest = processNetworkResponse(BSON("ok" << 1));
+
+ // Update command object should match old style (pre-3.2.2).
+ ASSERT_NOT_EQUALS(expectedNewStyleCommandRequest, expectedOldStyleCommandRequest);
+ ASSERT_EQUALS(expectedOldStyleCommandRequest, commandRequest);
+
+ reporter->shutdown();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, FailedUpdateShouldNotRescheduleUpdate) {
+ processNetworkResponse(ErrorCodes::OperationFailed, "update failed");
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, SuccessfulUpdateShouldRescheduleUpdate) {
+ processNetworkResponse(BSON("ok" << 1));
+
+ auto until = getExecutor().now() + reporter->getKeepAliveInterval();
+ ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ runUntil(until, true);
+
+ processNetworkResponse(ErrorCodes::OperationFailed, "update failed");
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, ShutdownWhileKeepAliveTimeoutIsScheduledShouldMakeReporterInactive) {
+ processNetworkResponse(BSON("ok" << 1));
+
+ auto until = getExecutor().now() + reporter->getKeepAliveInterval();
+ ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ reporter->shutdown();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
ASSERT_FALSE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus());
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger());
+ runUntil(until);
}
-TEST_F(ReporterTest, CancelAfterFirstReturns) {
- posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1));
+TEST_F(ReporterTestNoTriggerAtSetUp,
+ FailingToSchedulePrepareCommandTaskShouldMakeReporterInactive) {
+ class TaskExecutorWithFailureInScheduleWork : public unittest::TaskExecutorProxy {
+ public:
+ TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor)
+ : unittest::TaskExecutorProxy(executor) {}
+ virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork(
+ const CallbackFn& work) override {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule work");
+ }
+ };
+
+ TaskExecutorWithFailureInScheduleWork badExecutor(&getExecutor());
+ _executorProxy->setExecutor(&badExecutor);
+
+ auto status = reporter->trigger();
+
+ _executorProxy->setExecutor(&getExecutor());
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
+ ASSERT_FALSE(reporter->isActive());
+}
+
+TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMakeReporterInactive) {
+ class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
+ public:
+ TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
+ : unittest::TaskExecutorProxy(executor) {}
+ virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override {
+ // Any error status other than ShutdownInProgress will cause the reporter to fassert.
+ return Status(ErrorCodes::ShutdownInProgress,
+ "failed to send remote command - shutdown in progress");
+ }
+ };
+
+ TaskExecutorWithFailureInScheduleRemoteCommand badExecutor(&getExecutor());
+ _executorProxy->setExecutor(&badExecutor);
+
+ ASSERT_OK(reporter->trigger());
+
+ // Run callback to prepare command and attempt to send command to sync source.
+ runReadyScheduledTasks();
+
+ _executorProxy->setExecutor(&getExecutor());
+
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, reporter->join());
+ ASSERT_FALSE(reporter->isActive());
+}
+
+TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) {
+ class TaskExecutorWithFailureInScheduleWorkAt : public unittest::TaskExecutorProxy {
+ public:
+ TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor)
+ : unittest::TaskExecutorProxy(executor) {}
+ virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt(
+ Date_t when, const CallbackFn& work) override {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule work");
+ }
+ };
+
+ TaskExecutorWithFailureInScheduleWorkAt badExecutor(&getExecutor());
+ _executorProxy->setExecutor(&badExecutor);
+
+ processNetworkResponse(BSON("ok" << 1));
+
+ _executorProxy->setExecutor(&getExecutor());
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, KeepAliveTimeoutFailingToScheduleRemoteCommandShouldMakeReporterInactive) {
+ processNetworkResponse(BSON("ok" << 1));
+
+ auto until = getExecutor().now() + reporter->getKeepAliveInterval();
+ ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ Status expectedStatus(ErrorCodes::UnknownError, "failed to prepare update command");
+ prepareReplSetUpdatePositionCommandFn =
+ [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
+ -> StatusWith<BSONObj> { return expectedStatus; };
+
+ runUntil(until);
+
+ ASSERT_EQUALS(expectedStatus, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest,
+ TriggerBeforeKeepAliveTimeoutShouldCancelExistingTimeoutAndSendUpdateImmediately) {
+ processNetworkResponse(BSON("ok" << 1));
+
+ auto keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval();
+
+ ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ auto until = keepAliveTimeoutWhen - reporter->getKeepAliveInterval() / 2;
+ runUntil(until);
+
ASSERT_OK(reporter->trigger());
+
+ // '_keepAliveTimeoutWhen' is reset by trigger() not by the canceled callback.
+ ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest());
ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+
+ processNetworkResponse(BSON("ok" << 1));
+
+ keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval();
+
+ // A new keep alive timeout should be scheduled.
+ ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ reporter->shutdown();
+
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
+ assertReporterDone();
+}
+
+TEST_F(ReporterTest, ShutdownImmediatelyAfterTriggerWhileKeepAliveTimeoutIsScheduledShouldSucceed) {
+ processNetworkResponse(BSON("ok" << 1));
+
+ auto keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval();
+ ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest());
+ ASSERT_TRUE(reporter->isActive());
+
+ auto until = keepAliveTimeoutWhen - reporter->getKeepAliveInterval() / 2;
+ runUntil(until);
+
ASSERT_OK(reporter->trigger());
+
+ // '_keepAliveTimeoutWhen' is reset by trigger() not by the canceled callback.
+ ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest());
ASSERT_TRUE(reporter->isActive());
- ASSERT_TRUE(reporter->willRunAgain());
auto net = getNet();
net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
- net->runReadyNetworkOperations();
ASSERT_TRUE(net->hasReadyRequests());
net->exitNetwork();
- ASSERT_TRUE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
+ reporter->shutdown();
- reporter->cancel();
net->enterNetwork();
- net->runReadyNetworkOperations();
ASSERT_FALSE(net->hasReadyRequests());
+ // Executor should invoke reporter callback with a ErrorCodes::CallbackCanceled status.
+ net->runReadyNetworkOperations();
net->exitNetwork();
- ASSERT_FALSE(reporter->isActive());
- ASSERT_FALSE(reporter->willRunAgain());
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus());
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger());
-}
-
-TEST_F(ReporterTest, ProgressManagerFails) {
- posUpdater->setResult(false);
- ASSERT_EQUALS(ErrorCodes::NodeNotFound, reporter->trigger().code());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join());
+ assertReporterDone();
}
} // namespace
diff --git a/src/mongo/unittest/SConscript b/src/mongo/unittest/SConscript
index dddde95a386..b28e6a00e71 100644
--- a/src/mongo/unittest/SConscript
+++ b/src/mongo/unittest/SConscript
@@ -47,3 +47,13 @@ env.Library(
'$BUILD_DIR/mongo/base',
],
)
+
+env.Library(
+ target='task_executor_proxy',
+ source=[
+ 'task_executor_proxy.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ ],
+)
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
new file mode 100644
index 00000000000..0168e0a752d
--- /dev/null
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/task_executor_proxy.h"
+
+namespace mongo {
+namespace unittest {
+
+TaskExecutorProxy::TaskExecutorProxy(executor::TaskExecutor* executor) : _executor(executor) {}
+
+TaskExecutorProxy::~TaskExecutorProxy() = default;
+
+executor::TaskExecutor* TaskExecutorProxy::getExecutor() const {
+ return _executor;
+}
+
+void TaskExecutorProxy::setExecutor(executor::TaskExecutor* executor) {
+ _executor = executor;
+}
+
+void TaskExecutorProxy::startup() {
+ _executor->startup();
+}
+
+void TaskExecutorProxy::shutdown() {
+ _executor->shutdown();
+}
+
+void TaskExecutorProxy::join() {
+ _executor->join();
+}
+
+std::string TaskExecutorProxy::getDiagnosticString() {
+ return _executor->getDiagnosticString();
+}
+
+Date_t TaskExecutorProxy::now() {
+ return _executor->now();
+}
+
+StatusWith<executor::TaskExecutor::EventHandle> TaskExecutorProxy::makeEvent() {
+ return _executor->makeEvent();
+}
+
+void TaskExecutorProxy::signalEvent(const EventHandle& event) {
+ _executor->signalEvent(event);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::onEvent(
+ const EventHandle& event, const CallbackFn& work) {
+ return _executor->onEvent(event, work);
+}
+
+void TaskExecutorProxy::waitForEvent(const EventHandle& event) {
+ _executor->waitForEvent(event);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork(
+ const CallbackFn& work) {
+ return _executor->scheduleWork(work);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWorkAt(
+ Date_t when, const CallbackFn& work) {
+ return _executor->scheduleWorkAt(when, work);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+ return _executor->scheduleRemoteCommand(request, cb);
+}
+
+void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) {
+ _executor->cancel(cbHandle);
+}
+
+void TaskExecutorProxy::wait(const CallbackHandle& cbHandle) {
+ _executor->wait(cbHandle);
+}
+
+void TaskExecutorProxy::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
+ _executor->appendConnectionStats(stats);
+}
+
+} // namespace unittest
+} // namespace mongo
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
new file mode 100644
index 00000000000..f5c9ee63013
--- /dev/null
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/executor/task_executor.h"
+
+namespace mongo {
+namespace unittest {
+
+/**
+ * Proxy for the executor::TaskExecutor interface used for testing.
+ */
+class TaskExecutorProxy : public executor::TaskExecutor {
+ MONGO_DISALLOW_COPYING(TaskExecutorProxy);
+
+public:
+ /**
+ * Does not own target executor.
+ */
+ TaskExecutorProxy(executor::TaskExecutor* executor);
+ virtual ~TaskExecutorProxy();
+
+ executor::TaskExecutor* getExecutor() const;
+ void setExecutor(executor::TaskExecutor* executor);
+
+ virtual void startup() override;
+ virtual void shutdown() override;
+ virtual void join() override;
+ virtual std::string getDiagnosticString() override;
+ virtual Date_t now() override;
+ virtual StatusWith<EventHandle> makeEvent() override;
+ virtual void signalEvent(const EventHandle& event) override;
+ virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
+ const CallbackFn& work) override;
+ virtual void waitForEvent(const EventHandle& event) override;
+ virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
+ virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+ virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override;
+ virtual void cancel(const CallbackHandle& cbHandle) override;
+ virtual void wait(const CallbackHandle& cbHandle) override;
+ virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
+
+private:
+ // Not owned by us.
+ executor::TaskExecutor* _executor;
+};
+
+} // namespace unittest
+} // namespace mongo