summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2016-08-05 17:27:55 -0400
committerJonathan Reams <jbreams@mongodb.com>2016-08-09 12:18:32 -0400
commit118a4ccdc2a1859309b1e2284952a7223e85b1d4 (patch)
tree6f5b40a60c42db8457fafb4229a9c00560f9fdd0 /src
parent91800fc61913358350b658406065c5d893d2ba2c (diff)
downloadmongo-118a4ccdc2a1859309b1e2284952a7223e85b1d4.tar.gz
SERVER-25267 SERVER-25265 Integrate compression with networking code
and snappy compressor
Diffstat (limited to 'src')
-rw-r--r--src/mongo/SConscript2
-rw-r--r--src/mongo/client/dbclient.cpp33
-rw-r--r--src/mongo/client/dbclientinterface.h7
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/repl/replication_info.cpp2
-rw-r--r--src/mongo/executor/network_interface_asio.h7
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp4
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp8
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp6
-rw-r--r--src/mongo/s/commands/cluster_is_master_cmd.cpp3
-rw-r--r--src/mongo/shell/replsettest.js5
-rw-r--r--src/mongo/shell/servers.js4
-rw-r--r--src/mongo/shell/shardingtest.js5
-rw-r--r--src/mongo/shell/shell_options.cpp3
-rw-r--r--src/mongo/shell/utils.js1
-rw-r--r--src/mongo/tools/bridge.cpp21
-rw-r--r--src/mongo/tools/mongobridge_options_init.cpp11
-rw-r--r--src/mongo/transport/SConscript6
-rw-r--r--src/mongo/transport/message_compressor_base.h16
-rw-r--r--src/mongo/transport/message_compressor_manager.cpp20
-rw-r--r--src/mongo/transport/message_compressor_manager.h7
-rw-r--r--src/mongo/transport/message_compressor_manager_test.cpp69
-rw-r--r--src/mongo/transport/message_compressor_noop.h8
-rw-r--r--src/mongo/transport/message_compressor_registry.cpp36
-rw-r--r--src/mongo/transport/message_compressor_registry.h2
-rw-r--r--src/mongo/transport/message_compressor_registry_test.cpp10
-rw-r--r--src/mongo/transport/message_compressor_snappy.cpp80
-rw-r--r--src/mongo/transport/message_compressor_snappy.h44
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.cpp14
-rw-r--r--src/mongo/transport/service_entry_point_test_suite.h18
-rw-r--r--src/mongo/transport/session.h7
-rw-r--r--src/mongo/transport/transport_layer.h4
-rw-r--r--src/mongo/transport/transport_layer_legacy.cpp26
-rw-r--r--src/mongo/transport/transport_layer_legacy.h4
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp6
-rw-r--r--src/mongo/transport/transport_layer_manager.h4
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp6
-rw-r--r--src/mongo/transport/transport_layer_mock.h4
-rw-r--r--src/mongo/util/net/abstract_message_port.h2
39 files changed, 427 insertions, 90 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 624ca0eb062..cae76f222db 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -372,7 +372,7 @@ if not has_option('noshell') and usemozjs:
'rpc/protocol',
'scripting/scripting',
'shell/mongojs',
- 'transport/message_compressor_registry',
+ 'transport/message_compressor',
'util/net/network',
'util/options_parser/options_parser_init',
'util/processinfo',
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 2e25aa607c9..92eb63a29b8 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -740,6 +740,8 @@ executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn,
return serializeStatus;
}
+ conn->getCompressorManager().clientBegin(&bob);
+
Date_t start{Date_t::now()};
auto result =
conn->runCommandWithMetadata("admin", "isMaster", rpc::makeEmptyMetadata(), bob.done());
@@ -753,6 +755,8 @@ executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn,
conn->setWireVersions(minWireVersion, maxWireVersion);
}
+ conn->getCompressorManager().clientFinish(isMasterObj);
+
return executor::RemoteCommandResponse{
std::move(isMasterObj), result->getMetadata().getOwned(), finish - start};
@@ -1288,7 +1292,9 @@ DBClientConnection::DBClientConnection(bool _autoReconnect,
void DBClientConnection::say(Message& toSend, bool isRetry, string* actualServer) {
checkConnection();
try {
- port().say(toSend);
+ auto swm = _compressorManager.compressMessage(toSend);
+ uassertStatusOK(swm.getStatus());
+ port().say(swm.getValue());
} catch (SocketException&) {
_failed = true;
throw;
@@ -1296,12 +1302,18 @@ void DBClientConnection::say(Message& toSend, bool isRetry, string* actualServer
}
bool DBClientConnection::recv(Message& m) {
- if (port().recv(m)) {
- return true;
+ if (!port().recv(m)) {
+ _failed = true;
+ return false;
}
- _failed = true;
- return false;
+ if (m.operation() == dbCompressed) {
+ auto swm = _compressorManager.decompressMessage(m);
+ uassertStatusOK(swm.getStatus());
+ m = std::move(swm.getValue());
+ }
+
+ return true;
}
bool DBClientConnection::call(Message& toSend,
@@ -1314,7 +1326,10 @@ bool DBClientConnection::call(Message& toSend,
*/
checkConnection();
try {
- if (!port().call(toSend, response)) {
+ auto swm = _compressorManager.compressMessage(toSend);
+ uassertStatusOK(swm.getStatus());
+
+ if (!port().call(swm.getValue(), response)) {
_failed = true;
if (assertOk)
uasserted(10278,
@@ -1322,6 +1337,12 @@ bool DBClientConnection::call(Message& toSend,
<< getServerAddress());
return false;
}
+
+ if (response.operation() == dbCompressed) {
+ auto swm = _compressorManager.decompressMessage(response);
+ uassertStatusOK(swm.getStatus());
+ response = std::move(swm.getValue());
+ }
} catch (SocketException&) {
_failed = true;
throw;
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index 8eeef420cb0..23387aa932a 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -42,6 +42,7 @@
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/unique_message.h"
#include "mongo/stdx/functional.h"
+#include "mongo/transport/message_compressor_manager.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/message.h"
@@ -1119,6 +1120,10 @@ public:
uint64_t getSockCreationMicroSec() const;
+ MessageCompressorManager& getCompressorManager() {
+ return _compressorManager;
+ }
+
protected:
int _minWireVersion{0};
int _maxWireVersion{0};
@@ -1162,6 +1167,8 @@ private:
// Hook ran on every call to connect()
HandshakeValidationHook _hook;
+
+ MessageCompressorManager _compressorManager;
};
BSONElement getErrField(const BSONObj& result);
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index fea4ff19638..cdd7a006249 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -342,7 +342,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils',
- '$BUILD_DIR/mongo/transport/message_compressor_registry',
+ '$BUILD_DIR/mongo/transport/message_compressor',
# The dependency on network is a temporary crutch that should go away once the
# networking library has separate options
'$BUILD_DIR/mongo/util/net/network',
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index cb74b9ec9af..90dd3fb451a 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -296,6 +296,8 @@ public:
if (parameter)
parameter->append(txn, result, "automationServiceDescriptor");
+ txn->getClient()->session()->getCompressorManager().serverNegotiate(cmdObj, &result);
+
return true;
}
} cmdismaster;
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index b0a8d946b1e..bfd9330c4fe 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -56,6 +56,7 @@
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/message_compressor_manager.h"
#include "mongo/util/net/message.h"
namespace mongo {
@@ -161,6 +162,10 @@ private:
rpc::ProtocolSet clientProtocols() const;
void setServerProtocols(rpc::ProtocolSet protocols);
+ MessageCompressorManager& getCompressorManager() {
+ return _compressorManager;
+ }
+
private:
std::unique_ptr<AsyncStreamInterface> _stream;
@@ -169,6 +174,8 @@ private:
// Dynamically initialized from [min max]WireVersionOutgoing.
// Its expected that isMaster response is checked only on the caller.
rpc::ProtocolSet _clientProtocols{rpc::supports::kNone};
+
+ MessageCompressorManager _compressorManager;
};
/**
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index 5dacda44e20..69fb2c0bc10 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -75,6 +75,8 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
bob.append("hostInfo", sb.str());
}
+ op->connection().getCompressorManager().clientBegin(&bob);
+
requestBuilder.setCommandArgs(bob.done());
requestBuilder.setMetadata(rpc::makeEmptyMetadata());
@@ -132,6 +134,8 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
op->setOperationProtocol(negotiatedProtocol.getValue());
+ op->connection().getCompressorManager().clientFinish(commandReply.data);
+
if (_hook) {
// Run the validation hook.
auto validHost = callNoexcept(
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index 0ce9a1505de..875bd2e048d 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -204,6 +204,14 @@ ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op,
Date_t now,
rpc::EgressMetadataHook* metadataHook) {
auto& received = _toRecv;
+ if (received.operation() == dbCompressed) {
+ auto swm = conn().getCompressorManager().decompressMessage(received);
+ if (!swm.isOK()) {
+ return swm.getStatus();
+ }
+ received = std::move(swm.getValue());
+ }
+
switch (_type) {
case CommandType::kRPC: {
auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook);
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index 7a01fa35e8e..d595df6c6e8 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -171,8 +171,12 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand,
MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(),
"Connection should not change over AsyncOp's lifetime");
+ auto swm = _connection->getCompressorManager().compressMessage(newCommand);
+ if (!swm.isOK())
+ return swm.getStatus();
+
// Construct a new AsyncCommand object for each command.
- _command.emplace(_connection.get_ptr(), type, std::move(newCommand), _owner->now(), target);
+ _command.emplace(_connection.get_ptr(), type, std::move(swm.getValue()), _owner->now(), target);
return Status::OK();
}
diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp
index 391ad28b039..399cf8fd4b3 100644
--- a/src/mongo/s/commands/cluster_is_master_cmd.cpp
+++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp
@@ -38,6 +38,7 @@
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/transport/message_compressor_manager.h"
#include "mongo/util/map_util.h"
namespace mongo {
@@ -120,6 +121,8 @@ public:
if (parameter)
parameter->append(txn, result, "automationServiceDescriptor");
+ txn->getClient()->session()->getCompressorManager().serverNegotiate(cmdObj, &result);
+
return true;
}
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index f25260745b2..de35e14d9dd 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -938,6 +938,11 @@ var ReplSetTest = function(opts) {
dest: getHostName() + ":" + _unbridgedPorts[n],
});
+ if (jsTestOptions().networkMessageCompressors) {
+ bridgeOptions["networkMessageCompressors"] =
+ jsTestOptions().networkMessageCompressors;
+ }
+
this.nodes[n] = new MongoBridge(bridgeOptions);
}
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index 83057daa6aa..4e3a12a1fa4 100644
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -446,6 +446,10 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro
MongoRunner.savedOptions[opts.runId] = Object.merge(opts, {});
}
+ if (jsTestOptions().networkMessageCompressors) {
+ opts.networkMessageCompressors = jsTestOptions().networkMessageCompressors;
+ }
+
return opts;
};
diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js
index 07122f2c16f..6a004ae280c 100644
--- a/src/mongo/shell/shardingtest.js
+++ b/src/mongo/shell/shardingtest.js
@@ -978,6 +978,11 @@ var ShardingTest = function(params) {
otherParams.useBridge = otherParams.useBridge || false;
otherParams.bridgeOptions = otherParams.bridgeOptions || {};
+ if (jsTestOptions().networkMessageCompressors) {
+ otherParams.bridgeOptions["networkMessageCompressors"] =
+ jsTestOptions().networkMessageCompressors;
+ }
+
var keyFile = otherParams.keyFile;
var hostName = getHostName();
diff --git a/src/mongo/shell/shell_options.cpp b/src/mongo/shell/shell_options.cpp
index 12abad3f60a..49037e6e12d 100644
--- a/src/mongo/shell/shell_options.cpp
+++ b/src/mongo/shell/shell_options.cpp
@@ -233,11 +233,12 @@ bool handlePreValidationMongoShellOptions(const moe::Environment& params,
Status storeMongoShellOptions(const moe::Environment& params,
const std::vector<std::string>& args) {
+ Status ret = Status::OK();
if (params.count("quiet")) {
mongo::serverGlobalParams.quiet = true;
}
#ifdef MONGO_CONFIG_SSL
- Status ret = storeSSLClientOptions(params);
+ ret = storeSSLClientOptions(params);
if (!ret.isOK()) {
return ret;
}
diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js
index 27906af8780..1cf2cd1bef7 100644
--- a/src/mongo/shell/utils.js
+++ b/src/mongo/shell/utils.js
@@ -195,6 +195,7 @@ jsTestOptions = function() {
// Note: does not support the array version
mongosBinVersion: TestData.mongosBinVersion || "",
shardMixedBinVersions: TestData.shardMixedBinVersions || false,
+ networkMessageCompressors: TestData.networkMessageCompressors
});
}
return _jsTestOptions;
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index d37751fe537..6299e26b81c 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -128,6 +128,7 @@ public:
Message request;
Message response;
+ MessageCompressorManager compressorManager;
while (true) {
try {
@@ -138,6 +139,16 @@ public:
break;
}
+ if (request.operation() == dbCompressed) {
+ auto swm = compressorManager.decompressMessage(request);
+ if (!swm.isOK()) {
+ error() << "Error decompressing message: " << swm.getStatus();
+ _mp->shutdown();
+ return;
+ }
+ request = std::move(swm.getValue());
+ }
+
std::unique_ptr<rpc::RequestInterface> cmdRequest;
if (request.operation() == dbQuery || request.operation() == dbCommand) {
cmdRequest = rpc::makeRequest(&request);
@@ -245,6 +256,16 @@ public:
exhaust = q.queryOptions & QueryOption_Exhaust;
}
while (exhaust) {
+ if (response.operation() == dbCompressed) {
+ auto swm = compressorManager.decompressMessage(response);
+ if (!swm.isOK()) {
+ error() << "Error decompressing message: " << swm.getStatus();
+ _mp->shutdown();
+ return;
+ }
+ response = std::move(swm.getValue());
+ }
+
MsgData::View header = response.header();
QueryResult::View qr = header.view2ptr();
if (qr.getCursorId()) {
diff --git a/src/mongo/tools/mongobridge_options_init.cpp b/src/mongo/tools/mongobridge_options_init.cpp
index 5207fc0b49d..dadd88b24b4 100644
--- a/src/mongo/tools/mongobridge_options_init.cpp
+++ b/src/mongo/tools/mongobridge_options_init.cpp
@@ -30,6 +30,7 @@
#include <iostream>
+#include "mongo/transport/message_compressor_registry.h"
#include "mongo/util/exit_code.h"
#include "mongo/util/options_parser/startup_option_init.h"
#include "mongo/util/options_parser/startup_options.h"
@@ -37,6 +38,9 @@
namespace mongo {
MONGO_GENERAL_STARTUP_OPTIONS_REGISTER(MongoBridgeOptions)(InitializerContext* context) {
+ auto ret = addMessageCompressionOptions(&moe::startupOptions, false);
+ if (!ret.isOK())
+ return ret;
return addMongoBridgeOptions(&moe::startupOptions);
}
@@ -58,6 +62,13 @@ MONGO_STARTUP_OPTIONS_STORE(MongoBridgeOptions)(InitializerContext* context) {
std::cerr << "try '" << context->args()[0] << " --help' for more information" << std::endl;
quickExit(EXIT_BADOPTIONS);
}
+
+ ret = storeMessageCompressionOptions(moe::startupOptionsParsed);
+ if (!ret.isOK()) {
+ std::cerr << ret.toString() << std::endl;
+ quickExit(EXIT_BADOPTIONS);
+ }
+
return Status::OK();
}
}
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 4b41ec5d085..83fb5729e01 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -23,6 +23,7 @@ env.Library(
'$BUILD_DIR/mongo/unittest/unittest',
'$BUILD_DIR/mongo/util/foundation',
'$BUILD_DIR/mongo/util/net/network',
+ '$BUILD_DIR/mongo/transport/message_compressor',
],
)
@@ -99,16 +100,19 @@ env.CppUnitTest(
LIBDEPS=[
'transport_layer_mock',
],
+)
env.Library(
target='message_compressor',
source=[
- 'message_compressor_registry.cpp',
'message_compressor_manager.cpp',
+ 'message_compressor_registry.cpp',
+ 'message_compressor_snappy.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/util/options_parser/options_parser',
+ '$BUILD_DIR/third_party/shim_snappy',
]
)
diff --git a/src/mongo/transport/message_compressor_base.h b/src/mongo/transport/message_compressor_base.h
index c0a2c14b72d..a2c05a6069e 100644
--- a/src/mongo/transport/message_compressor_base.h
+++ b/src/mongo/transport/message_compressor_base.h
@@ -33,8 +33,17 @@
#include "mongo/base/string_data.h"
#include "mongo/platform/atomic_word.h"
+#include <type_traits>
+
namespace mongo {
-using MessageCompressorId = uint8_t;
+enum class MessageCompressor : uint8_t {
+ kNoop = 0,
+ kSnappy = 1,
+ kExtended = 255,
+};
+
+StringData getMessageCompressorName(MessageCompressor id);
+using MessageCompressorId = std::underlying_type<MessageCompressor>::type;
class MessageCompressorBase {
MONGO_DISALLOW_COPYING(MessageCompressorBase);
@@ -109,8 +118,9 @@ protected:
/*
* This is called by sub-classes to intialize their ID/name fields.
*/
- MessageCompressorBase(MessageCompressorId id, StringData name)
- : _id{id}, _name{name.toString()} {}
+ MessageCompressorBase(MessageCompressor id)
+ : _id{static_cast<MessageCompressorId>(id)},
+ _name{getMessageCompressorName(id).toString()} {}
/*
* Called by sub-classes to bump their bytesIn/bytesOut counters for compression
diff --git a/src/mongo/transport/message_compressor_manager.cpp b/src/mongo/transport/message_compressor_manager.cpp
index 42f3e9c8c0d..78fa19ad8a3 100644
--- a/src/mongo/transport/message_compressor_manager.cpp
+++ b/src/mongo/transport/message_compressor_manager.cpp
@@ -59,10 +59,10 @@ struct CompressionHeader {
CompressionHeader(int32_t _opcode, int32_t _size, uint8_t _id)
: originalOpCode{_opcode}, uncompressedSize{_size}, compressorId{_id} {}
- CompressionHeader(ConstDataRangeCursor cursor) {
- originalOpCode = cursor.readAndAdvance<LittleEndian<std::int32_t>>().getValue();
- uncompressedSize = cursor.readAndAdvance<LittleEndian<std::int32_t>>().getValue();
- compressorId = cursor.readAndAdvance<LittleEndian<uint8_t>>().getValue();
+ CompressionHeader(ConstDataRangeCursor* cursor) {
+ originalOpCode = cursor->readAndAdvance<LittleEndian<std::int32_t>>().getValue();
+ uncompressedSize = cursor->readAndAdvance<LittleEndian<std::int32_t>>().getValue();
+ compressorId = cursor->readAndAdvance<LittleEndian<uint8_t>>().getValue();
}
static size_t size() {
@@ -93,6 +93,8 @@ StatusWith<Message> MessageCompressorManager::compressMessage(const Message& msg
inputHeader.getNetworkOp(), inputHeader.dataLen(), compressor->getId());
if (bufferSize > MaxMessageSizeBytes) {
+ LOG(3) << "Compressed message would be larger than " << MaxMessageSizeBytes
+ << ", returning original uncompressed message";
return {msg};
}
@@ -122,7 +124,7 @@ StatusWith<Message> MessageCompressorManager::compressMessage(const Message& msg
StatusWith<Message> MessageCompressorManager::decompressMessage(const Message& msg) {
auto inputHeader = msg.header();
ConstDataRangeCursor input(inputHeader.data(), inputHeader.data() + inputHeader.dataLen());
- CompressionHeader compressionHeader(input);
+ CompressionHeader compressionHeader(&input);
auto compressor = _registry->getCompressor(compressionHeader.compressorId);
if (!compressor) {
@@ -145,6 +147,10 @@ StatusWith<Message> MessageCompressorManager::decompressMessage(const Message& m
if (!sws.isOK())
return sws.getStatus();
+ if (sws.getValue() != static_cast<std::size_t>(compressionHeader.uncompressedSize)) {
+ return {ErrorCodes::BadValue, "Decompressing message returned less data than expected"};
+ }
+
outMessage.setLen(sws.getValue() + MsgData::MsgDataHeaderSize);
return {Message(outputMessageBuffer)};
@@ -224,8 +230,8 @@ void MessageCompressorManager::serverNegotiate(const BSONObj& input, BSONObjBuil
if ((cur = _registry->getCompressor(curName))) {
LOG(3) << cur->getName() << " is supported";
_negotiated.push_back(cur);
- } else { // Otherwise the compressor is not supported and we skip over it.
- LOG(3) << cur->getName() << " is not supported";
+ } else { // Otherwise the compressor is not supported and we skip over it.
+ LOG(3) << curName << " is not supported";
}
}
diff --git a/src/mongo/transport/message_compressor_manager.h b/src/mongo/transport/message_compressor_manager.h
index a3d35027ffa..7af084996db 100644
--- a/src/mongo/transport/message_compressor_manager.h
+++ b/src/mongo/transport/message_compressor_manager.h
@@ -48,13 +48,16 @@ public:
/*
* Default constructor. Uses the global MessageCompressorRegistry.
*/
- explicit MessageCompressorManager();
+ MessageCompressorManager();
/*
* Constructs a manager from a specific MessageCompressorRegistry - used by the unit tests
* to test various registry configurations.
*/
- MessageCompressorManager(MessageCompressorRegistry* factory);
+ explicit MessageCompressorManager(MessageCompressorRegistry* factory);
+
+ MessageCompressorManager(MessageCompressorManager&&) = default;
+ MessageCompressorManager& operator=(MessageCompressorManager&&) = default;
/*
* Called by a client constructing an isMaster request. This function will append the result
diff --git a/src/mongo/transport/message_compressor_manager_test.cpp b/src/mongo/transport/message_compressor_manager_test.cpp
index c431f102e23..383bb1e3260 100644
--- a/src/mongo/transport/message_compressor_manager_test.cpp
+++ b/src/mongo/transport/message_compressor_manager_test.cpp
@@ -30,10 +30,11 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/stdx/memory.h"
-#include "mongo/transport/message_compressor_registry.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/message_compressor_noop.h"
+#include "mongo/transport/message_compressor_registry.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/net/message.h"
#include <string>
#include <vector>
@@ -49,7 +50,7 @@ MessageCompressorRegistry buildRegistry() {
ret.registerImplementation(std::move(compressor));
ret.finalizeSupportedCompressors();
- return std::move(ret);
+ return ret;
}
void checkNegotiationResult(const BSONObj& result, const std::vector<std::string>& algos) {
@@ -80,7 +81,57 @@ void checkServerNegotiation(const BSONObj& input, const std::vector<std::string>
manager.serverNegotiate(input, &serverOutput);
checkNegotiationResult(serverOutput.done(), expected);
}
-} // namespace
+
+void checkFidelity(const Message& msg, std::unique_ptr<MessageCompressorBase> compressor) {
+ MessageCompressorRegistry registry;
+ const auto originalView = msg.singleData();
+ const auto compressorName = compressor->getName();
+
+ std::vector<std::string> compressorList = {compressorName};
+ registry.setSupportedCompressors(std::move(compressorList));
+ registry.registerImplementation(std::move(compressor));
+ registry.finalizeSupportedCompressors();
+
+ MessageCompressorManager mgr(&registry);
+ auto negotiator = BSON("isMaster" << 1 << "compression" << BSON_ARRAY(compressorName));
+ BSONObjBuilder negotiatorOut;
+ mgr.serverNegotiate(negotiator, &negotiatorOut);
+ checkNegotiationResult(negotiatorOut.done(), {compressorName});
+
+ auto swm = mgr.compressMessage(msg);
+ ASSERT_OK(swm.getStatus());
+ auto compressedMsg = std::move(swm.getValue());
+ const auto compressedMsgView = compressedMsg.singleData();
+
+ ASSERT_EQ(compressedMsgView.getId(), originalView.getId());
+ ASSERT_EQ(compressedMsgView.getResponseToMsgId(), originalView.getResponseToMsgId());
+ ASSERT_EQ(compressedMsgView.getNetworkOp(), dbCompressed);
+
+ swm = mgr.decompressMessage(compressedMsg);
+ ASSERT_OK(swm.getStatus());
+ auto decompressedMsg = std::move(swm.getValue());
+
+ const auto decompressedMsgView = decompressedMsg.singleData();
+ ASSERT_EQ(decompressedMsgView.getId(), originalView.getId());
+ ASSERT_EQ(decompressedMsgView.getResponseToMsgId(), originalView.getResponseToMsgId());
+ ASSERT_EQ(decompressedMsgView.getNetworkOp(), originalView.getNetworkOp());
+ ASSERT_EQ(decompressedMsgView.getLen(), originalView.getLen());
+
+ ASSERT_EQ(memcmp(decompressedMsgView.data(), originalView.data(), originalView.dataLen()), 0);
+}
+
+Message buildMessage() {
+ const auto data = std::string{"Hello, world!"};
+ const auto bufferSize = MsgData::MsgDataHeaderSize + data.size();
+ auto buf = SharedBuffer::allocate(bufferSize);
+ MsgData::View testView(buf.get());
+ testView.setId(123456);
+ testView.setResponseToMsgId(654321);
+ testView.setOperation(dbQuery);
+ testView.setLen(bufferSize);
+ memcpy(testView.data(), data.data(), data.size());
+ return Message{buf};
+}
TEST(MessageCompressorManager, NoCompressionRequested) {
auto input = BSON("isMaster" << 1);
@@ -120,4 +171,16 @@ TEST(MessageCompressorManager, FullNormalCompression) {
clientManager.clientFinish(serverObj);
}
+
+TEST(NoopMessageCompressor, Fidelity) {
+ auto testMessage = buildMessage();
+ checkFidelity(testMessage, stdx::make_unique<NoopMessageCompressor>());
+}
+
+TEST(SnappyMessageCompressor, Fidelity) {
+ auto testMessage = buildMessage();
+ checkFidelity(testMessage, stdx::make_unique<NoopMessageCompressor>());
+}
+
} // namespace mongo
+} // namespace
diff --git a/src/mongo/transport/message_compressor_noop.h b/src/mongo/transport/message_compressor_noop.h
index 54a2c795f9d..b0602482b78 100644
--- a/src/mongo/transport/message_compressor_noop.h
+++ b/src/mongo/transport/message_compressor_noop.h
@@ -26,28 +26,26 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
#include "mongo/transport/message_compressor_base.h"
namespace mongo {
class NoopMessageCompressor final : public MessageCompressorBase {
public:
- NoopMessageCompressor() : MessageCompressorBase(0, "noop") {}
+ NoopMessageCompressor() : MessageCompressorBase(MessageCompressor::kNoop) {}
std::size_t getMaxCompressedSize(size_t inputSize) override {
return inputSize;
}
StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) override {
- memcpy(const_cast<char*>(output.data()), input.data(), input.length());
+ output.write(input);
counterHitCompress(input.length(), input.length());
return {input.length()};
}
StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) override {
- memcpy(const_cast<char*>(output.data()), input.data(), input.length());
+ output.write(input);
counterHitDecompress(input.length(), input.length());
return {input.length()};
}
diff --git a/src/mongo/transport/message_compressor_registry.cpp b/src/mongo/transport/message_compressor_registry.cpp
index e64d73b063e..a9cf4cda0b9 100644
--- a/src/mongo/transport/message_compressor_registry.cpp
+++ b/src/mongo/transport/message_compressor_registry.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/init.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/message_compressor_noop.h"
+#include "mongo/transport/message_compressor_snappy.h"
#include "mongo/util/options_parser/option_section.h"
#include <boost/algorithm/string/classification.hpp>
@@ -40,6 +41,18 @@
namespace mongo {
+StringData getMessageCompressorName(MessageCompressor id) {
+ switch (id) {
+ case MessageCompressor::kNoop:
+ return "noop"_sd;
+ case MessageCompressor::kSnappy:
+ return "snappy"_sd;
+ default:
+ fassert(40269, "Invalid message compressor ID");
+ }
+ MONGO_UNREACHABLE;
+}
+
MessageCompressorRegistry& MessageCompressorRegistry::get() {
static MessageCompressorRegistry globalRegistry;
return globalRegistry;
@@ -48,7 +61,7 @@ MessageCompressorRegistry& MessageCompressorRegistry::get() {
void MessageCompressorRegistry::registerImplementation(
std::unique_ptr<MessageCompressorBase> impl) {
// It's an error to register a compressor that's already been registered
- fassert(40254,
+ fassert(40270,
_compressorsByName.find(impl->getName()) == _compressorsByName.end() &&
_compressorsByIds[impl->getId()] == nullptr);
@@ -61,13 +74,15 @@ void MessageCompressorRegistry::registerImplementation(
_compressorsByIds[impl->getId()] = std::move(impl);
}
-void MessageCompressorRegistry::finalizeSupportedCompressors() {
- // Remove compressor names from the compressorNames list if they were never registered.
- // This prevents _compressorNames from having totally bogus names specified by users.
- std::remove_if(
- _compressorNames.begin(), _compressorNames.end(), [this](const std::string& name) {
- return _compressorsByName.find(name) == _compressorsByName.end();
- });
+Status MessageCompressorRegistry::finalizeSupportedCompressors() {
+ for (auto it = _compressorNames.begin(); it != _compressorNames.end(); ++it) {
+ if (_compressorsByName.find(*it) == _compressorsByName.end()) {
+ std::stringstream ss;
+ ss << "Invalid network message compressor specified in configuration: " << *it;
+ return {ErrorCodes::BadValue, ss.str()};
+ }
+ }
+ return Status::OK();
}
const std::vector<std::string>& MessageCompressorRegistry::getCompressorNames() const {
@@ -119,7 +134,7 @@ Status storeMessageCompressionOptions(const moe::Environment& params) {
// This instantiates and registers the "noop" compressor. It must happen after option storage
// because that's when the configuration of the compressors gets set.
MONGO_INITIALIZER_GENERAL(NoopMessageCompressorInit,
- ("EndStartupOptionHandling"),
+ ("EndStartupOptionStorage"),
("AllCompressorsRegistered"))
(InitializerContext* context) {
auto& compressorRegistry = MessageCompressorRegistry::get();
@@ -131,7 +146,6 @@ MONGO_INITIALIZER_GENERAL(NoopMessageCompressorInit,
// any compressor. It must be run after all the compressors have registered themselves with
// the global registry.
MONGO_INITIALIZER(AllCompressorsRegistered)(InitializerContext* context) {
- MessageCompressorRegistry::get().finalizeSupportedCompressors();
- return Status::OK();
+ return MessageCompressorRegistry::get().finalizeSupportedCompressors();
}
} // namespace mongo
diff --git a/src/mongo/transport/message_compressor_registry.h b/src/mongo/transport/message_compressor_registry.h
index 721185cfb1c..9d8549ed0e3 100644
--- a/src/mongo/transport/message_compressor_registry.h
+++ b/src/mongo/transport/message_compressor_registry.h
@@ -107,7 +107,7 @@ public:
* calls to registerImplementation. It will remove any compressor names that aren't keys in
* the _compressors map.
*/
- void finalizeSupportedCompressors();
+ Status finalizeSupportedCompressors();
private:
StringMap<MessageCompressorBase*> _compressorsByName;
diff --git a/src/mongo/transport/message_compressor_registry_test.cpp b/src/mongo/transport/message_compressor_registry_test.cpp
index b3a766f2f55..a14f067e606 100644
--- a/src/mongo/transport/message_compressor_registry_test.cpp
+++ b/src/mongo/transport/message_compressor_registry_test.cpp
@@ -73,15 +73,17 @@ TEST(MessageCompressorRegistry, NothingRegistered) {
TEST(MessageCompressorRegistry, SetSupported) {
MessageCompressorRegistry registry;
auto compressor = stdx::make_unique<NoopMessageCompressor>();
- auto compressorPtr = compressor.get();
+ auto compressorId = compressor->getId();
+ auto compressorName = compressor->getName();
std::vector<std::string> compressorList = {"foobar"};
registry.setSupportedCompressors(std::move(compressorList));
registry.registerImplementation(std::move(compressor));
- registry.finalizeSupportedCompressors();
+ auto ret = registry.finalizeSupportedCompressors();
+ ASSERT_NOT_OK(ret);
- ASSERT_NULL(registry.getCompressor(compressorPtr->getName()));
- ASSERT_NULL(registry.getCompressor(compressorPtr->getId()));
+ ASSERT_NULL(registry.getCompressor(compressorId));
+ ASSERT_NULL(registry.getCompressor(compressorName));
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/transport/message_compressor_snappy.cpp b/src/mongo/transport/message_compressor_snappy.cpp
new file mode 100644
index 00000000000..db1e0c9dfca
--- /dev/null
+++ b/src/mongo/transport/message_compressor_snappy.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/init.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/transport/message_compressor_registry.h"
+#include "mongo/transport/message_compressor_snappy.h"
+
+#include "third_party/snappy-1.1.3/snappy.h"
+
+namespace mongo {
+
+SnappyMessageCompressor::SnappyMessageCompressor()
+ : MessageCompressorBase(MessageCompressor::kSnappy) {}
+
+std::size_t SnappyMessageCompressor::getMaxCompressedSize(size_t inputSize) {
+ return snappy::MaxCompressedLength(inputSize);
+}
+
+StatusWith<std::size_t> SnappyMessageCompressor::compressData(ConstDataRange input,
+ DataRange output) {
+ size_t outLength;
+ snappy::RawCompress(input.data(), input.length(), const_cast<char*>(output.data()), &outLength);
+
+ counterHitCompress(input.length(), outLength);
+ return {outLength};
+}
+
+StatusWith<std::size_t> SnappyMessageCompressor::decompressData(ConstDataRange input,
+ DataRange output) {
+ bool ret =
+ snappy::RawUncompress(input.data(), input.length(), const_cast<char*>(output.data()));
+
+ if (!ret) {
+ return Status{ErrorCodes::BadValue, "Compressed message was invalid or corrupted"};
+ }
+
+ counterHitDecompress(input.length(), output.length());
+ return output.length();
+}
+
+
+MONGO_INITIALIZER_GENERAL(SnappyMessageCompressorInit,
+ ("EndStartupOptionHandling"),
+ ("AllCompressorsRegistered"))
+(InitializerContext* context) {
+ auto& compressorRegistry = MessageCompressorRegistry::get();
+ compressorRegistry.registerImplementation(stdx::make_unique<SnappyMessageCompressor>());
+ return Status::OK();
+}
+} // namespace mongo
diff --git a/src/mongo/transport/message_compressor_snappy.h b/src/mongo/transport/message_compressor_snappy.h
new file mode 100644
index 00000000000..3521370df09
--- /dev/null
+++ b/src/mongo/transport/message_compressor_snappy.h
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/transport/message_compressor_base.h"
+
+namespace mongo {
+class SnappyMessageCompressor final : public MessageCompressorBase {
+public:
+ SnappyMessageCompressor();
+
+ std::size_t getMaxCompressedSize(size_t inputSize) override;
+
+ StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) override;
+
+ StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) override;
+};
+
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp
index efd43fd9a9b..841851d5ccd 100644
--- a/src/mongo/transport/service_entry_point_test_suite.cpp
+++ b/src/mongo/transport/service_entry_point_test_suite.cpp
@@ -127,13 +127,13 @@ ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness()
_asyncWait(kDefaultAsyncWait),
_end(kDefaultEnd) {}
-Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const Session& session,
+Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(Session& session,
Message* message,
Date_t expiration) {
return _sourceMessage(session, message, expiration);
}
-Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const Session& session,
+Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(Session& session,
const Message& message,
Date_t expiration) {
return _sinkMessage(session, message, expiration);
@@ -191,19 +191,17 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport::
return _defaultWait(std::move(ticket));
}
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const Session& s,
- Message* m,
- Date_t d) {
+Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(Session& s, Message* m, Date_t d) {
return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d));
}
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const Session& s,
+Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(Session& s,
const Message&,
Date_t d) {
return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d));
}
-Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const Session& s,
+Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(Session& s,
const Message& m,
Date_t d) {
_wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1);
@@ -264,7 +262,7 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() {
// Step 1: SEP gets a ticket to source a Message
// Step 2: SEP calls wait() on the ticket and receives a Message
// Step 3: SEP gets a ticket to sink a Message
- _tl->_sinkMessage = [this](const Session& session, const Message& m, Date_t expiration) {
+ _tl->_sinkMessage = [this](Session& session, const Message& m, Date_t expiration) {
// Step 4: SEP calls wait() on the ticket and receives an error
_tl->_wait =
diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h
index 623f0533267..2249c86b9bd 100644
--- a/src/mongo/transport/service_entry_point_test_suite.h
+++ b/src/mongo/transport/service_entry_point_test_suite.h
@@ -120,11 +120,11 @@ private:
MockTLHarness();
transport::Ticket sourceMessage(
- const transport::Session& session,
+ transport::Session& session,
Message* message,
Date_t expiration = transport::Ticket::kNoExpirationDate) override;
transport::Ticket sinkMessage(
- const transport::Session& session,
+ transport::Session& session,
const Message& message,
Date_t expiration = transport::Ticket::kNoExpirationDate) override;
Status wait(transport::Ticket&& ticket) override;
@@ -141,10 +141,8 @@ private:
ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket);
// Mocked method hooks
- stdx::function<transport::Ticket(const transport::Session&, Message*, Date_t)>
- _sourceMessage;
- stdx::function<transport::Ticket(const transport::Session&, const Message&, Date_t)>
- _sinkMessage;
+ stdx::function<transport::Ticket(transport::Session&, Message*, Date_t)> _sourceMessage;
+ stdx::function<transport::Ticket(transport::Session&, const Message&, Date_t)> _sinkMessage;
stdx::function<Status(transport::Ticket)> _wait;
stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait;
stdx::function<void(const transport::Session&)> _end;
@@ -154,11 +152,9 @@ private:
stdx::function<void(void)> _shutdown = [] {};
// Pre-set hook methods
- transport::Ticket _defaultSource(const transport::Session& s, Message* m, Date_t d);
- transport::Ticket _defaultSink(const transport::Session& s, const Message&, Date_t d);
- transport::Ticket _sinkThenErrorOnWait(const transport::Session& s,
- const Message& m,
- Date_t d);
+ transport::Ticket _defaultSource(transport::Session& s, Message* m, Date_t d);
+ transport::Ticket _defaultSink(transport::Session& s, const Message&, Date_t d);
+ transport::Ticket _sinkThenErrorOnWait(transport::Session& s, const Message& m, Date_t d);
Status _defaultWait(transport::Ticket ticket);
Status _waitError(transport::Ticket ticket);
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 8551f607037..c7ec5cd28f0 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/session_id.h"
#include "mongo/transport/ticket.h"
#include "mongo/util/net/hostandport.h"
@@ -152,6 +153,10 @@ public:
return _ended;
}
+ MessageCompressorManager& getCompressorManager() {
+ return _messageCompressorManager;
+ }
+
private:
bool _ended = false;
@@ -163,6 +168,8 @@ private:
TagMask _tags;
TransportLayer* _tl;
+
+ MessageCompressorManager _messageCompressorManager;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 44995fc375b..3634ec68de5 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -98,7 +98,7 @@ public:
* TransportLayer is unable to source a Message, this will be a failed status,
* and the passed-in Message buffer may be left in an invalid state.
*/
- virtual Ticket sourceMessage(const Session& session,
+ virtual Ticket sourceMessage(Session& session,
Message* message,
Date_t expiration = Ticket::kNoExpirationDate) = 0;
@@ -117,7 +117,7 @@ public:
* This method does NOT take ownership of the sunk Message, which must be cleaned
* up by the caller.
*/
- virtual Ticket sinkMessage(const Session& session,
+ virtual Ticket sinkMessage(Session& session,
const Message& message,
Date_t expiration = Ticket::kNoExpirationDate) = 0;
diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp
index 8d1f95b6a23..3c004eb793f 100644
--- a/src/mongo/transport/transport_layer_legacy.cpp
+++ b/src/mongo/transport/transport_layer_legacy.cpp
@@ -98,13 +98,19 @@ Status TransportLayerLegacy::start() {
TransportLayerLegacy::~TransportLayerLegacy() = default;
-Ticket TransportLayerLegacy::sourceMessage(const Session& session,
- Message* message,
- Date_t expiration) {
- auto sourceCb = [message](AbstractMessagingPort* amp) -> Status {
+Ticket TransportLayerLegacy::sourceMessage(Session& session, Message* message, Date_t expiration) {
+ auto& compressorMgr = session.getCompressorManager();
+ auto sourceCb = [message, &compressorMgr](AbstractMessagingPort* amp) -> Status {
if (!amp->recv(*message)) {
return {ErrorCodes::HostUnreachable, "Recv failed"};
}
+
+ if (message->operation() == dbCompressed) {
+ auto swm = compressorMgr.decompressMessage(*message);
+ if (!swm.isOK())
+ return swm.getStatus();
+ *message = swm.getValue();
+ }
return Status::OK();
};
@@ -137,12 +143,18 @@ TransportLayer::Stats TransportLayerLegacy::sessionStats() {
return stats;
}
-Ticket TransportLayerLegacy::sinkMessage(const Session& session,
+Ticket TransportLayerLegacy::sinkMessage(Session& session,
const Message& message,
Date_t expiration) {
- auto sinkCb = [&message](AbstractMessagingPort* amp) -> Status {
+ auto& compressorMgr = session.getCompressorManager();
+ auto sinkCb = [&message, &compressorMgr](AbstractMessagingPort* amp) -> Status {
try {
- amp->say(message);
+ auto swm = compressorMgr.compressMessage(message);
+ if (!swm.isOK())
+ return swm.getStatus();
+ const auto& compressedMessage = swm.getValue();
+ amp->say(compressedMessage);
+
return Status::OK();
} catch (const SocketException& e) {
return {ErrorCodes::HostUnreachable, e.what()};
diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h
index 1ef193b4754..7472212ffc9 100644
--- a/src/mongo/transport/transport_layer_legacy.h
+++ b/src/mongo/transport/transport_layer_legacy.h
@@ -67,11 +67,11 @@ public:
Status setup();
Status start() override;
- Ticket sourceMessage(const Session& session,
+ Ticket sourceMessage(Session& session,
Message* message,
Date_t expiration = Ticket::kNoExpirationDate) override;
- Ticket sinkMessage(const Session& session,
+ Ticket sinkMessage(Session& session,
const Message& message,
Date_t expiration = Ticket::kNoExpirationDate) override;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 7ba1797a21f..e513155e5cd 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -43,13 +43,11 @@ namespace transport {
TransportLayerManager::TransportLayerManager() = default;
-Ticket TransportLayerManager::sourceMessage(const Session& session,
- Message* message,
- Date_t expiration) {
+Ticket TransportLayerManager::sourceMessage(Session& session, Message* message, Date_t expiration) {
return session.getTransportLayer()->sourceMessage(session, message, expiration);
}
-Ticket TransportLayerManager::sinkMessage(const Session& session,
+Ticket TransportLayerManager::sinkMessage(Session& session,
const Message& message,
Date_t expiration) {
return session.getTransportLayer()->sinkMessage(session, message, expiration);
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index aeed86edcfe..20d27d6571c 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -54,10 +54,10 @@ class TransportLayerManager final : public TransportLayer {
public:
TransportLayerManager();
- Ticket sourceMessage(const Session& session,
+ Ticket sourceMessage(Session& session,
Message* message,
Date_t expiration = Ticket::kNoExpirationDate) override;
- Ticket sinkMessage(const Session& session,
+ Ticket sinkMessage(Session& session,
const Message& message,
Date_t expiration = Ticket::kNoExpirationDate) override;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 3f71b5d16e0..e7fa76d2e9b 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -64,9 +64,7 @@ boost::optional<Message*> TransportLayerMock::TicketMock::msg() const {
TransportLayerMock::TransportLayerMock() : _shutdown(false) {}
-Ticket TransportLayerMock::sourceMessage(const Session& session,
- Message* message,
- Date_t expiration) {
+Ticket TransportLayerMock::sourceMessage(Session& session, Message* message, Date_t expiration) {
if (inShutdown()) {
return Ticket(TransportLayer::ShutdownStatus);
} else if (!owns(session.id())) {
@@ -79,7 +77,7 @@ Ticket TransportLayerMock::sourceMessage(const Session& session,
stdx::make_unique<TransportLayerMock::TicketMock>(&session, message, expiration));
}
-Ticket TransportLayerMock::sinkMessage(const Session& session,
+Ticket TransportLayerMock::sinkMessage(Session& session,
const Message& message,
Date_t expiration) {
if (inShutdown()) {
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index f519713e9bc..38ab3eed0f1 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -76,10 +76,10 @@ public:
TransportLayerMock();
~TransportLayerMock();
- Ticket sourceMessage(const Session& session,
+ Ticket sourceMessage(Session& session,
Message* message,
Date_t expiration = Ticket::kNoExpirationDate) override;
- Ticket sinkMessage(const Session& session,
+ Ticket sinkMessage(Session& session,
const Message& message,
Date_t expiration = Ticket::kNoExpirationDate) override;
diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h
index e4884e75133..10860552a7d 100644
--- a/src/mongo/util/net/abstract_message_port.h
+++ b/src/mongo/util/net/abstract_message_port.h
@@ -86,7 +86,7 @@ public:
/**
* Sends the message.
*/
- virtual void say(Message& toSend, int responseTo = 0) = 0;
+ virtual void say(Message& toSend, int responseTo) = 0;
/**
* Sends the message (does not set headers).