summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-07-29 09:24:57 -0400
committerAdam Midvidy <amidvidy@gmail.com>2015-07-29 15:04:50 -0400
commitb0f76ffbe26eacfa7666a9a137d9e0956778b85d (patch)
treec943913ed5ef18b9bab3f2f9d4b9b05401fd654c /src
parent54287f9a7a35e335b91689f5f5e7d5efcdb7dd15 (diff)
downloadmongo-b0f76ffbe26eacfa7666a9a137d9e0956778b85d.tar.gz
SERVER-19439 implement connection hooking in NetworkInterfaceMock
Diffstat (limited to 'src')
-rw-r--r--src/mongo/executor/SConscript10
-rw-r--r--src/mongo/executor/network_interface_mock.cpp115
-rw-r--r--src/mongo/executor/network_interface_mock.h42
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp401
4 files changed, 562 insertions, 6 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 961e38cf47f..ce159af25a5 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -44,6 +44,16 @@ env.Library('network_interface_mock',
'task_executor_interface',
])
+env.CppUnitTest(
+ target='network_interface_mock_test',
+ source=[
+ 'network_interface_mock_test.cpp'
+ ],
+ LIBDEPS=[
+ 'network_interface_mock',
+ ],
+)
+
env.Library(target='network_test_env',
source=['network_test_env.cpp',],
LIBDEPS=[
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index b4e2b22ddd6..4e8c427d12e 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -26,11 +26,17 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
#include "mongo/platform/basic.h"
#include "mongo/executor/network_interface_mock.h"
+#include <algorithm>
+#include <iterator>
+
#include "mongo/stdx/functional.h"
+#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -71,12 +77,26 @@ void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHa
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_inShutdown);
const Date_t now = _now_inlock();
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- while ((insertBefore != _unscheduled.end()) &&
- (insertBefore->getNextConsiderationDate() <= now)) {
- ++insertBefore;
+ auto op = NetworkOperation(cbHandle, request, now, onFinish);
+
+ // If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
+ if (!_hook || _connections.count(request.target)) {
+ _enqueueOperation_inlock(std::move(op));
+ } else {
+ _connectThenEnqueueOperation_inlock(request.target, std::move(op));
+ }
+}
+
+void NetworkInterfaceMock::setHandshakeReplyForHost(
+ const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto it = _handshakeReplies.find(host);
+ if (it == std::end(_handshakeReplies)) {
+ auto res = _handshakeReplies.emplace(host, std::move(reply));
+ invariant(res.second);
+ } else {
+ it->second = std::move(reply);
}
- _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish));
}
static bool findAndCancelIf(
@@ -290,8 +310,91 @@ void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
_waitForWork_inlock(&lk);
}
+void NetworkInterfaceMock::_enqueueOperation_inlock(
+ mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) {
+ auto insertBefore =
+ std::upper_bound(std::begin(_unscheduled),
+ std::end(_unscheduled),
+ op,
+ [](const NetworkOperation& a, const NetworkOperation& b) {
+ return a.getNextConsiderationDate() < b.getNextConsiderationDate();
+ });
+
+ _unscheduled.emplace(insertBefore, std::move(op));
+}
+
+void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target,
+ NetworkOperation&& op) {
+ invariant(_hook); // if there is no hook, we shouldn't even hit this codepath
+ invariant(!_connections.count(target));
+
+ auto handshakeReplyIter = _handshakeReplies.find(target);
+
+ auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies))
+ ? handshakeReplyIter->second
+ : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0));
+
+ auto valid = _hook->validateHost(target, handshakeReply);
+ if (!valid.isOK()) {
+ op.setResponse(_now_inlock(), valid);
+ op.finishResponse();
+ return;
+ }
+
+ auto swHookPostconnectCommand = _hook->makeRequest(target);
+
+ if (!swHookPostconnectCommand.isOK()) {
+ op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus());
+ op.finishResponse();
+ return;
+ }
+
+ boost::optional<RemoteCommandRequest> hookPostconnectCommand =
+ std::move(swHookPostconnectCommand.getValue());
+
+ if (!hookPostconnectCommand) {
+ // If we don't have a post connect command, enqueue the actual command.
+ _enqueueOperation_inlock(std::move(op));
+ _connections.emplace(op.getRequest().target);
+ return;
+ }
+
+ // The completion handler for the postconnect command schedules the original command.
+ auto postconnectCompletionHandler =
+ [this, op](StatusWith<RemoteCommandResponse> response) mutable {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!response.isOK()) {
+ op.setResponse(_now_inlock(), response.getStatus());
+ op.finishResponse();
+ return;
+ }
+
+ auto handleStatus =
+ _hook->handleReply(op.getRequest().target, std::move(response.getValue()));
+
+ if (!handleStatus.isOK()) {
+ op.setResponse(_now_inlock(), handleStatus);
+ op.finishResponse();
+ return;
+ }
+
+ _enqueueOperation_inlock(std::move(op));
+ _connections.emplace(op.getRequest().target);
+ };
+
+ auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
+ std::move(*hookPostconnectCommand),
+ _now_inlock(),
+ std::move(postconnectCompletionHandler));
+
+ _enqueueOperation_inlock(std::move(postconnectOp));
+}
+
void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<ConnectionHook> hook) {
- MONGO_UNREACHABLE;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_hasStarted);
+ invariant(!_hook);
+ _hook = std::move(hook);
}
void NetworkInterfaceMock::signalWorkAvailable() {
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 91491b578e0..f8fc61840f8 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -28,7 +28,10 @@
#pragma once
+#include <memory>
#include <queue>
+#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -167,6 +170,19 @@ public:
*/
void runReadyNetworkOperations();
+ /**
+ * Sets the reply of the 'isMaster' handshake for a specific host. This reply will only
+ * be given to the 'validateHost' method of the ConnectionHook set on this object - NOT
+ * to the completion handlers of any 'isMaster' commands scheduled with 'startCommand'.
+ *
+ * This reply will persist until it is changed again using this method.
+ *
+ * If the NetworkInterfaceMock conducts a handshake with a simulated host which has not
+ * had a handshake reply set, a default constructed RemoteCommandResponse will be passed
+ * to validateHost if a hook is set.
+ */
+ void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply);
+
private:
/**
* Information describing a scheduled alarm.
@@ -218,6 +234,16 @@ private:
bool _isExecutorThreadRunnable_inlock();
/**
+ * Enqueues a network operation to run in order of 'consideration date'.
+ */
+ void _enqueueOperation_inlock(NetworkOperation&& op);
+
+ /**
+ * "Connects" to a remote host, and then enqueues the provided operation.
+ */
+ void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op);
+
+ /**
* Runs all ready network operations, called while holding "lk". May drop and
* reaquire "lk" several times, but will not return until the executor has blocked
* in waitFor*.
@@ -273,6 +299,18 @@ private:
// Heap of alarms, with the next alarm always on top.
std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M)
+
+ // The connection hook.
+ std::unique_ptr<ConnectionHook> _hook; // (R)
+
+ // The set of hosts we have seen so far. If we see a new host, we will execute the
+ // ConnectionHook's validation and post-connection logic.
+ //
+ // TODO: provide a way to simulate disconnections.
+ std::unordered_set<HostAndPort> _connections; // (M)
+
+ // The handshake replies set for each host.
+ std::unordered_map<HostAndPort, RemoteCommandResponse> _handshakeReplies; // (M)
};
/**
@@ -306,6 +344,10 @@ public:
return cbHandle == _cbHandle;
}
+ const TaskExecutor::CallbackHandle& getCallbackHandle() const {
+ return _cbHandle;
+ }
+
/**
* Gets the request that initiated this operation.
*/
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
new file mode 100644
index 00000000000..8e623778458
--- /dev/null
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -0,0 +1,401 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <iostream>
+#include <memory>
+#include <utility>
+
+#include "mongo/base/status.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+template <typename ValidateFunc, typename RequestFunc, typename ReplyFunc>
+class TestConnectionHook final : public NetworkInterface::ConnectionHook {
+public:
+ TestConnectionHook(ValidateFunc&& validateFunc,
+ RequestFunc&& requestFunc,
+ ReplyFunc&& replyFunc)
+ : _validateFunc(std::forward<ValidateFunc>(validateFunc)),
+ _requestFunc(std::forward<RequestFunc>(requestFunc)),
+ _replyFunc(std::forward<ReplyFunc>(replyFunc)) {}
+
+ Status validateHost(const HostAndPort& remoteHost,
+ const RemoteCommandResponse& isMasterReply) override {
+ return _validateFunc(remoteHost, isMasterReply);
+ }
+
+ StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort& remoteHost) {
+ return _requestFunc(remoteHost);
+ }
+
+ Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
+ return _replyFunc(remoteHost, std::move(response));
+ }
+
+private:
+ ValidateFunc _validateFunc;
+ RequestFunc _requestFunc;
+ ReplyFunc _replyFunc;
+};
+
+template <typename Val, typename Req, typename Rep>
+static std::unique_ptr<TestConnectionHook<Val, Req, Rep>> makeTestHook(Val&& validateFunc,
+ Req&& requestFunc,
+ Rep&& replyFunc) {
+ return stdx::make_unique<TestConnectionHook<Val, Req, Rep>>(std::forward<Val>(validateFunc),
+ std::forward<Req>(requestFunc),
+ std::forward<Rep>(replyFunc));
+}
+
+class NetworkInterfaceMockTest : public mongo::unittest::Test {
+public:
+ NetworkInterfaceMockTest() : _net{}, _executor(&_net, 1) {}
+
+ NetworkInterfaceMock& net() {
+ return _net;
+ }
+
+ ThreadPoolMock& executor() {
+ return _executor;
+ }
+
+ HostAndPort testHost() {
+ return {"localHost", 27017};
+ }
+
+ // intentionally not done in setUp as some methods need to be called prior to starting
+ // the network.
+ void startNetwork() {
+ net().startup();
+ executor().startup();
+ }
+
+ virtual void tearDown() override {
+ net().exitNetwork();
+ executor().shutdown();
+ // Wake up sleeping executor threads so they clean up.
+ net().signalWorkAvailable();
+ executor().join();
+ net().shutdown();
+ }
+
+private:
+ NetworkInterfaceMock _net;
+ ThreadPoolMock _executor;
+};
+
+TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
+ bool validateCalled = false;
+ bool hostCorrectForValidate = false;
+ bool replyCorrectForValidate;
+
+ bool makeRequestCalled = false;
+ bool hostCorrectForRequest = false;
+
+ bool handleReplyCalled = false;
+ bool gotExpectedReply = false;
+
+ RemoteCommandRequest expectedRequest{testHost(),
+ "test",
+ BSON("1" << 2),
+ BSON("some"
+ << "stuff")};
+
+ RemoteCommandResponse expectedResponse{BSON("foo"
+ << "bar"
+ << "baz"
+ << "garply"),
+ BSON("bar"
+ << "baz"),
+ Milliseconds(30)};
+
+ // need to copy as it will be moved
+ auto isMasterReplyData = BSON("iamyour"
+ << "father");
+
+ RemoteCommandResponse isMasterReply{
+ isMasterReplyData.copy(), BSON("blah" << 2), Milliseconds(20)};
+
+ net().setHandshakeReplyForHost(testHost(), std::move(isMasterReply));
+
+ // Since the contract of these methods is that they do not throw, we run the ASSERTs in
+ // the test scope.
+ net().setConnectionHook(makeTestHook(
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) {
+ validateCalled = true;
+ hostCorrectForValidate = (remoteHost == testHost());
+ replyCorrectForValidate = (isMasterReply.data == isMasterReplyData);
+ return Status::OK();
+ },
+ [&](const HostAndPort& remoteHost) {
+ makeRequestCalled = true;
+ hostCorrectForRequest = (remoteHost == testHost());
+ return boost::make_optional<RemoteCommandRequest>(expectedRequest);
+ },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
+ handleReplyCalled = true;
+ hostCorrectForRequest = (remoteHost == testHost());
+ gotExpectedReply =
+ (expectedResponse.data == response.data); // Don't bother checking all fields.
+ return Status::OK();
+ }));
+
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb{};
+
+ bool commandFinished = false;
+ bool gotCorrectCommandReply = false;
+
+ RemoteCommandRequest actualCommandExpected{
+ testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata()};
+ RemoteCommandResponse actualResponseExpected{BSON("1212121212"
+ << "12121212121212"),
+ BSONObj(),
+ Milliseconds(0)};
+
+ net().startCommand(cb,
+ actualCommandExpected,
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ if (resp.isOK()) {
+ gotCorrectCommandReply = (actualResponseExpected.toString() ==
+ resp.getValue().toString());
+ }
+ });
+
+ // At this point validate and makeRequest should have been called.
+ ASSERT(validateCalled);
+ ASSERT(hostCorrectForValidate);
+ ASSERT(replyCorrectForValidate);
+ ASSERT(makeRequestCalled);
+ ASSERT(hostCorrectForRequest);
+
+ // handleReply should not have been called as we haven't responded to the reply yet.
+ ASSERT(!handleReplyCalled);
+ // we haven't gotten to the actual command yet
+ ASSERT(!commandFinished);
+
+ {
+ net().enterNetwork();
+ ASSERT(net().hasReadyRequests());
+ auto req = net().getNextReadyRequest();
+ ASSERT(req->getRequest().cmdObj == expectedRequest.cmdObj);
+ net().scheduleResponse(req, net().now(), expectedResponse);
+ net().runReadyNetworkOperations();
+ net().exitNetwork();
+ }
+
+ // We should have responded to the post connect command.
+ ASSERT(handleReplyCalled);
+ ASSERT(gotExpectedReply);
+
+ // We should not have responsed to the actual command.
+ ASSERT(!commandFinished);
+
+ {
+ net().enterNetwork();
+ ASSERT(net().hasReadyRequests());
+ auto actualCommand = net().getNextReadyRequest();
+ ASSERT(actualCommand->getRequest().cmdObj == actualCommandExpected.cmdObj);
+ net().scheduleResponse(actualCommand, net().now(), actualResponseExpected);
+ net().runReadyNetworkOperations();
+ net().exitNetwork();
+ }
+
+ ASSERT(commandFinished);
+ ASSERT(gotCorrectCommandReply);
+}
+
+TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
+ net().setConnectionHook(makeTestHook(
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ // We just need some obscure non-OK code.
+ return {ErrorCodes::ConflictingOperationInProgress, "blah"};
+ },
+ [&](const HostAndPort& remoteHost)
+ -> StatusWith<boost::optional<RemoteCommandRequest>> { MONGO_UNREACHABLE; },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
+ -> Status { MONGO_UNREACHABLE; }));
+
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb{};
+
+ bool commandFinished = false;
+ bool statusPropagated = false;
+
+ net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+
+ statusPropagated = resp.getStatus().code() ==
+ ErrorCodes::ConflictingOperationInProgress;
+ });
+
+ {
+ net().enterNetwork();
+ // We should have short-circuited the network and immediately called the callback.
+ // If we change isMaster replies to go through the normal network mechanism,
+ // this test will need to change.
+ ASSERT(!net().hasReadyRequests());
+ net().exitNetwork();
+ }
+
+ ASSERT(commandFinished);
+ ASSERT(statusPropagated);
+}
+
+TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
+ bool makeRequestCalled = false;
+ net().setConnectionHook(makeTestHook(
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
+ -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
+ makeRequestCalled = true;
+ return {boost::none};
+ },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
+ -> Status { MONGO_UNREACHABLE; }));
+
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb{};
+
+ bool commandFinished = false;
+
+ net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; });
+
+ {
+ net().enterNetwork();
+ ASSERT(net().hasReadyRequests());
+ auto req = net().getNextReadyRequest();
+ net().scheduleResponse(req, net().now(), RemoteCommandResponse{});
+ net().runReadyNetworkOperations();
+ net().exitNetwork();
+ }
+
+ ASSERT(commandFinished);
+}
+
+TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
+ bool makeRequestCalled = false;
+ net().setConnectionHook(makeTestHook(
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
+ -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
+ makeRequestCalled = true;
+ return {ErrorCodes::InvalidSyncSource, "blah"};
+ },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
+ -> Status { MONGO_UNREACHABLE; }));
+
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb{};
+
+ bool commandFinished = false;
+ bool errorPropagated = false;
+
+ net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated =
+ resp.getStatus().code() == ErrorCodes::InvalidSyncSource;
+ });
+
+ {
+ net().enterNetwork();
+ ASSERT(!net().hasReadyRequests());
+ net().exitNetwork();
+ }
+
+ ASSERT(commandFinished);
+ ASSERT(errorPropagated);
+}
+
+TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
+ bool handleReplyCalled = false;
+ net().setConnectionHook(makeTestHook(
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
+ -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
+ return boost::make_optional<RemoteCommandRequest>({});
+ },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
+ handleReplyCalled = true;
+ return {ErrorCodes::CappedPositionLost, "woot"};
+ }));
+
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb{};
+
+ bool commandFinished = false;
+ bool errorPropagated = false;
+
+ net().startCommand(cb,
+ RemoteCommandRequest{},
+ [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ errorPropagated =
+ resp.getStatus().code() == ErrorCodes::CappedPositionLost;
+ });
+
+ ASSERT(!handleReplyCalled);
+
+ {
+ net().enterNetwork();
+ ASSERT(net().hasReadyRequests());
+ auto req = net().getNextReadyRequest();
+ net().scheduleResponse(req, net().now(), RemoteCommandResponse{});
+ net().runReadyNetworkOperations();
+ net().exitNetwork();
+ }
+
+ ASSERT(handleReplyCalled);
+ ASSERT(commandFinished);
+ ASSERT(errorPropagated);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo