diff options
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.cpp | 365 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter.h | 131 | ||||
-rw-r--r-- | src/mongo/db/repl/reporter_test.cpp | 720 | ||||
-rw-r--r-- | src/mongo/unittest/SConscript | 10 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.cpp | 113 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.h | 77 |
10 files changed, 1213 insertions, 251 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c6a5250be46..5d38684ffdf 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -517,6 +517,7 @@ env.Library( 'reporter.cpp', ], LIBDEPS=[ + 'replica_set_messages', 'replication_executor', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/namespace_string', @@ -530,6 +531,7 @@ env.CppUnitTest( LIBDEPS=[ 'reporter', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/task_executor_proxy', ], ) diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 3b8580fbdaa..a01ac6e1b5f 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -719,9 +719,9 @@ TimestampStatus DataReplicator::initialSync() { _setState_inlock(DataReplicatorState::InitialSync); - // The reporter is paused for the duration of the initial sync, so cancel just in case. + // The reporter is paused for the duration of the initial sync, so shut down just in case. 
if (_reporter) { - _reporter->cancel(); + _reporter->shutdown(); } _reporterPaused = true; _applierPaused = true; @@ -892,7 +892,7 @@ void DataReplicator::_cancelAllHandles_inlock() { if (_applier) _applier->cancel(); if (_reporter) - _reporter->cancel(); + _reporter->shutdown(); if (_initialSyncState && _initialSyncState->dbsCloner.isActive()) _initialSyncState->dbsCloner.cancel(); } @@ -903,7 +903,7 @@ void DataReplicator::_waitOnAll_inlock() { if (_applier) _applier->wait(); if (_reporter) - _reporter->wait(); + _reporter->join(); if (_initialSyncState) _initialSyncState->dbsCloner.wait(); } @@ -1017,9 +1017,11 @@ void DataReplicator::_doNextActions_Steady_inlock() { _scheduleApplyBatch_inlock(); } - if (!_reporterPaused && (!_reporter || !_reporter->getStatus().isOK())) { - _reporter.reset( - new Reporter(_exec, _opts.prepareReplSetUpdatePositionCommandFn, _syncSource)); + // TODO(benety): Initialize from replica set config election timeout / 2. + Milliseconds keepAliveInterval(1000); + if (!_reporterPaused && (!_reporter || !_reporter->isActive()) && !_syncSource.empty()) { + _reporter.reset(new Reporter( + _exec, _opts.prepareReplSetUpdatePositionCommandFn, _syncSource, keepAliveInterval)); } } diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index f4d2cc61a9d..e0a4005acd5 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -190,9 +190,9 @@ protected: return _rollbackFn(txn, lastOpTimeWritten, syncSource); }; - options.prepareReplSetUpdatePositionCommandFn = []() -> StatusWith<BSONObj> { - return BSON(UpdatePositionArgs::kCommandFieldName << 1); - }; + options.prepareReplSetUpdatePositionCommandFn = + [](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); }; options.getMyLastOptime = [this]() { return _myLastOpTime; }; options.setMyLastOptime = 
[this](const OpTime& opTime) { _setMyLastOptime(opTime); }; options.setFollowerMode = [this](const MemberState& state) { @@ -1006,15 +1006,20 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { // Ensure that we send position information upstream after completing batch. net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - { + bool found = false; + while (net->hasReadyRequests()) { auto networkRequest = net->getNextReadyRequest(); auto commandRequest = networkRequest->getRequest(); - ASSERT_EQUALS("admin", commandRequest.dbname); const auto& cmdObj = commandRequest.cmdObj; - ASSERT_EQUALS(std::string(UpdatePositionArgs::kCommandFieldName), - cmdObj.firstElementFieldName()); + if (str::equals(cmdObj.firstElementFieldName(), UpdatePositionArgs::kCommandFieldName) && + commandRequest.dbname == "admin") { + found = true; + break; + } else { + net->blackHole(networkRequest); + } } + ASSERT_TRUE(found); } } // namespace diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index addf7fb36b7..aa64f41ac7e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -184,10 +184,11 @@ DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCo options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; options.rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); }; - options.prepareReplSetUpdatePositionCommandFn = [replCoord]() -> StatusWith<BSONObj> { - return replCoord->prepareReplSetUpdatePositionCommand( - ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle); - }; + options.prepareReplSetUpdatePositionCommandFn = + [replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> StatusWith<BSONObj> { + return replCoord->prepareReplSetUpdatePositionCommand(commandStyle); + }; options.getMyLastOptime = 
[replCoord]() { return replCoord->getMyLastAppliedOpTime(); }; options.setMyLastOptime = [replCoord](const OpTime& opTime) { replCoord->setMyLastAppliedOpTime(opTime); }; diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index 4a0160d6211..b9e9d9cb8e8 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -31,132 +31,373 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/reporter.h" + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/repl/old_update_position_args.h" +#include "mongo/db/repl/update_position_args.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/assert_util.h" #include "mongo/util/log.h" namespace mongo { namespace repl { -using executor::RemoteCommandRequest; +namespace { + +const char kConfigVersionFieldName[] = "configVersion"; + +/** + * Returns configuration version in update command object. + * Returns -1 on failure. + */ +template <typename UpdatePositionArgsType> +long long _parseCommandRequestConfigVersion(const BSONObj& commandRequest) { + UpdatePositionArgsType args; + if (!args.initialize(commandRequest).isOK()) { + return -1; + } + if (args.updatesBegin() == args.updatesEnd()) { + return -1; + } + return args.updatesBegin()->cfgver; +} + +/** + * Returns true if config version in replSetUpdatePosition response is higher than config version in + * locally generated update command request object. + * Returns false if config version is missing in either document. + */ +bool _isTargetConfigNewerThanRequest( + const BSONObj& commandResult, + const BSONObj& commandRequest, + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) { + long long targetConfigVersion; + if (!bsonExtractIntegerField(commandResult, kConfigVersionFieldName, &targetConfigVersion) + .isOK()) { + return false; + } + + const long long localConfigVersion = + commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle + ? 
_parseCommandRequestConfigVersion<UpdatePositionArgs>(commandRequest) + : _parseCommandRequestConfigVersion<OldUpdatePositionArgs>(commandRequest); + if (localConfigVersion == -1) { + return false; + } + + return targetConfigVersion > localConfigVersion; +} + +} // namespace Reporter::Reporter(executor::TaskExecutor* executor, PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn, - const HostAndPort& target) + const HostAndPort& target, + Milliseconds keepAliveInterval) : _executor(executor), _prepareReplSetUpdatePositionCommandFn(prepareReplSetUpdatePositionCommandFn), _target(target), - _status(Status::OK()), - _willRunAgain(false), - _active(false) { + _keepAliveInterval(keepAliveInterval) { uassert(ErrorCodes::BadValue, "null task executor", executor); uassert(ErrorCodes::BadValue, "null function to create replSetUpdatePosition command object", prepareReplSetUpdatePositionCommandFn); uassert(ErrorCodes::BadValue, "target name cannot be empty", !target.empty()); + uassert(ErrorCodes::BadValue, + "keep alive interval must be positive", + keepAliveInterval > Milliseconds(0)); } Reporter::~Reporter() { - DESTRUCTOR_GUARD(cancel();); + DESTRUCTOR_GUARD(shutdown(); join();); } -void Reporter::cancel() { +HostAndPort Reporter::getTarget() const { stdx::lock_guard<stdx::mutex> lk(_mutex); + return _target; +} + +Milliseconds Reporter::getKeepAliveInterval() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _keepAliveInterval; +} + +void Reporter::shutdown() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid"); - if (!_active) { + if (!_isActive_inlock()) { return; } - _status = Status(ErrorCodes::CallbackCanceled, "Reporter no longer valid"); - _willRunAgain = false; - invariant(_remoteCommandCallbackHandle.isValid()); - _executor->cancel(_remoteCommandCallbackHandle); -} + _isWaitingToSendReporter = false; -void Reporter::wait() { 
executor::TaskExecutor::CallbackHandle handle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!_active) { - return; - } - if (!_remoteCommandCallbackHandle.isValid()) { - return; - } + if (_remoteCommandCallbackHandle.isValid()) { + invariant(!_prepareAndSendCommandCallbackHandle.isValid()); handle = _remoteCommandCallbackHandle; + } else { + invariant(!_remoteCommandCallbackHandle.isValid()); + invariant(_prepareAndSendCommandCallbackHandle.isValid()); + handle = _prepareAndSendCommandCallbackHandle; } - _executor->wait(handle); + + _executor->cancel(handle); +} + +Status Reporter::join() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_isActive_inlock(); }); + return _status; } Status Reporter::trigger() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _schedule_inlock(); -} -Status Reporter::_schedule_inlock() { + // If these was a previous error then the reporter is dead and return that error. if (!_status.isOK()) { return _status; } - if (_active) { - _willRunAgain = true; + if (_keepAliveTimeoutWhen != Date_t()) { + // Reset keep alive expiration to signal handler that it was canceled internally. 
+ invariant(_prepareAndSendCommandCallbackHandle.isValid()); + _keepAliveTimeoutWhen = Date_t(); + _executor->cancel(_prepareAndSendCommandCallbackHandle); + return Status::OK(); + } else if (_isActive_inlock()) { + _isWaitingToSendReporter = true; + return Status::OK(); + } + + auto scheduleResult = _executor->scheduleWork( + stdx::bind(&Reporter::_prepareAndSendCommandCallback, this, stdx::placeholders::_1, true)); + + _status = scheduleResult.getStatus(); + if (!_status.isOK()) { + LOG(2) << "Reporter failed to schedule callback to prepare and send update command: " + << _status; return _status; } - LOG(2) << "Reporter scheduling report to : " << _target; + _prepareAndSendCommandCallbackHandle = scheduleResult.getValue(); - auto prepareResult = _prepareReplSetUpdatePositionCommandFn(); + return _status; +} - if (!prepareResult.isOK()) { - // Returning NodeNotFound because currently this is the only way - // prepareReplSetUpdatePositionCommand() can fail in production. - return Status(ErrorCodes::NodeNotFound, - "Reporter failed to create replSetUpdatePositionCommand command."); - } - auto cmdObj = prepareResult.getValue(); - StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = - _executor->scheduleRemoteCommand( - RemoteCommandRequest(_target, "admin", cmdObj), - stdx::bind(&Reporter::_callback, this, stdx::placeholders::_1)); - - if (!scheduleResult.isOK()) { - _status = scheduleResult.getStatus(); - LOG(2) << "Reporter failed to schedule with status: " << _status; +StatusWith<BSONObj> Reporter::_prepareCommand() { + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle = + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_isActive_inlock()); + if (!_status.isOK()) { + return _status; + } + + commandStyle = _commandStyle; + } + + auto prepareResult = _prepareReplSetUpdatePositionCommandFn(commandStyle); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + 
+ // Reporter could have been canceled while preparing the command. + if (!_status.isOK()) { + return _status; + } + // If there was an error in preparing the command, abort and return that error. + if (!prepareResult.isOK()) { + LOG(2) << "Reporter failed to prepare update command with status: " + << prepareResult.getStatus(); + _status = prepareResult.getStatus(); return _status; } - _active = true; - _willRunAgain = false; + return prepareResult.getValue(); +} + +void Reporter::_sendCommand_inlock(BSONObj commandRequest) { + LOG(2) << "Reporter sending slave oplog progress to upstream updater " << _target << ": " + << commandRequest; + + auto scheduleResult = _executor->scheduleRemoteCommand( + executor::RemoteCommandRequest(_target, "admin", commandRequest), + stdx::bind(&Reporter::_processResponseCallback, this, stdx::placeholders::_1)); + + _status = scheduleResult.getStatus(); + if (!_status.isOK()) { + LOG(2) << "Reporter failed to schedule with status: " << _status; + if (_status != ErrorCodes::ShutdownInProgress) { + fassert(34434, _status); + } + return; + } + _remoteCommandCallbackHandle = scheduleResult.getValue(); - return Status::OK(); } -void Reporter::_callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void Reporter::_processResponseCallback( + const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); - _status = rcbd.response.getStatus(); - _active = false; + // If the reporter was shut down before this callback is invoked, + // return the canceled "_status". 
+ if (!_status.isOK()) { + invariant(_status == ErrorCodes::CallbackCanceled); + _onShutdown_inlock(); + return; + } - LOG(2) << "Reporter ended with status: " << _status << " after reporting to " << _target; - if (_status.isOK() && _willRunAgain) { - _schedule_inlock(); - } else { - _willRunAgain = false; + _status = rcbd.response.getStatus(); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + // Override _status with the one embedded in the command result. + const auto& commandResult = rcbd.response.getValue().data; + _status = getStatusFromCommandResult(commandResult); + + // Some error types are OK and should not cause the reporter to stop sending updates to the + // sync target. + if (_status == ErrorCodes::BadValue && + _commandStyle == ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle) { + LOG(1) << "Reporter falling back to old style UpdatePosition command for sync source: " + << _target; + _status = Status::OK(); + _commandStyle = ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle; + _isWaitingToSendReporter = true; + } else if (_status == ErrorCodes::InvalidReplicaSetConfig && + _isTargetConfigNewerThanRequest( + commandResult, rcbd.request.cmdObj, _commandStyle)) { + LOG(1) << "Reporter found newer configuration on sync source: " << _target + << ". Retrying."; + _status = Status::OK(); + // Do not resend update command immediately. + _isWaitingToSendReporter = false; + } else if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + if (!_isWaitingToSendReporter) { + // Since we are also on a timer, schedule a report for that interval, or until + // triggered. 
+ auto when = _executor->now() + _keepAliveInterval; + bool fromTrigger = false; + auto scheduleResult = + _executor->scheduleWorkAt(when, + stdx::bind(&Reporter::_prepareAndSendCommandCallback, + this, + stdx::placeholders::_1, + fromTrigger)); + + _status = scheduleResult.getStatus(); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + _prepareAndSendCommandCallbackHandle = scheduleResult.getValue(); + _keepAliveTimeoutWhen = when; + + _remoteCommandCallbackHandle = executor::TaskExecutor::CallbackHandle(); + return; + } + } + + // Must call without holding the lock. + auto prepareResult = _prepareCommand(); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + _sendCommand_inlock(prepareResult.getValue()); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; } + + invariant(_remoteCommandCallbackHandle.isValid()); + _isWaitingToSendReporter = false; } -Status Reporter::getStatus() const { +void Reporter::_prepareAndSendCommandCallback(const executor::TaskExecutor::CallbackArgs& args, + bool fromTrigger) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + _status = args.status; + + // Ignore CallbackCanceled status if keep alive was canceled by triggered. + if (!fromTrigger && _status == ErrorCodes::CallbackCanceled && + _keepAliveTimeoutWhen == Date_t()) { + _status = Status::OK(); + } + + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + } + + // Must call without holding the lock. 
+ auto prepareResult = _prepareCommand(); + stdx::lock_guard<stdx::mutex> lk(_mutex); - return _status; + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + _sendCommand_inlock(prepareResult.getValue()); + if (!_status.isOK()) { + _onShutdown_inlock(); + return; + } + + invariant(_remoteCommandCallbackHandle.isValid()); + _prepareAndSendCommandCallbackHandle = executor::TaskExecutor::CallbackHandle(); + _keepAliveTimeoutWhen = Date_t(); +} + +void Reporter::_onShutdown_inlock() { + _isWaitingToSendReporter = false; + _remoteCommandCallbackHandle = executor::TaskExecutor::CallbackHandle(); + _prepareAndSendCommandCallbackHandle = executor::TaskExecutor::CallbackHandle(); + _keepAliveTimeoutWhen = Date_t(); + _condition.notify_all(); } bool Reporter::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + return _isActive_inlock(); +} + +bool Reporter::_isActive_inlock() const { + return _remoteCommandCallbackHandle.isValid() || _prepareAndSendCommandCallbackHandle.isValid(); +} + +bool Reporter::isWaitingToSendReport() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _isWaitingToSendReporter; } -bool Reporter::willRunAgain() const { +Date_t Reporter::getKeepAliveTimeoutWhen_forTest() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _willRunAgain; + return _keepAliveTimeoutWhen; } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/reporter.h b/src/mongo/db/repl/reporter.h index e9cc9c06196..6082cd8b70c 100644 --- a/src/mongo/db/repl/reporter.h +++ b/src/mongo/db/repl/reporter.h @@ -31,12 +31,36 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/time_support.h" namespace mongo { namespace repl { +/** + * Once 
scheduled, the reporter will periodically send the current replication progress, obtained + * by invoking "_prepareReplSetUpdatePositionCommandFn", to its sync source until it encounters + * an error. + * + * If the sync source cannot accept the current format (used by server versions 3.2.4 and above) of + * the "replSetUpdatePosition" command, the reporter will not abort and instead downgrade the format + * of the command it will send upstream. + * + * While the reporter is active, it will be in one of three states: + * 1) triggered and waiting to send command to sync source as soon as possible. + * 2) waiting for command response from sync source. + * 3) waiting for at least "_keepAliveInterval" ms before sending command to sync source. + * + * Calling trigger() while the reporter is in state 1 or 2 will cause the reporter to immediately + * send a new command upon receiving a successful command response. + * + * Calling trigger() while it is in state 3 sends a command upstream and cancels the current + * keep alive timeout, resetting the keep alive schedule. + */ class Reporter { MONGO_DISALLOW_COPYING(Reporter); @@ -47,36 +71,51 @@ public: * * The returned status indicates whether or not the command was created. */ - using PrepareReplSetUpdatePositionCommandFn = stdx::function<StatusWith<BSONObj>()>; + using PrepareReplSetUpdatePositionCommandFn = stdx::function<StatusWith<BSONObj>( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle)>; Reporter(executor::TaskExecutor* executor, PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn, - const HostAndPort& target); + const HostAndPort& target, + Milliseconds keepAliveInterval); + virtual ~Reporter(); /** + * Returns sync target. + */ + HostAndPort getTarget() const; + + /** + * Returns keep alive interval. + * Reporter will periodically send replication status to sync source every "_keepAliveInterval" + * until an error occurs. 
+ */ + Milliseconds getKeepAliveInterval() const; + + /** * Returns true if a remote command has been scheduled (but not completed) * with the executor. */ bool isActive() const; /** - * Returns true if a remote command should be scheduled once the current one returns - * from the executor. + * Returns true if new data is available while a remote command is in progress. + * The reporter will schedule a subsequent remote update immediately upon successful + * completion of the previous command instead of when the keep alive callback runs. */ - bool willRunAgain() const; + bool isWaitingToSendReport() const; /** - * Cancels remote command request. + * Cancels both scheduled and active remote command requests. * Returns immediately if the Reporter is not active. */ - void cancel(); + void shutdown(); /** - * Waits for last/current executor handle to finish. - * Returns immediately if the handle is invalid. + * Waits until Reporter is inactive and returns reporter status. */ - void wait(); + Status join(); /** * Signals to the Reporter that there is new information to be sent to the "_target" server. @@ -84,46 +123,84 @@ public: */ Status trigger(); + // ================== Test support API =================== + /** - * Returns the previous return status so that the owner can decide whether the Reporter - * needs a new target to whom it can report. + * Returns scheduled time of keep alive timeout handler. */ - Status getStatus() const; + Date_t getKeepAliveTimeoutWhen_forTest() const; private: /** - * Schedules remote command to be run by the executor + * Returns true if reporter is active. */ - Status _schedule_inlock(); + bool _isActive_inlock() const; /** - * Callback for remote command. + * Prepares remote command to be run by the executor. */ - void _callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd); + StatusWith<BSONObj> _prepareCommand(); + + /** + * Schedules remote command to be run by the executor. 
+ */ + void _sendCommand_inlock(BSONObj commandRequest); + + /** + * Callback for processing response from remote command. + */ + void _processResponseCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd); + + /** + * Callback for preparing and sending remote command. + */ + void _prepareAndSendCommandCallback(const executor::TaskExecutor::CallbackArgs& args, + bool fromTrigger); + + /** + * Signals end of Reporter work and notifies waiters. + */ + void _onShutdown_inlock(); // Not owned by us. - executor::TaskExecutor* _executor; + executor::TaskExecutor* const _executor; // Prepares update command object. - PrepareReplSetUpdatePositionCommandFn _prepareReplSetUpdatePositionCommandFn; + const PrepareReplSetUpdatePositionCommandFn _prepareReplSetUpdatePositionCommandFn; // Host to whom the Reporter sends updates. - HostAndPort _target; + const HostAndPort _target; - // Protects member data of this Reporter. + // Reporter will send updates every "_keepAliveInterval" ms until the reporter is canceled or + // encounters an error. + const Milliseconds _keepAliveInterval; + + // Protects member data of this Reporter declared below. mutable stdx::mutex _mutex; + mutable stdx::condition_variable _condition; + // Stores the most recent Status returned from the executor. - Status _status; + Status _status = Status::OK(); - // _willRunAgain is true when Reporter is scheduled to be run by the executor and subsequent - // updates have come in. - bool _willRunAgain; - // _active is true when Reporter is scheduled to be run by the executor. - bool _active; + // Stores style of the most recent update command object. + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle _commandStyle = + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle; + + // _isWaitingToSendReporter is true when Reporter is scheduled to be run by the executor and + // subsequent updates have come in. 
+ bool _isWaitingToSendReporter = false; // Callback handle to the scheduled remote command. executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle; + + // Callback handle to the scheduled task for preparing and sending the remote command. + executor::TaskExecutor::CallbackHandle _prepareAndSendCommandCallbackHandle; + + // Keep alive timeout callback will not run before this time. + // If this date is Date_t(), the callback is either unscheduled or canceled. + // Used for testing only. + Date_t _keepAliveTimeoutWhen; }; } // namespace repl diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 448e70472b9..3d6b8b81b43 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -28,11 +28,14 @@ #include "mongo/platform/basic.h" +#include "mongo/db/repl/old_update_position_args.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/reporter.h" +#include "mongo/db/repl/update_position_args.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/executor/network_interface_mock.h" - +#include "mongo/stdx/memory.h" +#include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" namespace { @@ -46,27 +49,49 @@ using executor::RemoteCommandResponse; class MockProgressManager { public: void updateMap(int memberId, const OpTime& lastDurableOpTime, const OpTime& lastAppliedOpTime) { - progressMap[memberId] = ProgressInfo(lastDurableOpTime, lastAppliedOpTime); + _progressMap[memberId] = ProgressInfo(lastDurableOpTime, lastAppliedOpTime); } - void setResult(bool newResult) { - _result = newResult; + void clear() { + _progressMap.clear(); } - bool prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) { - if (!_result) { - return _result; - } - cmdBuilder->append("replSetUpdatePosition", 1); - BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); - for (auto&& itr : progressMap) { + long long getConfigVersion() 
const { + return _configVersion; + } + + void setConfigVersion(long long configVersion) { + _configVersion = configVersion; + } + + StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); + BSONArrayBuilder arrayBuilder( + cmdBuilder.subarrayStart(UpdatePositionArgs::kUpdateArrayFieldName)); + for (auto&& itr : _progressMap) { BSONObjBuilder entry(arrayBuilder.subobjStart()); - itr.second.lastDurableOpTime.append(&entry, "durableOpTime"); - itr.second.lastAppliedOpTime.append(&entry, "appliedOpTime"); - entry.append("memberId", itr.first); - entry.append("cfgver", 1); + switch (commandStyle) { + case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle: + itr.second.lastDurableOpTime.append( + &entry, UpdatePositionArgs::kDurableOpTimeFieldName); + itr.second.lastAppliedOpTime.append( + &entry, UpdatePositionArgs::kAppliedOpTimeFieldName); + break; + case ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle: + // Assume protocol version 1. 
+ itr.second.lastDurableOpTime.append(&entry, + OldUpdatePositionArgs::kOpTimeFieldName); + break; + } + entry.append(UpdatePositionArgs::kMemberIdFieldName, itr.first); + if (_configVersion != -1) { + entry.append(UpdatePositionArgs::kConfigVersionFieldName, _configVersion); + } } - return true; + arrayBuilder.done(); + return cmdBuilder.obj(); } private: @@ -81,266 +106,675 @@ private: OpTime lastAppliedOpTime; }; - std::map<int, ProgressInfo> progressMap; - bool _result = true; + std::map<int, ProgressInfo> _progressMap; + long long _configVersion = 1; }; class ReporterTest : public executor::ThreadPoolExecutorTest { public: ReporterTest(); - void scheduleNetworkResponse(const BSONObj& obj); - void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); + + /** + * Schedules response to the current network request. + * Returns command object in the network request. + */ + BSONObj scheduleNetworkResponse(const BSONObj& obj); + BSONObj scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); + + /** + * Schedules network response and instructs network interface to process response. + * Returns command object in the network request. 
+ */ + BSONObj processNetworkResponse(const BSONObj& obj, + bool expectReadyRequestsAfterProcessing = false); + BSONObj processNetworkResponse(ErrorCodes::Error code, + const std::string& reason, + bool expectReadyRequestsAfterProcessing = false); + + void runUntil(Date_t when, bool expectReadyRequestsAfterAdvancingClock = false); + + void runReadyScheduledTasks(); + + void assertReporterDone(); protected: void setUp() override; void tearDown() override; - std::unique_ptr<Reporter> reporter; +private: + virtual bool triggerAtSetUp() const; + +protected: + std::unique_ptr<unittest::TaskExecutorProxy> _executorProxy; std::unique_ptr<MockProgressManager> posUpdater; Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn; + std::unique_ptr<Reporter> reporter; +}; + +class ReporterTestNoTriggerAtSetUp : public ReporterTest { +private: + virtual bool triggerAtSetUp() const override; }; ReporterTest::ReporterTest() {} void ReporterTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); - posUpdater.reset(new MockProgressManager()); - prepareReplSetUpdatePositionCommandFn = [this]() -> StatusWith<BSONObj> { - BSONObjBuilder bob; - if (posUpdater->prepareReplSetUpdatePositionCommand(&bob)) { - return bob.obj(); - } - return Status(ErrorCodes::OperationFailed, - "unable to prepare replSetUpdatePosition command object"); - }; - reporter.reset(new Reporter(&getExecutor(), - [this]() { return prepareReplSetUpdatePositionCommandFn(); }, - HostAndPort("h1"))); + + _executorProxy = stdx::make_unique<unittest::TaskExecutorProxy>(&getExecutor()); + + posUpdater = stdx::make_unique<MockProgressManager>(); + posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); + + prepareReplSetUpdatePositionCommandFn = + stdx::bind(&MockProgressManager::prepareReplSetUpdatePositionCommand, + posUpdater.get(), + stdx::placeholders::_1); + + reporter = stdx::make_unique<Reporter>( + _executorProxy.get(), + 
[this](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) { + return prepareReplSetUpdatePositionCommandFn(commandStyle); + }, + HostAndPort("h1"), + Milliseconds(1000)); launchExecutorThread(); + + if (triggerAtSetUp()) { + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + } else { + ASSERT_FALSE(reporter->isActive()); + } + ASSERT_FALSE(reporter->isWaitingToSendReport()); } void ReporterTest::tearDown() { executor::ThreadPoolExecutorTest::tearDown(); // Executor may still invoke reporter's callback before shutting down. - posUpdater.reset(); reporter.reset(); + posUpdater.reset(); + prepareReplSetUpdatePositionCommandFn = Reporter::PrepareReplSetUpdatePositionCommandFn(); + _executorProxy.reset(); } -void ReporterTest::scheduleNetworkResponse(const BSONObj& obj) { +bool ReporterTest::triggerAtSetUp() const { + return true; +} + +bool ReporterTestNoTriggerAtSetUp::triggerAtSetUp() const { + return false; +} + +BSONObj ReporterTest::scheduleNetworkResponse(const BSONObj& obj) { auto net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); executor::TaskExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + auto noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, net->now(), responseStatus); + return noi->getRequest().cmdObj; } -void ReporterTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) { +BSONObj ReporterTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason) { auto net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); executor::TaskExecutor::ResponseStatus responseStatus(code, reason); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); + auto noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, net->now(), responseStatus); + return noi->getRequest().cmdObj; +} 
+ +BSONObj ReporterTest::processNetworkResponse(const BSONObj& obj, + bool expectReadyRequestsAfterProcessing) { + auto net = getNet(); + net->enterNetwork(); + auto cmdObj = scheduleNetworkResponse(obj); + net->runReadyNetworkOperations(); + ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); + net->exitNetwork(); + return cmdObj; +} + +BSONObj ReporterTest::processNetworkResponse(ErrorCodes::Error code, + const std::string& reason, + bool expectReadyRequestsAfterProcessing) { + auto net = getNet(); + net->enterNetwork(); + auto cmdObj = scheduleNetworkResponse(code, reason); + net->runReadyNetworkOperations(); + ASSERT_EQUALS(expectReadyRequestsAfterProcessing, net->hasReadyRequests()); + net->exitNetwork(); + return cmdObj; } -TEST_F(ReporterTest, InvalidConstruction) { +void ReporterTest::runUntil(Date_t until, bool expectReadyRequestsAfterAdvancingClock) { + auto net = getNet(); + net->enterNetwork(); + ASSERT_EQUALS(until, net->runUntil(until)); + ASSERT_EQUALS(expectReadyRequestsAfterAdvancingClock, net->hasReadyRequests()); + net->exitNetwork(); +} + +void ReporterTest::runReadyScheduledTasks() { + auto net = getNet(); + net->enterNetwork(); + net->exitNetwork(); +} + +void ReporterTest::assertReporterDone() { + ASSERT_FALSE(reporter->isActive()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); + ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_EQUALS(reporter->join(), reporter->trigger()); +} + +TEST_F(ReporterTestNoTriggerAtSetUp, InvalidConstruction) { // null PrepareReplSetUpdatePositionCommandFn ASSERT_THROWS(Reporter(&getExecutor(), Reporter::PrepareReplSetUpdatePositionCommandFn(), - HostAndPort("h1")), + HostAndPort("h1"), + Milliseconds(1000)), UserException); // null TaskExecutor - ASSERT_THROWS(Reporter(nullptr, prepareReplSetUpdatePositionCommandFn, HostAndPort("h1")), - UserException); + ASSERT_THROWS_WHAT( + Reporter( + nullptr, prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), 
Milliseconds(1000)), + UserException, + "null task executor"); + + // null PrepareReplSetUpdatePositionCommandFn + ASSERT_THROWS_WHAT(Reporter(&getExecutor(), + Reporter::PrepareReplSetUpdatePositionCommandFn(), + HostAndPort("h1"), + Milliseconds(1000)), + UserException, + "null function to create replSetUpdatePosition command object"); // empty HostAndPort - ASSERT_THROWS(Reporter(&getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort()), - UserException); + ASSERT_THROWS_WHAT(Reporter(&getExecutor(), + prepareReplSetUpdatePositionCommandFn, + HostAndPort(), + Milliseconds(1000)), + UserException, + "target name cannot be empty"); + + // zero keep alive interval. + ASSERT_THROWS_WHAT( + Reporter( + &getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), Seconds(-1)), + UserException, + "keep alive interval must be positive"); + + // negative keep alive interval. + ASSERT_THROWS_WHAT( + Reporter( + &getExecutor(), prepareReplSetUpdatePositionCommandFn, HostAndPort("h1"), Seconds(-1)), + UserException, + "keep alive interval must be positive"); +} + +TEST_F(ReporterTestNoTriggerAtSetUp, GetTarget) { + ASSERT_EQUALS(HostAndPort("h1"), reporter->getTarget()); } -TEST_F(ReporterTest, IsActiveOnceScheduled) { +TEST_F(ReporterTestNoTriggerAtSetUp, IsActiveOnceScheduled) { ASSERT_FALSE(reporter->isActive()); ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); } -TEST_F(ReporterTest, CancelWithoutScheduled) { - ASSERT_FALSE(reporter->isActive()); - reporter->cancel(); +TEST_F(ReporterTestNoTriggerAtSetUp, ShutdownWithoutScheduledStopsTheReporter) { ASSERT_FALSE(reporter->isActive()); + reporter->shutdown(); + Status expectedStatus(ErrorCodes::CallbackCanceled, "Reporter no longer valid"); + ASSERT_EQUALS(expectedStatus, reporter->join()); + assertReporterDone(); } -TEST_F(ReporterTest, ShutdownBeforeSchedule) { +TEST_F(ReporterTestNoTriggerAtSetUp, + ShuttingExecutorDownBeforeActivatingReporterPreventsTheReporterFromStarting) { 
getExecutor().shutdown(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, reporter->trigger()); - ASSERT_FALSE(reporter->isActive()); + assertReporterDone(); } -// If an error is returned, it should be recorded in the Reporter and be returned when triggered -TEST_F(ReporterTest, ErrorsAreStoredInTheReporter) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +// If an error is returned, it should be recorded in the Reporter and not run again. +TEST_F(ReporterTest, TaskExecutorAndNetworkErrorsStopTheReporter) { ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); - net->runReadyNetworkOperations(); - ASSERT_FALSE(net->hasReadyRequests()); - net->exitNetwork(); + processNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus()); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->trigger()); - ASSERT_FALSE(reporter->isActive()); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->join()); + assertReporterDone(); } -// If an error is returned, it should be recorded in the Reporter and not run again. 
-TEST_F(ReporterTest, ErrorsStopTheReporter) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +TEST_F(ReporterTest, UnsuccessfulCommandResponseStopsTheReporter) { + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::UnknownError) << "errmsg" + << "unknown error")); + + ASSERT_EQUALS(Status(ErrorCodes::UnknownError, "unknown error"), reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTestNoTriggerAtSetUp, + InvalidReplicaSetResponseToARequestWithoutConfigVersionStopsTheReporter) { + posUpdater->setConfigVersion(-1); ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) + << "errmsg" + << "newer config" + << "configVersion" << 100)); + + ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, InvalidReplicaSetResponseWithoutConfigVersionOnSyncTargetStopsTheReporter) { + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) + << "errmsg" + << "invalid config")); + + ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, InvalidReplicaSetResponseWithSameConfigVersionOnSyncTargetStopsTheReporter) { + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) + << "errmsg" + << "invalid config" + << "configVersion" << posUpdater->getConfigVersion())); + + ASSERT_EQUALS(Status(ErrorCodes::InvalidReplicaSetConfig, "invalid config"), reporter->join()); + assertReporterDone(); +} + +TEST_F( + ReporterTest, + InvalidReplicaSetResponseWithNewerConfigVersionOnSyncTargetToAnNewCommandStyleRequestDoesNotStopTheReporter) { + // Reporter should not retry update command on sync source immediately after seeing newer + // configuration. 
ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); + + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) + << "errmsg" + << "newer config" + << "configVersion" << posUpdater->getConfigVersion() + 1)); + ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); +} - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse(ErrorCodes::NoSuchKey, "waaaah"); - net->runReadyNetworkOperations(); - ASSERT_FALSE(net->hasReadyRequests()); - net->exitNetwork(); +TEST_F( + ReporterTest, + InvalidReplicaSetResponseWithNewerConfigVersionOnSyncTargetToAnOldCommandStyleRequestDoesNotStopTheReporter) { + auto expectedNewStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, reporter->getStatus()); - ASSERT_FALSE(reporter->willRunAgain()); - ASSERT_FALSE(reporter->isActive()); + auto commandRequest = + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::BadValue) << "errmsg" + << "Unexpected field durableOpTime in UpdateInfoArgs"), + true); + ASSERT_EQUALS(expectedNewStyleCommandRequest, commandRequest); + + // Update command object should match old style (pre-3.2.4). 
+ auto expectedOldStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle)); + + commandRequest = processNetworkResponse( + BSON("ok" << 0 << "code" << int(ErrorCodes::InvalidReplicaSetConfig) << "errmsg" + << "newer config" + << "configVersion" << posUpdater->getConfigVersion() + 1)); + ASSERT_EQUALS(expectedOldStyleCommandRequest, commandRequest); + + ASSERT_TRUE(reporter->isActive()); } -// Schedule while we are already scheduled, it should set willRunAgain, then automatically +// Schedule while we are already scheduled, it should set "isWaitingToSendReport", then +// automatically // schedule itself after finishing. -TEST_F(ReporterTest, DoubleScheduleShouldCauseRescheduleImmediatelyAfterRespondedTo) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +TEST_F( + ReporterTest, + TriggeringReporterOnceWhileFirstCommandRequestIsInProgressCausesSecondCommandRequestToBeSentImmediatelyAfterFirstResponseReturns) { + // Second trigger (first time in setUp). 
ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); - ASSERT_OK(reporter->trigger()); - ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); - net->runReadyNetworkOperations(); - ASSERT_TRUE(net->hasReadyRequests()); - net->exitNetwork(); + processNetworkResponse(BSON("ok" << 1), true); ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); - net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); - net->runReadyNetworkOperations(); - ASSERT_FALSE(net->hasReadyRequests()); - net->exitNetwork(); + processNetworkResponse(BSON("ok" << 1)); - ASSERT_FALSE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); } -// Schedule multiple times while we are already scheduled, it should set willRunAgain, +// Schedule multiple times while we are already scheduled, it should set "isWaitingToSendReport", // then automatically schedule itself after finishing, but not a third time since the latter // two will contain the same batch of updates. -TEST_F(ReporterTest, TripleScheduleShouldCauseRescheduleImmediatelyAfterRespondedToOnlyOnce) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +TEST_F( + ReporterTest, + TriggeringReporterTwiceWhileFirstCommandRequestIsInProgressCausesSecondCommandRequestToBeSentImmediatelyAfterFirstResponseReturns) { + // Second trigger (first time in setUp). ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); + + // Third trigger. 
ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); + + processNetworkResponse(BSON("ok" << 1), true); + + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); + + processNetworkResponse(BSON("ok" << 1)); + + ASSERT_TRUE(reporter->isActive()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); +} + +TEST_F(ReporterTest, ShuttingReporterDownWhileFirstCommandRequestIsInProgressStopsTheReporter) { ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); + + reporter->shutdown(); auto net = getNet(); net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); net->runReadyNetworkOperations(); - ASSERT_TRUE(net->hasReadyRequests()); net->exitNetwork(); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, ShuttingReporterDownWhileSecondCommandRequestIsInProgressStopsTheReporter) { + ASSERT_OK(reporter->trigger()); + ASSERT_TRUE(reporter->isActive()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); + + processNetworkResponse(BSON("ok" << 1), true); + ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_FALSE(reporter->isWaitingToSendReport()); + reporter->shutdown(); + auto net = getNet(); net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); net->runReadyNetworkOperations(); ASSERT_FALSE(net->hasReadyRequests()); net->exitNetwork(); - ASSERT_FALSE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); + assertReporterDone(); } -TEST_F(ReporterTest, CancelWhileScheduled) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +TEST_F(ReporterTestNoTriggerAtSetUp, CommandPreparationFailureStopsTheReporter) { + Status 
expectedStatus(ErrorCodes::UnknownError, "unknown error"); + prepareReplSetUpdatePositionCommandFn = + [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> StatusWith<BSONObj> { return expectedStatus; }; ASSERT_OK(reporter->trigger()); - ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + + ASSERT_EQUALS(expectedStatus, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, CommandPreparationFailureDuringRescheduleStopsTheReporter) { ASSERT_OK(reporter->trigger()); ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); + ASSERT_TRUE(reporter->isWaitingToSendReport()); - reporter->cancel(); + runReadyScheduledTasks(); - auto net = getNet(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - ASSERT_FALSE(net->hasReadyRequests()); - net->exitNetwork(); + // This will cause command preparation to fail for the subsequent request. + Status expectedStatus(ErrorCodes::UnknownError, "unknown error"); + prepareReplSetUpdatePositionCommandFn = + [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> StatusWith<BSONObj> { return expectedStatus; }; + + processNetworkResponse(BSON("ok" << 1)); + ASSERT_EQUALS(expectedStatus, reporter->join()); + assertReporterDone(); +} + +// If a remote server (most likely running with version before 3.2.4) returns ErrorCodes::BadValue +// on a new style replSetUpdateCommand command object, we should regenerate the command with +// pre-3.2.4 style arguments and retry the remote command. 
+TEST_F(ReporterTest, + BadValueErrorOnNewStyleCommandShouldCauseRescheduleImmediatelyWithOldStyleCommand) { + runReadyScheduledTasks(); + + auto expectedNewStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); + + auto commandRequest = + processNetworkResponse(BSON("ok" << 0 << "code" << int(ErrorCodes::BadValue) << "errmsg" + << "Unexpected field durableOpTime in UpdateInfoArgs"), + true); + ASSERT_EQUALS(expectedNewStyleCommandRequest, commandRequest); + + auto expectedOldStyleCommandRequest = unittest::assertGet(prepareReplSetUpdatePositionCommandFn( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kOldStyle)); + + commandRequest = processNetworkResponse(BSON("ok" << 1)); + + // Update command object should match old style (pre-3.2.2). + ASSERT_NOT_EQUALS(expectedNewStyleCommandRequest, expectedOldStyleCommandRequest); + ASSERT_EQUALS(expectedOldStyleCommandRequest, commandRequest); + + reporter->shutdown(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, FailedUpdateShouldNotRescheduleUpdate) { + processNetworkResponse(ErrorCodes::OperationFailed, "update failed"); + + ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, SuccessfulUpdateShouldRescheduleUpdate) { + processNetworkResponse(BSON("ok" << 1)); + + auto until = getExecutor().now() + reporter->getKeepAliveInterval(); + ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + runUntil(until, true); + + processNetworkResponse(ErrorCodes::OperationFailed, "update failed"); + + ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, ShutdownWhileKeepAliveTimeoutIsScheduledShouldMakeReporterInactive) { + processNetworkResponse(BSON("ok" << 1)); + + auto until = 
getExecutor().now() + reporter->getKeepAliveInterval(); + ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + reporter->shutdown(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); ASSERT_FALSE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus()); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); + runUntil(until); } -TEST_F(ReporterTest, CancelAfterFirstReturns) { - posUpdater->updateMap(0, OpTime({3, 0}, 1), OpTime({3, 0}, 1)); +TEST_F(ReporterTestNoTriggerAtSetUp, + FailingToSchedulePrepareCommandTaskShouldMakeReporterInactive) { + class TaskExecutorWithFailureInScheduleWork : public unittest::TaskExecutorProxy { + public: + TaskExecutorWithFailureInScheduleWork(executor::TaskExecutor* executor) + : unittest::TaskExecutorProxy(executor) {} + virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWork( + const CallbackFn& work) override { + return Status(ErrorCodes::OperationFailed, "failed to schedule work"); + } + }; + + TaskExecutorWithFailureInScheduleWork badExecutor(&getExecutor()); + _executorProxy->setExecutor(&badExecutor); + + auto status = reporter->trigger(); + + _executorProxy->setExecutor(&getExecutor()); + + ASSERT_EQUALS(ErrorCodes::OperationFailed, status); + ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); + ASSERT_FALSE(reporter->isActive()); +} + +TEST_F(ReporterTestNoTriggerAtSetUp, FailingToScheduleRemoteCommandTaskShouldMakeReporterInactive) { + class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { + public: + TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor) + : unittest::TaskExecutorProxy(executor) {} + virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( + const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override 
{ + // Any error status other than ShutdownInProgress will cause the reporter to fassert. + return Status(ErrorCodes::ShutdownInProgress, + "failed to send remote command - shutdown in progress"); + } + }; + + TaskExecutorWithFailureInScheduleRemoteCommand badExecutor(&getExecutor()); + _executorProxy->setExecutor(&badExecutor); + + ASSERT_OK(reporter->trigger()); + + // Run callback to prepare command and attempt to send command to sync source. + runReadyScheduledTasks(); + + _executorProxy->setExecutor(&getExecutor()); + + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, reporter->join()); + ASSERT_FALSE(reporter->isActive()); +} + +TEST_F(ReporterTest, FailingToScheduleTimeoutShouldMakeReporterInactive) { + class TaskExecutorWithFailureInScheduleWorkAt : public unittest::TaskExecutorProxy { + public: + TaskExecutorWithFailureInScheduleWorkAt(executor::TaskExecutor* executor) + : unittest::TaskExecutorProxy(executor) {} + virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleWorkAt( + Date_t when, const CallbackFn& work) override { + return Status(ErrorCodes::OperationFailed, "failed to schedule work"); + } + }; + + TaskExecutorWithFailureInScheduleWorkAt badExecutor(&getExecutor()); + _executorProxy->setExecutor(&badExecutor); + + processNetworkResponse(BSON("ok" << 1)); + + _executorProxy->setExecutor(&getExecutor()); + + ASSERT_EQUALS(ErrorCodes::OperationFailed, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, KeepAliveTimeoutFailingToScheduleRemoteCommandShouldMakeReporterInactive) { + processNetworkResponse(BSON("ok" << 1)); + + auto until = getExecutor().now() + reporter->getKeepAliveInterval(); + ASSERT_EQUALS(until, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + Status expectedStatus(ErrorCodes::UnknownError, "failed to prepare update command"); + prepareReplSetUpdatePositionCommandFn = + [expectedStatus](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) + -> 
StatusWith<BSONObj> { return expectedStatus; }; + + runUntil(until); + + ASSERT_EQUALS(expectedStatus, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, + TriggerBeforeKeepAliveTimeoutShouldCancelExistingTimeoutAndSendUpdateImmediately) { + processNetworkResponse(BSON("ok" << 1)); + + auto keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval(); + + ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + auto until = keepAliveTimeoutWhen - reporter->getKeepAliveInterval() / 2; + runUntil(until); + ASSERT_OK(reporter->trigger()); + + // '_keepAliveTimeoutWhen' is reset by trigger() not by the canceled callback. + ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest()); ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + + processNetworkResponse(BSON("ok" << 1)); + + keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval(); + + // A new keep alive timeout should be scheduled. + ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + reporter->shutdown(); + + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); + assertReporterDone(); +} + +TEST_F(ReporterTest, ShutdownImmediatelyAfterTriggerWhileKeepAliveTimeoutIsScheduledShouldSucceed) { + processNetworkResponse(BSON("ok" << 1)); + + auto keepAliveTimeoutWhen = getExecutor().now() + reporter->getKeepAliveInterval(); + ASSERT_EQUALS(keepAliveTimeoutWhen, reporter->getKeepAliveTimeoutWhen_forTest()); + ASSERT_TRUE(reporter->isActive()); + + auto until = keepAliveTimeoutWhen - reporter->getKeepAliveInterval() / 2; + runUntil(until); + ASSERT_OK(reporter->trigger()); + + // '_keepAliveTimeoutWhen' is reset by trigger() not by the canceled callback. 
+ ASSERT_EQUALS(Date_t(), reporter->getKeepAliveTimeoutWhen_forTest()); ASSERT_TRUE(reporter->isActive()); - ASSERT_TRUE(reporter->willRunAgain()); auto net = getNet(); net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); - net->runReadyNetworkOperations(); ASSERT_TRUE(net->hasReadyRequests()); net->exitNetwork(); - ASSERT_TRUE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); + reporter->shutdown(); - reporter->cancel(); net->enterNetwork(); - net->runReadyNetworkOperations(); ASSERT_FALSE(net->hasReadyRequests()); + // Executor should invoke reporter callback with a ErrorCodes::CallbackCanceled status. + net->runReadyNetworkOperations(); net->exitNetwork(); - ASSERT_FALSE(reporter->isActive()); - ASSERT_FALSE(reporter->willRunAgain()); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->getStatus()); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->trigger()); -} - -TEST_F(ReporterTest, ProgressManagerFails) { - posUpdater->setResult(false); - ASSERT_EQUALS(ErrorCodes::NodeNotFound, reporter->trigger().code()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, reporter->join()); + assertReporterDone(); } } // namespace diff --git a/src/mongo/unittest/SConscript b/src/mongo/unittest/SConscript index dddde95a386..b28e6a00e71 100644 --- a/src/mongo/unittest/SConscript +++ b/src/mongo/unittest/SConscript @@ -47,3 +47,13 @@ env.Library( '$BUILD_DIR/mongo/base', ], ) + +env.Library( + target='task_executor_proxy', + source=[ + 'task_executor_proxy.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/executor/task_executor_interface', + ], +) diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp new file mode 100644 index 00000000000..0168e0a752d --- /dev/null +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2016 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/task_executor_proxy.h"
+
+namespace mongo {
+namespace unittest {
+
+// Stores the given target executor pointer; no other work is done at construction.
+TaskExecutorProxy::TaskExecutorProxy(executor::TaskExecutor* executor) : _executor(executor) {}
+
+TaskExecutorProxy::~TaskExecutorProxy() = default;
+
+// Returns the target executor pointer currently held by this proxy.
+executor::TaskExecutor* TaskExecutorProxy::getExecutor() const {
+    return _executor;
+}
+
+// Replaces the target executor pointer; subsequent calls are forwarded to the new target.
+void TaskExecutorProxy::setExecutor(executor::TaskExecutor* executor) {
+    _executor = executor;
+}
+
+// Every function from here to the end of the namespace is a one-line forwarder: it passes its
+// arguments unchanged to the same-named function on _executor and returns that function's
+// result, if it has one. _executor is dereferenced without a null check.
+void TaskExecutorProxy::startup() {
+    _executor->startup();
+}
+
+void TaskExecutorProxy::shutdown() {
+    _executor->shutdown();
+}
+
+void TaskExecutorProxy::join() {
+    _executor->join();
+}
+
+std::string TaskExecutorProxy::getDiagnosticString() {
+    return _executor->getDiagnosticString();
+}
+
+Date_t TaskExecutorProxy::now() {
+    return _executor->now();
+}
+
+StatusWith<executor::TaskExecutor::EventHandle> TaskExecutorProxy::makeEvent() {
+    return _executor->makeEvent();
+}
+
+void TaskExecutorProxy::signalEvent(const EventHandle& event) {
+    _executor->signalEvent(event);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::onEvent(
+    const EventHandle& event, const CallbackFn& work) {
+    return _executor->onEvent(event, work);
+}
+
+void TaskExecutorProxy::waitForEvent(const EventHandle& event) {
+    _executor->waitForEvent(event);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWork(
+    const CallbackFn& work) {
+    return _executor->scheduleWork(work);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleWorkAt(
+    Date_t when, const CallbackFn& work) {
+    return _executor->scheduleWorkAt(when, work);
+}
+
+StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRemoteCommand(
+    const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) {
+    return _executor->scheduleRemoteCommand(request, cb);
+}
+
+void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) {
+    _executor->cancel(cbHandle);
+}
+
+void TaskExecutorProxy::wait(const CallbackHandle& cbHandle) {
+    _executor->wait(cbHandle);
+}
+
+void TaskExecutorProxy::appendConnectionStats(executor::ConnectionPoolStats* stats) const {
+    _executor->appendConnectionStats(stats);
+}
+
+}  // namespace unittest
+}  // namespace mongo
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
new file mode 100644
index 00000000000..f5c9ee63013
--- /dev/null
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. 
If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/executor/task_executor.h"
+
+namespace mongo {
+namespace unittest {
+
+/**
+ * Proxy for the executor::TaskExecutor interface used for testing.
+ *
+ * Each TaskExecutor override declared below forwards the call, with unchanged arguments, to the
+ * target executor and returns the target's result. The target can be swapped at any time with
+ * setExecutor(). The proxy is non-copyable.
+ */
+class TaskExecutorProxy : public executor::TaskExecutor {
+    MONGO_DISALLOW_COPYING(TaskExecutorProxy);
+
+public:
+    /**
+     * Does not own target executor.
+     *
+     * Declared explicit so that a raw executor::TaskExecutor* is never silently converted into
+     * a proxy object.
+     */
+    explicit TaskExecutorProxy(executor::TaskExecutor* executor);
+    virtual ~TaskExecutorProxy();
+
+    /**
+     * Returns the target executor pointer currently held by this proxy.
+     */
+    executor::TaskExecutor* getExecutor() const;
+
+    /**
+     * Replaces the target executor. The new target is not owned by the proxy either.
+     * The forwarding functions dereference the target without a null check.
+     */
+    void setExecutor(executor::TaskExecutor* executor);
+
+    // executor::TaskExecutor interface. Every function below forwards to the target executor.
+    virtual void startup() override;
+    virtual void shutdown() override;
+    virtual void join() override;
+    virtual std::string getDiagnosticString() override;
+    virtual Date_t now() override;
+    virtual StatusWith<EventHandle> makeEvent() override;
+    virtual void signalEvent(const EventHandle& event) override;
+    virtual StatusWith<CallbackHandle> onEvent(const EventHandle& event,
+                                               const CallbackFn& work) override;
+    virtual void waitForEvent(const EventHandle& event) override;
+    virtual StatusWith<CallbackHandle> scheduleWork(const CallbackFn& work) override;
+    virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, const CallbackFn& work) override;
+    virtual StatusWith<CallbackHandle> scheduleRemoteCommand(
+        const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override;
+    virtual void cancel(const CallbackHandle& cbHandle) override;
+    virtual void wait(const CallbackHandle& cbHandle) override;
+    virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
+
+private:
+    // Not owned by us.
+    executor::TaskExecutor* _executor;
+};
+
+}  // namespace unittest
+}  // namespace mongo |