diff options
-rw-r--r-- | jstests/noPassthrough/exhaust_compression.js | 34 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 17 |
2 files changed, 50 insertions, 1 deletions
diff --git a/jstests/noPassthrough/exhaust_compression.js b/jstests/noPassthrough/exhaust_compression.js new file mode 100644 index 00000000000..3ede60b4aa3 --- /dev/null +++ b/jstests/noPassthrough/exhaust_compression.js @@ -0,0 +1,34 @@ +(function() { +'use strict'; + +var runTest = function(compressor) { + var mongo = MongoRunner.runMongod({networkMessageCompressors: compressor}); + + let shell = startParallelShell(function() { + var collName = 'exhaustCollection'; + var fp = 'beforeCompressingExhaustResponse'; + db[collName].drop(); + + const kDocumentCount = 10; + for (var i = 0; i < kDocumentCount; i++) { + assert.commandWorked(db.runCommand({insert: collName, documents: [{a: i}]})); + } + + const preRes = + assert.commandWorked(db.adminCommand({configureFailPoint: fp, mode: "alwaysOn"})); + + db.exhaustCollection.find({}).batchSize(2).addOption(DBQuery.Option.exhaust).toArray(); + + const postRes = + assert.commandWorked(db.adminCommand({configureFailPoint: fp, mode: "off"})); + + assert.eq(preRes.count + 1, postRes.count, "Exhaust messages are not compressed"); + }, mongo.port, false, "--networkMessageCompressors", compressor); + + shell(); + + MongoRunner.stopMongod(mongo); +}; + +runTest("snappy"); +}()); diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 1821f7407ed..effeeae3903 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -51,12 +51,14 @@ #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/debug_util.h" #include "mongo/util/exit.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/quick_exit.h" namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(beforeCompressingExhaustResponse); /** * Creates and returns a legacy exhaust message, if exhaust is allowed. The returned message is to * be used as the subsequent 'synthetic' exhaust request. Returns an empty message if exhaust is not @@ -300,6 +302,12 @@ void ServiceStateMachine::_sourceMessage(ThreadGuard guard) { _state.store(State::SourceWait); guard.release(); + // Reset the compressor only before sourcing a new message. This ensures the same compressor, + // if any, is used for sinking exhaust messages. For moreToCome messages, this allows resetting + // the compressor for each incoming (i.e., sourced) message, and using the latest compressor id + // for compressing the sink message. + _compressorId = boost::none; + auto sourceMsgImpl = [&] { if (_transportMode == transport::Mode::kSynchronous) { MONGO_IDLE_THREAD_BLOCK; @@ -422,7 +430,9 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { auto& compressorMgr = MessageCompressorManager::forSession(_session()); - _compressorId = boost::none; + // Setup compressor and acquire a compressor id when processing compressed messages. Exhaust + // messages produced via `makeExhaustMessage(...)` are not compressed, so the body of this if + // statement only runs for sourced compressed messages. if (_inMessage.operation() == dbCompressed) { MessageCompressorId compressorId; auto swm = compressorMgr.decompressMessage(_inMessage, &compressorId); @@ -474,6 +484,11 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { networkCounter.hitLogicalOut(toSink.size()); + if (MONGO_unlikely(beforeCompressingExhaustResponse.shouldFail( + [&](const BSONObj&) { return _compressorId.has_value() && _inExhaust; }))) { + // Nothing to do as we only need to record the incident. + } + if (_compressorId) { auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); uassertStatusOK(swm.getStatus()); |