summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-02-25 18:56:25 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-27 02:55:43 +0000
commitb80b1796ee3ac8ffb9336caedbf14522b3cf562d (patch)
tree1e3778c960fe2a969f88ebf2c14ade327e397267
parent74961aa50ded11df8612bed9e0bd40f6c9466560 (diff)
downloadmongo-b80b1796ee3ac8ffb9336caedbf14522b3cf562d.tar.gz
SERVER-46223: Support OP_MSG exhaust in mongobridge
-rw-r--r--src/mongo/shell/replsettest.js10
-rw-r--r--src/mongo/tools/bridge.cpp88
-rw-r--r--src/mongo/transport/service_state_machine.cpp1
3 files changed, 77 insertions, 22 deletions
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index ffdfd86e1bd..e88d5fad36c 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -2792,16 +2792,6 @@ var ReplSetTest = function(opts) {
options.setParameter.numInitialSyncConnectAttempts =
options.setParameter.numInitialSyncConnectAttempts || 60;
- // TODO SERVER-46223: Support exhaust in mongobridge.
- if (_useBridge && (options.binVersion === undefined || options.binVersion === "latest")) {
- options.setParameter.oplogFetcherUsesExhaust =
- options.setParameter.oplogFetcherUsesExhaust || false;
- } else if (_useBridge && options.binVersion === "last-stable") {
- // On a 4.2 binary, make sure we do not set oplogFetcherUsesExhaust since it is an
- // unknown parameter in 4.2.
- delete options.setParameter.oplogFetcherUsesExhaust;
- }
-
if (tojson(options) != tojson({}))
printjson(options);
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index b82c8f2338a..bae7b9fe1d4 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -194,6 +194,35 @@ public:
return _prng.nextCanonicalDouble();
}
+ void setInExhaust(bool inExhaust) {
+ _inExhaust = inExhaust;
+ }
+
+ bool inExhaust() const {
+ return _inExhaust;
+ }
+
+ // Handle response for request with kExhaustSupported flag or response from the exhaust stream.
+ // This sets up the internal states for the ProxiedConnection and returns whether there is
+ // "moreToCome" from the exhaust stream.
+ bool handleExhaustResponse(Message& response) {
+ // Only support OP_MSG exhaust cursors.
+ invariant(response.operation() == dbMsg);
+ uassert(4622300,
+ "Response ID did not match the sent message ID.",
+ !_lastExhaustRequestId ||
+ response.header().getResponseToMsgId() == _lastExhaustRequestId);
+ if (response.operation() == dbCompressed) {
+ MessageCompressorManager compressorManager;
+ response = uassertStatusOK(compressorManager.decompressMessage(response));
+ }
+ _inExhaust = OpMsg::isFlagSet(response, OpMsg::kMoreToCome);
+ if (_inExhaust) {
+ _lastExhaustRequestId = response.header().getId();
+ }
+ return _inExhaust;
+ }
+
static ProxiedConnection& get(const transport::SessionHandle& session);
private:
@@ -204,6 +233,8 @@ private:
PseudoRandom _prng;
boost::optional<HostAndPort> _host;
bool _seenFirstMessage = false;
+ bool _inExhaust = false;
+ int _lastExhaustRequestId = 0;
};
const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get =
@@ -221,15 +252,11 @@ public:
};
DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) {
- uassert(51754,
- "Mongobridge does not support exhaust",
- !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported));
-
if (request.operation() == dbQuery) {
DbMessage d(request);
QueryMessage q(d);
if (q.queryOptions & QueryOption_Exhaust) {
- uasserted(51755, "Mongobridge does not support exhaust");
+ uasserted(51755, "Mongobridge does not support OP_QUERY exhaust");
}
}
@@ -237,6 +264,20 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const
auto& dest = ProxiedConnection::get(source);
auto brCtx = BridgeContext::get();
+ // If the bridge decides to return something else other than a response from an active exhaust
+ // stream, make sure we close the exhaust stream properly.
+ auto earlyExhaustExitGuard = makeGuard([&] {
+ if (dest.inExhaust()) {
+ LOGV2(4622301, "mongobridge shutting down exhaust stream", "remote"_attr = dest);
+ dest.setInExhaust(false);
+ // Active exhaust stream should have a session.
+ invariant(dest.getSession());
+ // Close the connection to the dest server to end the exhaust stream.
+ dest->end();
+ dest.setSession(nullptr);
+ }
+ });
+
if (!dest.getSession()) {
dest.setSession([]() -> transport::SessionHandle {
HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
@@ -344,19 +385,30 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const
break;
}
- uassertStatusOK(dest->sinkMessage(request));
+ // If we get another type of request (e.g. exhaust cleanup killCursor request from the service
+ // state machine), unset the exhaust mode.
+ if (dest.inExhaust() &&
+ (request.operation() != dbMsg || !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported))) {
+ dest.setInExhaust(false);
+ }
+
+ // Skip sending request in exhaust mode.
+ if (!dest.inExhaust()) {
+ uassertStatusOK(dest->sinkMessage(request));
+ }
// Send the message we received from 'source' to 'dest'. 'dest' returns a response for
// OP_QUERY, OP_GET_MORE, and OP_MSG messages that we respond with.
if (!isFireAndForgetCommand &&
(request.operation() == dbQuery || request.operation() == dbGetMore ||
request.operation() == dbMsg)) {
- // TODO dbMsg moreToCome
- // Forward the message to 'dest' and receive its reply in 'response'.
auto response = uassertStatusOK(dest->sourceMessage());
- uassert(50765,
- "Response ID did not match the sent message ID.",
- response.header().getResponseToMsgId() == request.header().getId());
+
+ if (!dest.inExhaust()) {
+ uassert(50765,
+ "Response ID did not match the sent message ID.",
+ response.header().getResponseToMsgId() == request.header().getId());
+ }
// Reload the message handling settings for 'host' in case they were changed
// while waiting for a response from 'dest'.
@@ -374,10 +426,22 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const
return {Message()};
}
+ // Only support OP_MSG exhaust cursors.
+ bool isExhaust = false;
+ if (request.operation() == dbMsg && OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)) {
+ isExhaust = dest.handleExhaustResponse(response);
+ earlyExhaustExitGuard.dismiss();
+ }
+
// The original checksum won't be valid once the network layer replaces requestId. Remove it
// because the network layer re-checksums the response.
OpMsg::removeChecksum(&response);
- return {std::move(response)};
+
+ // Return a DbResponse with shouldRunAgainForExhaust being set to isExhaust to indicate
+ // whether this should be run again to receive more responses from the exhaust stream.
+ // We do not need to set 'nextInvocation' in the DbResponse because mongobridge
+ // only receives responses but ignores the next request if it is in exhaust mode.
+ return {std::move(response), isExhaust};
} else {
return {Message()};
}
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 7e33191b98d..6f24f61213b 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -516,6 +516,7 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
} else {
_state.store(State::Source);
_inMessage.reset();
+ _inExhaust = false;
return _scheduleNextWithGuard(std::move(guard),
ServiceExecutor::kDeferredTask,
transport::ServiceExecutorTaskName::kSSMSourceMessage);