summaryrefslogtreecommitdiff
path: root/src/mongo/executor/network_interface_mock.cpp
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-07-24 16:52:35 -0400
committerAdam Midvidy <amidvidy@gmail.com>2015-07-28 18:47:34 -0400
commit90fdb2e96143bcd81097dc7242072666c1d5952f (patch)
tree37c03b57b378fbeb6ecd7ee3d0279a8c9761eb08 /src/mongo/executor/network_interface_mock.cpp
parent99103f65251459ea2fc8ad7e096284799702a234 (diff)
downloadmongo-90fdb2e96143bcd81097dc7242072666c1d5952f.tar.gz
SERVER-19439 implement connection hooking in NetworkInterfaceMock
Diffstat (limited to 'src/mongo/executor/network_interface_mock.cpp')
-rw-r--r--src/mongo/executor/network_interface_mock.cpp115
1 files changed, 109 insertions, 6 deletions
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index b4e2b22ddd6..61b8e8d7a9e 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -26,11 +26,17 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
#include "mongo/platform/basic.h"
#include "mongo/executor/network_interface_mock.h"
+#include <algorithm>
+#include <iterator>
+
#include "mongo/stdx/functional.h"
+#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -71,12 +77,26 @@ void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHa
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_inShutdown);
const Date_t now = _now_inlock();
- NetworkOperationIterator insertBefore = _unscheduled.begin();
- while ((insertBefore != _unscheduled.end()) &&
- (insertBefore->getNextConsiderationDate() <= now)) {
- ++insertBefore;
+ auto op = NetworkOperation(cbHandle, request, now, onFinish);
+
+ // If we don't have a hook, or we have already 'connected' to this host, enqueue the op.
+ if (!_hook || _connections.count(request.target)) {
+ _enqueueOperation_inlock(std::move(op));
+ } else {
+ _connectThenEnqueueOperation_inlock(request.target, std::move(op));
+ }
+}
+
+void NetworkInterfaceMock::setHandshakeReplyForHost(
+ const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto it = _handshakeReplies.find(host);
+ if (it == std::end(_handshakeReplies)) {
+ auto res = _handshakeReplies.emplace(host, std::move(reply));
+ invariant(res.second);
+ } else {
+ it->second = std::move(reply);
}
- _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish));
}
static bool findAndCancelIf(
@@ -290,8 +310,91 @@ void NetworkInterfaceMock::waitForWorkUntil(Date_t when) {
_waitForWork_inlock(&lk);
}
+void NetworkInterfaceMock::_enqueueOperation_inlock(
+ mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) {
+ auto insertBefore =
+ std::lower_bound(std::begin(_unscheduled),
+ std::end(_unscheduled),
+ op,
+ [](const NetworkOperation& a, const NetworkOperation& b) {
+ return a.getNextConsiderationDate() < b.getNextConsiderationDate();
+ });
+
+ _unscheduled.emplace(insertBefore, std::move(op));
+}
+
+void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target,
+ NetworkOperation&& op) {
+ invariant(_hook); // if there is no hook, we shouldn't even hit this codepath
+ invariant(!_connections.count(target));
+
+ auto handshakeReplyIter = _handshakeReplies.find(target);
+
+ auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies))
+ ? handshakeReplyIter->second
+ : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0));
+
+ auto valid = _hook->validateHost(target, handshakeReply);
+ if (!valid.isOK()) {
+ op.setResponse(_now_inlock(), valid);
+ op.finishResponse();
+ return;
+ }
+
+ auto swHookPostconnectCommand = _hook->makeRequest(target);
+
+ if (!swHookPostconnectCommand.isOK()) {
+ op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus());
+ op.finishResponse();
+ return;
+ }
+
+ boost::optional<RemoteCommandRequest> hookPostconnectCommand =
+ std::move(swHookPostconnectCommand.getValue());
+
+ if (!hookPostconnectCommand) {
+ // If we don't have a post connect command, enqueue the actual command.
+ _enqueueOperation_inlock(std::move(op));
+ _connections.emplace(op.getRequest().target);
+ return;
+ }
+
+ // The completion handler for the postconnect command schedules the original command.
+ auto postconnectCompletionHandler =
+ [this, op](StatusWith<RemoteCommandResponse> response) mutable {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!response.isOK()) {
+ op.setResponse(_now_inlock(), response.getStatus());
+ op.finishResponse();
+ return;
+ }
+
+ auto handleStatus =
+ _hook->handleReply(op.getRequest().target, std::move(response.getValue()));
+
+ if (!handleStatus.isOK()) {
+ op.setResponse(_now_inlock(), handleStatus);
+ op.finishResponse();
+ return;
+ }
+
+ _enqueueOperation_inlock(std::move(op));
+ _connections.emplace(op.getRequest().target);
+ };
+
+ auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
+ std::move(*hookPostconnectCommand),
+ _now_inlock(),
+ std::move(postconnectCompletionHandler));
+
+ _enqueueOperation_inlock(std::move(postconnectOp));
+}
+
void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<ConnectionHook> hook) {
- MONGO_UNREACHABLE;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_hasStarted);
+ invariant(!_hook);
+ _hook = std::move(hook);
}
void NetworkInterfaceMock::signalWorkAvailable() {