diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2020-02-25 18:56:25 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-27 02:55:43 +0000 |
commit | b80b1796ee3ac8ffb9336caedbf14522b3cf562d (patch) | |
tree | 1e3778c960fe2a969f88ebf2c14ade327e397267 | |
parent | 74961aa50ded11df8612bed9e0bd40f6c9466560 (diff) | |
download | mongo-b80b1796ee3ac8ffb9336caedbf14522b3cf562d.tar.gz |
SERVER-46223: Support OP_MSG exhaust in mongobridge
-rw-r--r-- | src/mongo/shell/replsettest.js | 10 | ||||
-rw-r--r-- | src/mongo/tools/bridge.cpp | 88 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 1 |
3 files changed, 77 insertions, 22 deletions
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index ffdfd86e1bd..e88d5fad36c 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -2792,16 +2792,6 @@ var ReplSetTest = function(opts) { options.setParameter.numInitialSyncConnectAttempts = options.setParameter.numInitialSyncConnectAttempts || 60; - // TODO SERVER-46223: Support exhaust in mongobridge. - if (_useBridge && (options.binVersion === undefined || options.binVersion === "latest")) { - options.setParameter.oplogFetcherUsesExhaust = - options.setParameter.oplogFetcherUsesExhaust || false; - } else if (_useBridge && options.binVersion === "last-stable") { - // On a 4.2 binary, make sure we do not set oplogFetcherUsesExhaust since it is an - // unknown parameter in 4.2. - delete options.setParameter.oplogFetcherUsesExhaust; - } - if (tojson(options) != tojson({})) printjson(options); diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index b82c8f2338a..bae7b9fe1d4 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -194,6 +194,35 @@ public: return _prng.nextCanonicalDouble(); } + void setInExhaust(bool inExhaust) { + _inExhaust = inExhaust; + } + + bool inExhaust() const { + return _inExhaust; + } + + // Handle response for request with kExhaustSupported flag or response from the exhaust stream. + // This sets up the internal states for the ProxiedConnection and returns whether there is + // "moreToCome" from the exhaust stream. + bool handleExhaustResponse(Message& response) { + // Only support OP_MSG exhaust cursors. + invariant(response.operation() == dbMsg); + uassert(4622300, + "Response ID did not match the sent message ID.", + !_lastExhaustRequestId || + response.header().getResponseToMsgId() == _lastExhaustRequestId); + if (response.operation() == dbCompressed) { + MessageCompressorManager compressorManager; + response = uassertStatusOK(compressorManager.decompressMessage(response)); + } + _inExhaust = OpMsg::isFlagSet(response, OpMsg::kMoreToCome); + if (_inExhaust) { + _lastExhaustRequestId = response.header().getId(); + } + return _inExhaust; + } + static ProxiedConnection& get(const transport::SessionHandle& session); private: @@ -204,6 +233,8 @@ private: PseudoRandom _prng; boost::optional<HostAndPort> _host; bool _seenFirstMessage = false; + bool _inExhaust = false; + int _lastExhaustRequestId = 0; }; const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get = @@ -221,15 +252,11 @@ public: }; DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) { - uassert(51754, - "Mongobridge does not support exhaust", - !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)); - if (request.operation() == dbQuery) { DbMessage d(request); QueryMessage q(d); if (q.queryOptions & QueryOption_Exhaust) { - uasserted(51755, "Mongobridge does not support exhaust"); + uasserted(51755, "Mongobridge does not support OP_QUERY exhaust"); } } @@ -237,6 +264,20 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const auto& dest = ProxiedConnection::get(source); auto brCtx = BridgeContext::get(); + // If the bridge decides to return something else other than a response from an active exhaust + // stream, make sure we close the exhaust stream properly. + auto earlyExhaustExitGuard = makeGuard([&] { + if (dest.inExhaust()) { + LOGV2(4622301, "mongobridge shutting down exhaust stream", "remote"_attr = dest); + dest.setInExhaust(false); + // Active exhaust stream should have a session. + invariant(dest.getSession()); + // Close the connection to the dest server to end the exhaust stream. + dest->end(); + dest.setSession(nullptr); + } + }); + if (!dest.getSession()) { dest.setSession([]() -> transport::SessionHandle { HostAndPort destAddr{mongoBridgeGlobalParams.destUri}; @@ -344,19 +385,30 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const break; } - uassertStatusOK(dest->sinkMessage(request)); + // If we get another type of request (e.g. exhaust cleanup killCursor request from the service + // state machine), unset the exhaust mode. + if (dest.inExhaust() && + (request.operation() != dbMsg || !OpMsg::isFlagSet(request, OpMsg::kExhaustSupported))) { + dest.setInExhaust(false); + } + + // Skip sending request in exhaust mode. + if (!dest.inExhaust()) { + uassertStatusOK(dest->sinkMessage(request)); + } // Send the message we received from 'source' to 'dest'. 'dest' returns a response for // OP_QUERY, OP_GET_MORE, and OP_MSG messages that we respond with. if (!isFireAndForgetCommand && (request.operation() == dbQuery || request.operation() == dbGetMore || request.operation() == dbMsg)) { - // TODO dbMsg moreToCome - // Forward the message to 'dest' and receive its reply in 'response'. auto response = uassertStatusOK(dest->sourceMessage()); - uassert(50765, - "Response ID did not match the sent message ID.", - response.header().getResponseToMsgId() == request.header().getId()); + + if (!dest.inExhaust()) { + uassert(50765, + "Response ID did not match the sent message ID.", + response.header().getResponseToMsgId() == request.header().getId()); + } // Reload the message handling settings for 'host' in case they were changed // while waiting for a response from 'dest'. @@ -374,10 +426,22 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const return {Message()}; } + // Only support OP_MSG exhaust cursors. + bool isExhaust = false; + if (request.operation() == dbMsg && OpMsg::isFlagSet(request, OpMsg::kExhaustSupported)) { + isExhaust = dest.handleExhaustResponse(response); + earlyExhaustExitGuard.dismiss(); + } + // The original checksum won't be valid once the network layer replaces requestId. Remove it // because the network layer re-checksums the response. OpMsg::removeChecksum(&response); - return {std::move(response)}; + + // Return a DbResponse with shouldRunAgainForExhaust being set to isExhaust to indicate + // whether this should be run again to receive more responses from the exhaust stream. + // We do not need to set 'nextInvocation' in the DbResponse because mongobridge + // only receives responses but ignores the next request if it is in exhaust mode. + return {std::move(response), isExhaust}; } else { return {Message()}; } diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 7e33191b98d..6f24f61213b 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -516,6 +516,7 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { } else { _state.store(State::Source); _inMessage.reset(); + _inExhaust = false; return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask, transport::ServiceExecutorTaskName::kSSMSourceMessage); |