summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/exhaust_compression.js34
-rw-r--r--src/mongo/transport/service_state_machine.cpp17
2 files changed, 50 insertions, 1 deletions
diff --git a/jstests/noPassthrough/exhaust_compression.js b/jstests/noPassthrough/exhaust_compression.js
new file mode 100644
index 00000000000..3ede60b4aa3
--- /dev/null
+++ b/jstests/noPassthrough/exhaust_compression.js
@@ -0,0 +1,34 @@
+(function() {
+'use strict';
+
+var runTest = function(compressor) {
+ var mongo = MongoRunner.runMongod({networkMessageCompressors: compressor});
+
+ let shell = startParallelShell(function() {
+ var collName = 'exhaustCollection';
+ var fp = 'beforeCompressingExhaustResponse';
+ db[collName].drop();
+
+ const kDocumentCount = 10;
+ for (var i = 0; i < kDocumentCount; i++) {
+ assert.commandWorked(db.runCommand({insert: collName, documents: [{a: i}]}));
+ }
+
+ const preRes =
+ assert.commandWorked(db.adminCommand({configureFailPoint: fp, mode: "alwaysOn"}));
+
+ db.exhaustCollection.find({}).batchSize(2).addOption(DBQuery.Option.exhaust).toArray();
+
+ const postRes =
+ assert.commandWorked(db.adminCommand({configureFailPoint: fp, mode: "off"}));
+
+ assert.eq(preRes.count + 1, postRes.count, "Exhaust messages are not compressed");
+ }, mongo.port, false, "--networkMessageCompressors", compressor);
+
+ shell();
+
+ MongoRunner.stopMongod(mongo);
+};
+
+runTest("snappy");
+}());
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 1821f7407ed..effeeae3903 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -51,12 +51,14 @@
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/exit.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/quick_exit.h"
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(beforeCompressingExhaustResponse);
/**
* Creates and returns a legacy exhaust message, if exhaust is allowed. The returned message is to
* be used as the subsequent 'synthetic' exhaust request. Returns an empty message if exhaust is not
@@ -300,6 +302,12 @@ void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
_state.store(State::SourceWait);
guard.release();
+ // Reset the compressor only before sourcing a new message. This ensures the same compressor,
+ // if any, is used for sinking exhaust messages. For moreToCome messages, this allows resetting
+ // the compressor for each incoming (i.e., sourced) message, and using the latest compressor id
+ // for compressing the sink message.
+ _compressorId = boost::none;
+
auto sourceMsgImpl = [&] {
if (_transportMode == transport::Mode::kSynchronous) {
MONGO_IDLE_THREAD_BLOCK;
@@ -422,7 +430,9 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
auto& compressorMgr = MessageCompressorManager::forSession(_session());
- _compressorId = boost::none;
+ // Setup compressor and acquire a compressor id when processing compressed messages. Exhaust
+ // messages produced via `makeExhaustMessage(...)` are not compressed, so the body of this if
+ // statement only runs for sourced compressed messages.
if (_inMessage.operation() == dbCompressed) {
MessageCompressorId compressorId;
auto swm = compressorMgr.decompressMessage(_inMessage, &compressorId);
@@ -474,6 +484,11 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) {
networkCounter.hitLogicalOut(toSink.size());
+ if (MONGO_unlikely(beforeCompressingExhaustResponse.shouldFail(
+ [&](const BSONObj&) { return _compressorId.has_value() && _inExhaust; }))) {
+ // Nothing to do as we only need to record the incident.
+ }
+
if (_compressorId) {
auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());
uassertStatusOK(swm.getStatus());