summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2021-02-05 20:35:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-02 19:10:49 +0000
commit55805f12273876d05fe1ced4190f2fd22e955e9f (patch)
treedff281089361a43a9a55dc8d1a57e156bce37c66 /src
parent68009cfaed11d1648411fc18f7b1149614ed33c8 (diff)
downloadmongo-55805f12273876d05fe1ced4190f2fd22e955e9f.tar.gz
SERVER-54406 NetworkInterfaceMock should allow simultaneous interruption and response
(cherry picked from commit 6166a40b8da9634203cdebc91617d657453e0038)
Diffstat (limited to 'src')
-rw-r--r--src/mongo/executor/network_interface_mock.cpp281
-rw-r--r--src/mongo/executor/network_interface_mock.h118
2 files changed, 200 insertions, 199 deletions
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 513dcf78a63..748209f6e73 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -60,8 +60,6 @@ NetworkInterfaceMock::NetworkInterfaceMock()
NetworkInterfaceMock::~NetworkInterfaceMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(!_hasStarted || inShutdown());
- invariant(_scheduled.empty());
- invariant(_blackHoled.empty());
}
std::string NetworkInterfaceMock::getDiagnosticString() {
@@ -69,10 +67,8 @@ std::string NetworkInterfaceMock::getDiagnosticString() {
return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
<< ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
<< ", inShutdown: " << _inShutdown.load()
- << ", processing: " << _processing.size()
- << ", scheduled: " << _scheduled.size()
- << ", blackHoled: " << _blackHoled.size()
- << ", unscheduled: " << _unscheduled.size();
+ << ", operations: " << _operations.size()
+ << ", responses: " << _responses.size();
}
Date_t NetworkInterfaceMock::now() {
@@ -86,7 +82,7 @@ std::string NetworkInterfaceMock::getHostName() {
Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
RemoteCommandRequestOnAny& request,
- RemoteCommandCompletionFn&& onFinish,
+ RemoteCommandCompletionFn&& onResponse,
const BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceMock shutdown in progress"};
@@ -95,7 +91,10 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
stdx::lock_guard<stdx::mutex> lk(_mutex);
const Date_t now = _now_inlock();
- auto op = NetworkOperation(cbHandle, request, now, std::move(onFinish));
+
+ LOGV2(5440600, "Scheduling request", "when"_attr = now, "request"_attr = request);
+
+ auto op = NetworkOperation(cbHandle, request, now, std::move(onResponse));
// network interface mock only works with single target requests
invariant(request.target.size() == 1);
@@ -135,28 +134,18 @@ void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle, const B
stdx::lock_guard<stdx::mutex> lk(_mutex);
ResponseStatus rs(ErrorCodes::CallbackCanceled, "Network operation canceled", Milliseconds(0));
- // We mimic the real NetworkInterface by only delivering the CallbackCanceled status if the
- // operation has not already received a response (i.e., is not already in the _scheduled queue).
- std::vector<NetworkOperationList*> queuesToCheck{&_unscheduled, &_blackHoled, &_processing};
- _interruptWithResponse_inlock(cbHandle, queuesToCheck, rs);
+ _interruptWithResponse_inlock(cbHandle, rs);
}
-void NetworkInterfaceMock::_interruptWithResponse_inlock(
- const CallbackHandle& cbHandle,
- const std::vector<NetworkOperationList*>& queuesToCheck,
- const ResponseStatus& response) {
+void NetworkInterfaceMock::_interruptWithResponse_inlock(const CallbackHandle& cbHandle,
+ const ResponseStatus& response) {
auto matchFn = [&cbHandle](const auto& ops) { return ops.isForCallback(cbHandle); };
+ auto noi = std::find_if(_operations.begin(), _operations.end(), matchFn);
- for (auto list : queuesToCheck) {
- auto noi = std::find_if(list->begin(), list->end(), matchFn);
- if (noi == list->end()) {
- continue;
- }
- _scheduled.splice(_scheduled.begin(), *list, noi);
- noi->setResponse(_now_inlock(), response);
- return;
- }
+ // We've effectively observed the NetworkOperation.
+ noi->markAsProcessing();
+ _scheduleResponse_inlock(noi, _now_inlock(), response);
}
Status NetworkInterfaceMock::setAlarm(const TaskExecutor::CallbackHandle& cbHandle,
@@ -217,23 +206,24 @@ void NetworkInterfaceMock::shutdown() {
_startup_inlock();
}
_inShutdown.store(true);
- NetworkOperationList todo;
- todo.splice(todo.end(), _scheduled);
- todo.splice(todo.end(), _unscheduled);
- todo.splice(todo.end(), _processing);
- todo.splice(todo.end(), _blackHoled);
+ auto todo = std::exchange(_operations, {});
const Date_t now = _now_inlock();
_waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
lk.unlock();
- for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
- LOGV2_WARNING(22590,
- "Mock network interface shutting down with outstanding request: {request}",
- "Mock network interface shutting down with outstanding request",
- "request"_attr = iter->getRequest());
- iter->setResponse(
- now, {ErrorCodes::ShutdownInProgress, "Shutting down mock network", Milliseconds(0)});
- iter->finishResponse();
+ for (auto& op : todo) {
+ auto response = NetworkResponse{{},
+ now,
+ ResponseStatus{ErrorCodes::ShutdownInProgress,
+ "Shutting down mock network",
+ Milliseconds(0)}};
+ if (op.processResponse(std::move(response))) {
+ LOGV2_WARNING(
+ 22590,
+ "Mock network interface shutting down with outstanding request: {request}",
+ "Mock network interface shutting down with outstanding request",
+ "request"_attr = op.getRequest());
+ }
}
lk.lock();
invariant(_currentlyRunning == kExecutorThread);
@@ -274,24 +264,30 @@ bool NetworkInterfaceMock::hasReadyRequests() {
}
bool NetworkInterfaceMock::_hasReadyRequests_inlock() {
- if (_unscheduled.empty())
- return false;
- if (_unscheduled.front().getNextConsiderationDate() > _now_inlock()) {
- return false;
- }
- return true;
+ auto noi = std::find_if(
+ _operations.begin(), _operations.end(), [](auto& op) { return op.hasReadyRequest(); });
+ return noi != _operations.end();
}
NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNextReadyRequest() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
- while (!_hasReadyRequests_inlock()) {
+
+ auto findNextReadyRequest = [&] {
+ return std::find_if(
+ _operations.begin(), _operations.end(), [](auto& op) { return op.hasReadyRequest(); });
+ };
+
+ auto noi = findNextReadyRequest();
+ while (noi == _operations.end()) {
_waitingToRunMask |= kExecutorThread;
_runReadyNetworkOperations_inlock(&lk);
+
+ noi = findNextReadyRequest();
}
- invariant(_hasReadyRequests_inlock());
- _processing.splice(_processing.begin(), _unscheduled, _unscheduled.begin());
- return _processing.begin();
+ noi->markAsProcessing();
+
+ return noi;
}
NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfUnscheduledQueue() {
@@ -305,33 +301,42 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getNthUnsch
invariant(_hasReadyRequests_inlock());
// Linear time, but it's just for testing so no big deal.
- invariant(_unscheduled.size() > n);
- auto it = _unscheduled.begin();
- std::advance(it, n);
- return it;
+ auto noi = _operations.begin();
+ for (; noi != _operations.end(); ++noi) {
+ if (noi->hasReadyRequest()) {
+ if (n == 0) {
+ return noi;
+ } else {
+ --n;
+ }
+ }
+ }
+
+ return _operations.end();
+}
+
+void NetworkInterfaceMock::_scheduleResponse_inlock(NetworkOperationIterator noi,
+ Date_t when,
+ const TaskExecutor::ResponseStatus& response) {
+ auto insertBefore = std::find_if(_responses.begin(),
+ _responses.end(),
+ [when](const auto& response) { return response.when > when; });
+
+ _responses.insert(insertBefore, NetworkResponse{noi, when, response});
+ LOGV2(5440601,
+ "Scheduling response",
+ "when"_attr = when,
+ "request"_attr = noi->getRequest(),
+ "response"_attr = response);
}
void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
Date_t when,
- const ResponseStatus& response) {
+ const TaskExecutor::ResponseStatus& response) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
- NetworkOperationIterator insertBefore = _scheduled.begin();
- while ((insertBefore != _scheduled.end()) && (insertBefore->getResponseDate() <= when)) {
- ++insertBefore;
- }
-
- // If no RemoteCommandResponse was returned (for example, on a simulated network error), then
- // do not attempt to run the metadata hook, since there is no returned metadata.
- if (_metadataHook && response.isOK()) {
- _metadataHook
- ->readReplyMetadata(
- noi->getRequest().opCtx, noi->getRequest().target.toString(), response.data)
- .transitional_ignore();
- }
-
- noi->setResponse(when, response);
- _scheduled.splice(insertBefore, _processing, noi);
+ noi->assertNotBlackholed();
+ _scheduleResponse_inlock(noi, when, response);
}
RemoteCommandRequest NetworkInterfaceMock::scheduleSuccessfulResponse(const BSONObj& response) {
@@ -379,22 +384,7 @@ RemoteCommandRequest NetworkInterfaceMock::scheduleErrorResponse(NetworkOperatio
void NetworkInterfaceMock::blackHole(NetworkOperationIterator noi) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
- _blackHoled.splice(_blackHoled.end(), _processing, noi);
-}
-
-void NetworkInterfaceMock::requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_currentlyRunning == kNetworkThread);
- invariant(noi->getNextConsiderationDate() < dontAskUntil);
- invariant(_now_inlock() < dontAskUntil);
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- for (; insertBefore != _unscheduled.end(); ++insertBefore) {
- if (insertBefore->getNextConsiderationDate() >= dontAskUntil) {
- break;
- }
- }
- noi->setNextConsiderationDate(dontAskUntil);
- _unscheduled.splice(insertBefore, _processing, noi);
+ noi->markAsBlackholed();
}
Date_t NetworkInterfaceMock::runUntil(Date_t until) {
@@ -410,8 +400,8 @@ Date_t NetworkInterfaceMock::runUntil(Date_t until) {
if (!_alarms.empty() && _alarms.top().when < newNow) {
newNow = _alarms.top().when;
}
- if (!_scheduled.empty() && _scheduled.front().getResponseDate() < newNow) {
- newNow = _scheduled.front().getResponseDate();
+ if (!_responses.empty() && _responses.front().when < newNow) {
+ newNow = _responses.front().when;
}
if (until < newNow) {
newNow = until;
@@ -461,30 +451,17 @@ void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
_waitForWork_inlock(&lk);
}
-void NetworkInterfaceMock::_enqueueOperation_inlock(
- mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) {
- auto insertBefore =
- std::upper_bound(std::begin(_unscheduled),
- std::end(_unscheduled),
- op,
- [](const NetworkOperation& a, const NetworkOperation& b) {
- return a.getNextConsiderationDate() < b.getNextConsiderationDate();
- });
-
+void NetworkInterfaceMock::_enqueueOperation_inlock(NetworkOperation&& op) {
const auto timeout = op.getRequest().timeout;
auto cbh = op.getCallbackHandle();
-
- _unscheduled.emplace(insertBefore, std::move(op));
+ _operations.emplace_back(std::forward<NetworkOperation>(op));
if (timeout != RemoteCommandRequest::kNoTimeout) {
invariant(timeout >= Milliseconds(0));
_alarms.emplace(cbh, _now_inlock() + timeout, [this, cbh](Status) {
- _interruptWithResponse_inlock(
- cbh,
- {&_unscheduled, &_blackHoled, &_scheduled},
- ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Network timeout",
- Milliseconds(0)));
+ auto response = ResponseStatus(
+ ErrorCodes::NetworkInterfaceExceededTimeLimit, "Network timeout", Milliseconds(0));
+ _interruptWithResponse_inlock(cbh, std::move(response));
});
}
}
@@ -502,16 +479,16 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
auto valid = _hook->validateHost(target, op.getRequest().cmdObj, handshakeReply);
if (!valid.isOK()) {
- op.setResponse(_now_inlock(), valid);
- op.finishResponse();
+ auto response = NetworkResponse{{}, _now_inlock(), valid};
+ op.processResponse(std::move(response));
return;
}
auto swHookPostconnectCommand = _hook->makeRequest(target);
if (!swHookPostconnectCommand.isOK()) {
- op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus());
- op.finishResponse();
+ auto response = NetworkResponse{{}, _now_inlock(), swHookPostconnectCommand.getStatus()};
+ op.processResponse(std::move(response));
return;
}
@@ -531,16 +508,16 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
[this, op = std::move(op)](TaskExecutor::ResponseOnAnyStatus rs) mutable {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!rs.isOK()) {
- op.setResponse(_now_inlock(), rs);
- op.finishResponse();
+ auto response = NetworkResponse{{}, _now_inlock(), rs};
+ op.processResponse(std::move(response));
return;
}
auto handleStatus = _hook->handleReply(op.getRequest().target, std::move(rs));
if (!handleStatus.isOK()) {
- op.setResponse(_now_inlock(), handleStatus);
- op.finishResponse();
+ auto response = NetworkResponse{{}, _now_inlock(), handleStatus};
+ op.processResponse(std::move(response));
return;
}
@@ -595,13 +572,31 @@ void NetworkInterfaceMock::_runReadyNetworkOperations_inlock(stdx::unique_lock<s
_canceledAlarms.erase(iter);
}
}
- while (!_scheduled.empty() && _scheduled.front().getResponseDate() <= _now_inlock()) {
+ while (!_responses.empty() && _now_inlock() >= _responses.front().when) {
invariant(_currentlyRunning == kNetworkThread);
- NetworkOperation op = std::move(_scheduled.front());
- _scheduled.pop_front();
+ auto response = std::exchange(_responses.front(), {});
+ _responses.pop_front();
_waitingToRunMask |= kExecutorThread;
lk->unlock();
- op.finishResponse();
+
+ auto noi = response.noi;
+
+ LOGV2(5440602,
+ "Processing response",
+ "when"_attr = response.when,
+ "request"_attr = noi->getRequest(),
+ "response"_attr = response.response);
+
+ if (_metadataHook) {
+ _metadataHook
+ ->readReplyMetadata(noi->getRequest().opCtx,
+ noi->getRequest().target.toString(),
+ response.response.data)
+ .transitional_ignore();
+ }
+
+ noi->processResponse(std::move(response));
+
lk->lock();
}
invariant(_currentlyRunning == kNetworkThread);
@@ -653,55 +648,45 @@ static const ResponseStatus kUnsetResponse(ErrorCodes::InternalError,
"NetworkOperation::_response never set");
NetworkInterfaceMock::NetworkOperation::NetworkOperation()
- : _requestDate(),
- _nextConsiderationDate(),
- _responseDate(),
- _request(),
- _response(kUnsetResponse),
- _onFinish() {}
+ : _requestDate(), _request(), _onResponse() {}
NetworkInterfaceMock::NetworkOperation::NetworkOperation(
const CallbackHandle& cbHandle,
const RemoteCommandRequestOnAny& theRequest,
Date_t theRequestDate,
- RemoteCommandCompletionFn onFinish)
+ ResponseCallback onResponse)
: _requestDate(theRequestDate),
- _nextConsiderationDate(theRequestDate),
- _responseDate(),
_cbHandle(cbHandle),
_requestOnAny(theRequest),
_request(theRequest, 0),
- _response(kUnsetResponse),
- _onFinish(std::move(onFinish)) {
+ _onResponse(std::move(onResponse)) {
invariant(theRequest.target.size() == 1);
}
std::string NetworkInterfaceMock::NetworkOperation::getDiagnosticString() const {
return str::stream() << "NetworkOperation -- request:'" << _request.toString()
- << "', responseStatus: '" << _response.status.toString()
- << "', responseBody: '" << (_response.isOK() ? _response.toString() : "")
- << "', reqDate: " << _requestDate.toString()
- << ", nextConsiderDate: " << _nextConsiderationDate.toString()
- << ", respDate: " << _responseDate.toString();
+ << ", reqDate: " << _requestDate.toString();
}
-void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
- Date_t nextConsiderationDate) {
- invariant(nextConsiderationDate > _nextConsiderationDate);
- _nextConsiderationDate = nextConsiderationDate;
-}
+bool NetworkInterfaceMock::NetworkOperation::processResponse(NetworkResponse response) {
+ if (_isFinished) {
+ // Nothing to do.
+ return false;
+ }
-void NetworkInterfaceMock::NetworkOperation::setResponse(Date_t responseDate,
- const ResponseStatus& response) {
- invariant(responseDate >= _requestDate);
- _responseDate = responseDate;
- _response = response;
-}
+ // If there's no more to come, then we're done after this response.
+ _isFinished = !response.response.moreToCome;
+ ON_BLOCK_EXIT([&] {
+ if (_isFinished) {
+ _onResponse = {};
+ }
+ });
+
+ auto responseOnAny =
+ TaskExecutor::ResponseOnAnyStatus(_request.target, std::move(response.response));
+ _onResponse(responseOnAny);
-void NetworkInterfaceMock::NetworkOperation::finishResponse() {
- invariant(_onFinish);
- _onFinish({_request.target, _response});
- _onFinish = RemoteCommandCompletionFn();
+ return true;
}
NetworkInterfaceMock::InNetworkGuard::InNetworkGuard(NetworkInterfaceMock* net) : _net(net) {
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 8dec1507577..02e3b813c6d 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -80,6 +80,16 @@ public:
using NetworkOperationList = std::list<NetworkOperation>;
using NetworkOperationIterator = NetworkOperationList::iterator;
+ /**
+ * This struct encapsulates the original Request as well as response data and metadata.
+ */
+ struct NetworkResponse {
+ NetworkOperationIterator noi;
+ Date_t when;
+ TaskExecutor::ResponseStatus response;
+ };
+ using NetworkResponseList = std::list<NetworkResponse>;
+
NetworkInterfaceMock();
virtual ~NetworkInterfaceMock();
@@ -174,6 +184,9 @@ public:
/**
* Returns true if there are unscheduled network requests to be processed.
+ *
+ * This will not notice exhaust operations that have not yet finished but have processed all of
+ * their available responses.
*/
bool hasReadyRequests();
@@ -237,13 +250,6 @@ public:
void blackHole(NetworkOperationIterator noi);
/**
- * Defers decision making on "noi" until virtual time "dontAskUntil". Use
- * this when getNextReadyRequest() returns a request you want to deal with
- * after looking at other requests.
- */
- void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);
-
- /**
* Runs the simulator forward until now() == until or hasReadyRequests() is true.
* Returns now().
*
@@ -277,12 +283,11 @@ public:
void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply);
/**
- * Deliver the response to the callback handle if the handle is present in queuesToCheck.
+ * Deliver the response to the callback handle if the handle is present.
* This represents interrupting the regular flow with, for example, a NetworkTimeout or
* CallbackCanceled error.
*/
void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle,
- const std::vector<NetworkOperationList*>& queuesToCheck,
const TaskExecutor::ResponseStatus& response);
private:
@@ -352,6 +357,15 @@ private:
void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op);
/**
+ * Enqueues a response to be processed the next time we runReadyNetworkOperations.
+ *
+ * Note that interruption and timeout also invoke this function.
+ */
+ void _scheduleResponse_inlock(NetworkOperationIterator noi,
+ Date_t when,
+ const TaskExecutor::ResponseStatus& response);
+
+ /**
* Runs all ready network operations, called while holding "lk". May drop and
* reaquire "lk" several times, but will not return until the executor has blocked
* in waitFor*.
@@ -387,23 +401,15 @@ private:
// Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
Date_t _executorNextWakeupDate; // (M)
- // List of network operations whose responses haven't been scheduled or blackholed. This is
- // where network requests are first queued. It is sorted by
- // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
- // called, and adjusted by requeueAt().
- NetworkOperationList _unscheduled; // (M)
+ // The list of operations that have been submitted via startCommand. Operations are never
+ // deleted from this list, thus NetworkOperationIterators are valid for the lifetime of the
+ // NetworkInterfaceMock.
+ NetworkOperationList _operations; // (M)
- // List of network operations that have been returned by getNextReadyRequest() but not
- // yet scheudled, black-holed or requeued.
- NetworkOperationList _processing; // (M)
-
- // List of network operations whose responses have been scheduled but not delivered, sorted
- // by NetworkOperation::_responseDate. These operations will have their responses delivered
- // when now() == getResponseDate().
- NetworkOperationList _scheduled; // (M)
-
- // List of network operations that will not be responded to until shutdown() is called.
- NetworkOperationList _blackHoled; // (M)
+ // The list of responses that have been enqueued from scheduleResponse(), cancelation, or
+ // timeout. This list is ordered by NetworkResponse::when and is drained front to back by
+ // runReadyNetworkOperations().
+ NetworkResponseList _responses; // (M)
// Heap of alarms, with the next alarm always on top.
std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M)
@@ -431,23 +437,37 @@ private:
* Representation of an in-progress network operation.
*/
class NetworkInterfaceMock::NetworkOperation {
+ using ResponseCallback = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
+
public:
NetworkOperation();
NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequestOnAny& theRequest,
Date_t theRequestDate,
- RemoteCommandCompletionFn onFinish);
+ ResponseCallback onResponse);
/**
- * Adjusts the stored virtual time at which this entry will be subject to consideration
- * by the test harness.
+ * Mark the operation as observed by the networking thread. This is equivalent to a remote node
+ * processing the operation.
*/
- void setNextConsiderationDate(Date_t nextConsiderationDate);
+ void markAsProcessing() {
+ _isProcessing = true;
+ }
/**
- * Sets the response and thet virtual time at which it will be delivered.
+ * Mark the operation as blackholed by the networking thread.
*/
- void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
+ void markAsBlackholed() {
+ _isProcessing = true;
+ _isBlackholed = true;
+ }
+
+ /**
+ * Process a response to an ongoing operation.
+ *
+ * This invokes the _onResponse callback and may throw.
+ */
+ bool processResponse(NetworkResponse response);
/**
* Predicate that returns true if cbHandle equals the executor's handle for this network
@@ -476,47 +496,43 @@ public:
}
/**
- * Gets the virtual time at which the operation was started.
+ * Returns true if this operation has not been observed via getNextReadyRequest(), been
+ * canceled, or timed out.
*/
- Date_t getRequestDate() const {
- return _requestDate;
+ bool hasReadyRequest() const {
+ return !_isProcessing && !_isFinished;
}
/**
- * Gets the virtual time at which the test harness should next consider what to do
- * with this request.
+ * Assert that this operation has not been blackholed.
*/
- Date_t getNextConsiderationDate() const {
- return _nextConsiderationDate;
+ void assertNotBlackholed() {
+ uassert(5440603, "Response scheduled for a blackholed operation", !_isBlackholed);
}
/**
- * After setResponse() has been called, returns the virtual time at which
- * the response should be delivered.
+ * Gets the virtual time at which the operation was started.
*/
- Date_t getResponseDate() const {
- return _responseDate;
+ Date_t getRequestDate() const {
+ return _requestDate;
}
/**
- * Delivers the response, by invoking the onFinish callback passed into the constructor.
- */
- void finishResponse();
-
- /**
* Returns a printable diagnostic string.
*/
std::string getDiagnosticString() const;
private:
Date_t _requestDate;
- Date_t _nextConsiderationDate;
- Date_t _responseDate;
TaskExecutor::CallbackHandle _cbHandle;
RemoteCommandRequestOnAny _requestOnAny;
RemoteCommandRequest _request;
- TaskExecutor::ResponseStatus _response;
- RemoteCommandCompletionFn _onFinish;
+
+ bool _isProcessing = false;
+ bool _isBlackholed = false;
+ bool _isFinished = false;
+
+ ResponseCallback _onResponse;
};
/**