summaryrefslogtreecommitdiff
path: root/src/mongo/executor/mock_remote_command_runner.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor/mock_remote_command_runner.h')
-rw-r--r--src/mongo/executor/mock_remote_command_runner.h305
1 files changed, 295 insertions, 10 deletions
diff --git a/src/mongo/executor/mock_remote_command_runner.h b/src/mongo/executor/mock_remote_command_runner.h
index 85c2d1881b1..2441b166d35 100644
--- a/src/mongo/executor/mock_remote_command_runner.h
+++ b/src/mongo/executor/mock_remote_command_runner.h
@@ -27,19 +27,180 @@
* it in the license file.
*/
+#include <deque>
+
#include "mongo/executor/remote_command_runner.h"
+#include "mongo/executor/remote_command_runner_error_info.h"
#include "mongo/executor/remote_command_targeter.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/util/future_util.h"
+#include "mongo/util/producer_consumer_queue.h"
namespace mongo::executor::remote_command_runner {
/**
- * A mock implementation of the RemoteCommandRunner. In unit tests, swap out the RemoteCommandRunner
- * decoration on the service context with an instance of this mock to inject your own behavior and
- * check invariants.
+ * This header provides two mock implementations of the remote_command_runner::doRequest API. In
+ * unit tests, swap out the RemoteCommandRunner decoration on the ServiceContext with an instance
+ * of one of these mocks to inject your own behavior and check invariants.
+ *
+ * See the header comments for each mock for details.
+ */
+
+/**
+ * The SyncMockRemoteCommandRunner's representation of a RPC request. Exposed to users of the mock
+ * via the onCommand/getNextRequest APIs, which can be used to examine what requests have been
+ * scheduled on the mock and to schedule responsess to them.
+ */
+class RequestInfo {
+public:
+ RequestInfo(BSONObj cmd, StringData dbName, HostAndPort target, Promise<BSONObj>&& promise)
+ : _cmd{cmd}, _dbName{dbName}, _target{target}, _responsePromise{std::move(promise)} {}
+
+ bool isFinished() const {
+ return _finished.load();
+ }
+
+ /**
+ * Use this function to respond to the request. Errors in the 'Status' portion of the
+ * 'StatusWith' correspond to 'local errors' (i.e. those on the sending node), while the BSONObj
+ * corresponds to the raw BSON received as a response to the request over-the-wire.
+ */
+ void respondWith(StatusWith<BSONObj> response) {
+ bool expected = false;
+ if (_finished.compareAndSwap(&expected, true))
+ _responsePromise.setFrom(response);
+ }
+
+ BSONObj _cmd;
+ StringData _dbName;
+ HostAndPort _target;
+
+private:
+ AtomicWord<bool> _finished{false};
+ Promise<BSONObj> _responsePromise;
+};
+
+/**
+ * The SyncMockRemoteCommandRunner is a mock implementation that can be interacted with in a
+ * synchronous/ blocking manner. It allows you to:
+ * -> Synchronously introspect onto what requests have been scheduled.
+ * -> Synchronously respond to those requests.
+ * -> Wait (blocking) for a request to be scheduled and respond to it.
+ * See the member funtions below for details.
+ */
+class SyncMockRemoteCommandRunner : public detail::RemoteCommandRunner {
+public:
+ /**
+ * Mock implementation of the core functionality of the RCR. Records the provided request, and
+ * notifies waiters that a new request has been scheduled.
+ */
+ ExecutorFuture<detail::RemoteCommandInternalResponse> _doRequest(
+ StringData dbName,
+ BSONObj cmdBSON,
+ std::unique_ptr<RemoteCommandHostTargeter> targeter,
+ OperationContext* opCtx,
+ std::shared_ptr<TaskExecutor> exec,
+ CancellationToken token) final {
+ auto [p, f] = makePromiseFuture<BSONObj>();
+ return targeter->resolve(token)
+ .thenRunOn(exec)
+ .onError([](Status s) -> StatusWith<std::vector<HostAndPort>> {
+ return Status{RemoteCommandExecutionErrorInfo(s),
+ "Remote command execution failed"};
+ })
+ .then([=, f = std::move(f), p = std::move(p)](auto&& targets) mutable {
+ stdx::lock_guard lg{_m};
+ _requests.emplace_back(cmdBSON, dbName, targets[0], std::move(p));
+ _hasRequestsCV.notify_one();
+ return std::move(f).then([targetUsed = targets[0]](StatusWith<BSONObj> resp) {
+ if (!resp.isOK()) {
+ uassertStatusOK(Status{RemoteCommandExecutionErrorInfo(resp.getStatus()),
+ "Remote command execution failed"});
+ }
+ Status maybeError(detail::makeErrorIfNeeded(
+ RemoteCommandOnAnyResponse(targetUsed, resp.getValue(), Microseconds(1))));
+ uassertStatusOK(maybeError);
+ return detail::RemoteCommandInternalResponse{resp.getValue(), targetUsed};
+ });
+ });
+ }
+
+ /**
+ * Gets the oldest request the RCR has recieved that has not yet been responded to. Blocking
+ * call - if no unresponded requests are currently queued in the mock, this call blocks until
+ * one arrives.
+ */
+ RequestInfo& getNextRequest() {
+ stdx::unique_lock lk(_m);
+ auto pred = [this] {
+ auto nextUnprocessedRequest =
+ std::find_if(_requests.begin(), _requests.end(), [](const RequestInfo& r) {
+ return !r.isFinished();
+ });
+ return nextUnprocessedRequest != _requests.end();
+ };
+ _hasRequestsCV.wait(lk, pred);
+ return *std::find_if(_requests.begin(), _requests.end(), [](const RequestInfo& r) {
+ return !r.isFinished();
+ });
+ }
+
+ /**
+ * Responds to the first unprocessed request and then responds to it with the result of calling
+ * `responseFn` with the request as argument. If there are no unprocessed requests, blocks until
+ * there is.
+ */
+ void onCommand(std::function<StatusWith<BSONObj>(const RequestInfo&)> responseFn) {
+ auto& request = getNextRequest();
+ request.respondWith(responseFn(request));
+ }
+
+private:
+ // All requests that have been scheduled on the mock.
+ std::deque<RequestInfo> _requests;
+ // Protects the above _requests queue.
+ Mutex _m;
+ stdx::condition_variable _hasRequestsCV;
+};
+
+/**
+ * The AsyncMockRemoteCommandRunner allows you to asynchrously register expectations about what
+ * requests will be registered; you can:
+ * -> Create an 'expectation' that a request will be scheduled some time in the future,
+ * asynchronously.
+ * -> Provide responses to those requests.
+ * -> Ensure that any expectations were met/requests meeting all expectations were
+ * received and responded to.
+ * -> Examine if any unexpected requests were received and, if so, what they were.
+ * See the member funtions below for details.
*/
-class MockRemoteCommandRunner : public detail::RemoteCommandRunner {
+class AsyncMockRemoteCommandRunner : public detail::RemoteCommandRunner {
public:
+ struct Request {
+ BSONObj toBSON() const {
+ BSONObjBuilder bob;
+ BSONObjBuilder subBob(bob.subobjStart("AsyncMockRemoteCommandRunner::Request"));
+ subBob.append("dbName: ", dbName);
+ subBob.append("command: ", cmdBSON);
+ subBob.append("target: ", target.toString());
+ subBob.done();
+ return bob.obj();
+ }
+ std::string dbName;
+ BSONObj cmdBSON;
+ HostAndPort target;
+ };
+
+ /**
+ * Mock implementation of the core functionality of the RCR. Checks that the request is expected
+ * by ensuring that we have an unmet expectation registered that matches it. Then responds to
+ * the request with the response provided by the matching expectation. If more the one
+ * expectation matches the request, the first unmet one registered with `expect` will be used.
+ * See `expect` below for details. If no expectation matches the request, the request is
+ * responded to with a generic error, and the request is recorded. Users that want to make sure
+ * that all requests are expected can use the hadUnexpectedRequests/unexpectedRequests member
+ * functions to inspect that state.
+ */
ExecutorFuture<detail::RemoteCommandInternalResponse> _doRequest(
StringData dbName,
BSONObj cmdBSON,
@@ -47,15 +208,139 @@ public:
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
CancellationToken token) final {
- return ExecutorFuture(
- exec,
- detail::RemoteCommandInternalResponse{_mockResult, targeter->resolve(token).get()[0]});
+ auto [p, f] = makePromiseFuture<BSONObj>();
+ return targeter->resolve(token).thenRunOn(exec).then(
+ [this, p = std::move(p), cmdBSON, dbName = dbName.toString()](auto&& targets) {
+ stdx::lock_guard lg(_m);
+ Request req{dbName, cmdBSON, targets[0]};
+ auto expectation =
+ std::find_if(_expectations.begin(),
+ _expectations.end(),
+ [&](const Expectation& e) { return e.matcher(req) && !e.met; });
+ if (expectation == _expectations.end()) {
+ // This request was not expected. We record it and return a generic error
+ // response.
+ _unexpectedRequests.push_back(req);
+ uassertStatusOK(
+ Status{RemoteCommandExecutionErrorInfo(Status(
+ ErrorCodes::InternalErrorNotSupported, "Unexpected request")),
+ "Remote command execution failed"});
+ }
+ auto ans = expectation->response;
+ expectation->promise.emplaceValue();
+ expectation->met = true;
+ if (!ans.isOK()) {
+ uassertStatusOK(Status{RemoteCommandExecutionErrorInfo(ans.getStatus()),
+ "Remote command execution failed"});
+ }
+ Status maybeError(detail::makeErrorIfNeeded(
+ RemoteCommandOnAnyResponse(targets[0], ans.getValue(), Microseconds(1))));
+ uassertStatusOK(maybeError);
+ return detail::RemoteCommandInternalResponse{ans.getValue(), targets[0]};
+ });
}
- void setMockResult(BSONObj obj) {
- _mockResult = obj;
+
+ using RequestMatcher = std::function<bool(const Request&)>;
+ struct Expectation {
+ Expectation(RequestMatcher m,
+ StatusWith<BSONObj> response,
+ Promise<void>&& p,
+ std::string name)
+ : matcher{m}, response{response}, promise{std::move(p)}, name{name} {}
+ RequestMatcher matcher;
+ StatusWith<BSONObj> response;
+ Promise<void> promise;
+ bool met{false};
+ std::string name;
+ };
+
+ /**
+ * Register an expectation with the mock that a request matching `matcher` will be scheduled on
+ * the mock. The provided RequestMatcher can inspect the cmdObj and target HostAndPort of a
+ * request, and should return 'true' if the request is the expected one. Once the expectation is
+ * 'met' (i.e. the mock has received a matching request), the request is responded to with the
+ * provided `response`. Note that expectations are "one-shot" - each expectation can match
+ * exactly one request; once an expectation has been matched to a scheduled request, it is
+ * considered 'met' and will not be considered in the future. Additionally, each expectation is
+ * given a name, which allows it to be serialized and identified.
+ *
+ * Errors in the 'Status' portion of the 'StatusWith' response correspond to 'local errors'
+ * (i.e. those on the sending node), while the BSONObj corresponds to the raw BSON received as a
+ * response to the request over-the-wire.
+ */
+ SemiFuture<void> expect(RequestMatcher matcher,
+ StatusWith<BSONObj> response,
+ std::string name) {
+ auto [p, f] = makePromiseFuture<void>();
+ stdx::lock_guard lg(_m);
+ _expectations.emplace_back(matcher, response, std::move(p), std::move(name));
+ return std::move(f).semi();
+ }
+
+ bool hasUnmetExpectations() {
+ stdx::lock_guard lg(_m);
+ auto it = std::find_if(_expectations.begin(),
+ _expectations.end(),
+ [&](const Expectation& e) { return !e.met; });
+ return it != _expectations.end();
+ }
+
+ // Return the name of the first expectation registered that isn't met. If all have been met,
+ // return boost::none.
+ boost::optional<std::string> getFirstUnmetExpectation() {
+ stdx::lock_guard lg(_m);
+ auto it = std::find_if(_expectations.begin(),
+ _expectations.end(),
+ [&](const Expectation& e) { return !e.met; });
+ if (it == _expectations.end()) {
+ return boost::none;
+ }
+ return it->name;
+ }
+
+ // Return the set of names of all unmet expectations.
+ StringSet getUnmetExpectations() {
+ StringSet set;
+ stdx::lock_guard lg(_m);
+ for (auto&& expectation : _expectations) {
+ if (!expectation.met) {
+ set.insert(expectation.name);
+ }
+ }
+ return set;
+ }
+
+ bool hadUnexpectedRequests() {
+ stdx::lock_guard lg(_m);
+ return !_unexpectedRequests.empty();
+ }
+
+ boost::optional<Request> getFirstUnexpectedRequest() {
+ stdx::lock_guard lg(_m);
+ if (!_unexpectedRequests.empty()) {
+ return _unexpectedRequests[0];
+ }
+ return {};
+ }
+
+ std::vector<Request> getUnexpectedRequests() {
+ stdx::lock_guard lg(_m);
+ return _unexpectedRequests;
}
private:
- BSONObj _mockResult;
+ std::vector<Expectation> _expectations;
+ std::vector<Request> _unexpectedRequests;
+ // Protects the above _expectations queue.
+ Mutex _m;
};
+
+std::ostream& operator<<(std::ostream& s, const AsyncMockRemoteCommandRunner::Request& o) {
+ return s << o.toBSON();
+}
+
+std::ostream& operator<<(std::ostream& s, const AsyncMockRemoteCommandRunner::Expectation& o) {
+ return s << o.name;
+}
+
} // namespace mongo::executor::remote_command_runner