diff options
Diffstat (limited to 'src/mongo/executor/mock_remote_command_runner.h')
-rw-r--r-- | src/mongo/executor/mock_remote_command_runner.h | 305 |
1 files changed, 295 insertions, 10 deletions
diff --git a/src/mongo/executor/mock_remote_command_runner.h b/src/mongo/executor/mock_remote_command_runner.h index 85c2d1881b1..2441b166d35 100644 --- a/src/mongo/executor/mock_remote_command_runner.h +++ b/src/mongo/executor/mock_remote_command_runner.h @@ -27,19 +27,180 @@ * it in the license file. */ +#include <deque> + #include "mongo/executor/remote_command_runner.h" +#include "mongo/executor/remote_command_runner_error_info.h" #include "mongo/executor/remote_command_targeter.h" #include "mongo/executor/task_executor.h" +#include "mongo/util/future_util.h" +#include "mongo/util/producer_consumer_queue.h" namespace mongo::executor::remote_command_runner { /** - * A mock implementation of the RemoteCommandRunner. In unit tests, swap out the RemoteCommandRunner - * decoration on the service context with an instance of this mock to inject your own behavior and - * check invariants. + * This header provides two mock implementations of the remote_command_runner::doRequest API. In + * unit tests, swap out the RemoteCommandRunner decoration on the ServiceContext with an instance + * of one of these mocks to inject your own behavior and check invariants. + * + * See the header comments for each mock for details. + */ + +/** + * The SyncMockRemoteCommandRunner's representation of a RPC request. Exposed to users of the mock + * via the onCommand/getNextRequest APIs, which can be used to examine what requests have been + * scheduled on the mock and to schedule responsess to them. + */ +class RequestInfo { +public: + RequestInfo(BSONObj cmd, StringData dbName, HostAndPort target, Promise<BSONObj>&& promise) + : _cmd{cmd}, _dbName{dbName}, _target{target}, _responsePromise{std::move(promise)} {} + + bool isFinished() const { + return _finished.load(); + } + + /** + * Use this function to respond to the request. Errors in the 'Status' portion of the + * 'StatusWith' correspond to 'local errors' (i.e. those on the sending node), while the BSONObj + * corresponds to the raw BSON received as a response to the request over-the-wire. + */ + void respondWith(StatusWith<BSONObj> response) { + bool expected = false; + if (_finished.compareAndSwap(&expected, true)) + _responsePromise.setFrom(response); + } + + BSONObj _cmd; + StringData _dbName; + HostAndPort _target; + +private: + AtomicWord<bool> _finished{false}; + Promise<BSONObj> _responsePromise; +}; + +/** + * The SyncMockRemoteCommandRunner is a mock implementation that can be interacted with in a + * synchronous/ blocking manner. It allows you to: + * -> Synchronously introspect onto what requests have been scheduled. + * -> Synchronously respond to those requests. + * -> Wait (blocking) for a request to be scheduled and respond to it. + * See the member funtions below for details. + */ +class SyncMockRemoteCommandRunner : public detail::RemoteCommandRunner { +public: + /** + * Mock implementation of the core functionality of the RCR. Records the provided request, and + * notifies waiters that a new request has been scheduled. + */ + ExecutorFuture<detail::RemoteCommandInternalResponse> _doRequest( + StringData dbName, + BSONObj cmdBSON, + std::unique_ptr<RemoteCommandHostTargeter> targeter, + OperationContext* opCtx, + std::shared_ptr<TaskExecutor> exec, + CancellationToken token) final { + auto [p, f] = makePromiseFuture<BSONObj>(); + return targeter->resolve(token) + .thenRunOn(exec) + .onError([](Status s) -> StatusWith<std::vector<HostAndPort>> { + return Status{RemoteCommandExecutionErrorInfo(s), + "Remote command execution failed"}; + }) + .then([=, f = std::move(f), p = std::move(p)](auto&& targets) mutable { + stdx::lock_guard lg{_m}; + _requests.emplace_back(cmdBSON, dbName, targets[0], std::move(p)); + _hasRequestsCV.notify_one(); + return std::move(f).then([targetUsed = targets[0]](StatusWith<BSONObj> resp) { + if (!resp.isOK()) { + uassertStatusOK(Status{RemoteCommandExecutionErrorInfo(resp.getStatus()), + "Remote command execution failed"}); + } + Status maybeError(detail::makeErrorIfNeeded( + RemoteCommandOnAnyResponse(targetUsed, resp.getValue(), Microseconds(1)))); + uassertStatusOK(maybeError); + return detail::RemoteCommandInternalResponse{resp.getValue(), targetUsed}; + }); + }); + } + + /** + * Gets the oldest request the RCR has recieved that has not yet been responded to. Blocking + * call - if no unresponded requests are currently queued in the mock, this call blocks until + * one arrives. + */ + RequestInfo& getNextRequest() { + stdx::unique_lock lk(_m); + auto pred = [this] { + auto nextUnprocessedRequest = + std::find_if(_requests.begin(), _requests.end(), [](const RequestInfo& r) { + return !r.isFinished(); + }); + return nextUnprocessedRequest != _requests.end(); + }; + _hasRequestsCV.wait(lk, pred); + return *std::find_if(_requests.begin(), _requests.end(), [](const RequestInfo& r) { + return !r.isFinished(); + }); + } + + /** + * Responds to the first unprocessed request and then responds to it with the result of calling + * `responseFn` with the request as argument. If there are no unprocessed requests, blocks until + * there is. + */ + void onCommand(std::function<StatusWith<BSONObj>(const RequestInfo&)> responseFn) { + auto& request = getNextRequest(); + request.respondWith(responseFn(request)); + } + +private: + // All requests that have been scheduled on the mock. + std::deque<RequestInfo> _requests; + // Protects the above _requests queue. + Mutex _m; + stdx::condition_variable _hasRequestsCV; +}; + +/** + * The AsyncMockRemoteCommandRunner allows you to asynchrously register expectations about what + * requests will be registered; you can: + * -> Create an 'expectation' that a request will be scheduled some time in the future, + * asynchronously. + * -> Provide responses to those requests. + * -> Ensure that any expectations were met/requests meeting all expectations were + * received and responded to. + * -> Examine if any unexpected requests were received and, if so, what they were. + * See the member funtions below for details. */ -class MockRemoteCommandRunner : public detail::RemoteCommandRunner { +class AsyncMockRemoteCommandRunner : public detail::RemoteCommandRunner { public: + struct Request { + BSONObj toBSON() const { + BSONObjBuilder bob; + BSONObjBuilder subBob(bob.subobjStart("AsyncMockRemoteCommandRunner::Request")); + subBob.append("dbName: ", dbName); + subBob.append("command: ", cmdBSON); + subBob.append("target: ", target.toString()); + subBob.done(); + return bob.obj(); + } + std::string dbName; + BSONObj cmdBSON; + HostAndPort target; + }; + + /** + * Mock implementation of the core functionality of the RCR. Checks that the request is expected + * by ensuring that we have an unmet expectation registered that matches it. Then responds to + * the request with the response provided by the matching expectation. If more the one + * expectation matches the request, the first unmet one registered with `expect` will be used. + * See `expect` below for details. If no expectation matches the request, the request is + * responded to with a generic error, and the request is recorded. Users that want to make sure + * that all requests are expected can use the hadUnexpectedRequests/unexpectedRequests member + * functions to inspect that state. + */ ExecutorFuture<detail::RemoteCommandInternalResponse> _doRequest( StringData dbName, BSONObj cmdBSON, @@ -47,15 +208,139 @@ public: OperationContext* opCtx, std::shared_ptr<TaskExecutor> exec, CancellationToken token) final { - return ExecutorFuture( - exec, - detail::RemoteCommandInternalResponse{_mockResult, targeter->resolve(token).get()[0]}); + auto [p, f] = makePromiseFuture<BSONObj>(); + return targeter->resolve(token).thenRunOn(exec).then( + [this, p = std::move(p), cmdBSON, dbName = dbName.toString()](auto&& targets) { + stdx::lock_guard lg(_m); + Request req{dbName, cmdBSON, targets[0]}; + auto expectation = + std::find_if(_expectations.begin(), + _expectations.end(), + [&](const Expectation& e) { return e.matcher(req) && !e.met; }); + if (expectation == _expectations.end()) { + // This request was not expected. We record it and return a generic error + // response. + _unexpectedRequests.push_back(req); + uassertStatusOK( + Status{RemoteCommandExecutionErrorInfo(Status( + ErrorCodes::InternalErrorNotSupported, "Unexpected request")), + "Remote command execution failed"}); + } + auto ans = expectation->response; + expectation->promise.emplaceValue(); + expectation->met = true; + if (!ans.isOK()) { + uassertStatusOK(Status{RemoteCommandExecutionErrorInfo(ans.getStatus()), + "Remote command execution failed"}); + } + Status maybeError(detail::makeErrorIfNeeded( + RemoteCommandOnAnyResponse(targets[0], ans.getValue(), Microseconds(1)))); + uassertStatusOK(maybeError); + return detail::RemoteCommandInternalResponse{ans.getValue(), targets[0]}; + }); } - void setMockResult(BSONObj obj) { - _mockResult = obj; + + using RequestMatcher = std::function<bool(const Request&)>; + struct Expectation { + Expectation(RequestMatcher m, + StatusWith<BSONObj> response, + Promise<void>&& p, + std::string name) + : matcher{m}, response{response}, promise{std::move(p)}, name{name} {} + RequestMatcher matcher; + StatusWith<BSONObj> response; + Promise<void> promise; + bool met{false}; + std::string name; + }; + + /** + * Register an expectation with the mock that a request matching `matcher` will be scheduled on + * the mock. The provided RequestMatcher can inspect the cmdObj and target HostAndPort of a + * request, and should return 'true' if the request is the expected one. Once the expectation is + * 'met' (i.e. the mock has received a matching request), the request is responded to with the + * provided `response`. Note that expectations are "one-shot" - each expectation can match + * exactly one request; once an expectation has been matched to a scheduled request, it is + * considered 'met' and will not be considered in the future. Additionally, each expectation is + * given a name, which allows it to be serialized and identified. + * + * Errors in the 'Status' portion of the 'StatusWith' response correspond to 'local errors' + * (i.e. those on the sending node), while the BSONObj corresponds to the raw BSON received as a + * response to the request over-the-wire. + */ + SemiFuture<void> expect(RequestMatcher matcher, + StatusWith<BSONObj> response, + std::string name) { + auto [p, f] = makePromiseFuture<void>(); + stdx::lock_guard lg(_m); + _expectations.emplace_back(matcher, response, std::move(p), std::move(name)); + return std::move(f).semi(); + } + + bool hasUnmetExpectations() { + stdx::lock_guard lg(_m); + auto it = std::find_if(_expectations.begin(), + _expectations.end(), + [&](const Expectation& e) { return !e.met; }); + return it != _expectations.end(); + } + + // Return the name of the first expectation registered that isn't met. If all have been met, + // return boost::none. + boost::optional<std::string> getFirstUnmetExpectation() { + stdx::lock_guard lg(_m); + auto it = std::find_if(_expectations.begin(), + _expectations.end(), + [&](const Expectation& e) { return !e.met; }); + if (it == _expectations.end()) { + return boost::none; + } + return it->name; + } + + // Return the set of names of all unmet expectations. + StringSet getUnmetExpectations() { + StringSet set; + stdx::lock_guard lg(_m); + for (auto&& expectation : _expectations) { + if (!expectation.met) { + set.insert(expectation.name); + } + } + return set; + } + + bool hadUnexpectedRequests() { + stdx::lock_guard lg(_m); + return !_unexpectedRequests.empty(); + } + + boost::optional<Request> getFirstUnexpectedRequest() { + stdx::lock_guard lg(_m); + if (!_unexpectedRequests.empty()) { + return _unexpectedRequests[0]; + } + return {}; + } + + std::vector<Request> getUnexpectedRequests() { + stdx::lock_guard lg(_m); + return _unexpectedRequests; } private: - BSONObj _mockResult; + std::vector<Expectation> _expectations; + std::vector<Request> _unexpectedRequests; + // Protects the above _expectations queue. + Mutex _m; }; + +std::ostream& operator<<(std::ostream& s, const AsyncMockRemoteCommandRunner::Request& o) { + return s << o.toBSON(); +} + +std::ostream& operator<<(std::ostream& s, const AsyncMockRemoteCommandRunner::Expectation& o) { + return s << o.name; +} + } // namespace mongo::executor::remote_command_runner |