diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2016-08-05 17:27:55 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2016-08-09 12:18:32 -0400 |
commit | 118a4ccdc2a1859309b1e2284952a7223e85b1d4 (patch) | |
tree | 6f5b40a60c42db8457fafb4229a9c00560f9fdd0 /src | |
parent | 91800fc61913358350b658406065c5d893d2ba2c (diff) | |
download | mongo-118a4ccdc2a1859309b1e2284952a7223e85b1d4.tar.gz |
SERVER-25267 SERVER-25265 Integrate compression with networking code
and snappy compressor
Diffstat (limited to 'src')
39 files changed, 427 insertions, 90 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 624ca0eb062..cae76f222db 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -372,7 +372,7 @@ if not has_option('noshell') and usemozjs: 'rpc/protocol', 'scripting/scripting', 'shell/mongojs', - 'transport/message_compressor_registry', + 'transport/message_compressor', 'util/net/network', 'util/options_parser/options_parser_init', 'util/processinfo', diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 2e25aa607c9..92eb63a29b8 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -740,6 +740,8 @@ executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn, return serializeStatus; } + conn->getCompressorManager().clientBegin(&bob); + Date_t start{Date_t::now()}; auto result = conn->runCommandWithMetadata("admin", "isMaster", rpc::makeEmptyMetadata(), bob.done()); @@ -753,6 +755,8 @@ executor::RemoteCommandResponse initWireVersion(DBClientConnection* conn, conn->setWireVersions(minWireVersion, maxWireVersion); } + conn->getCompressorManager().clientFinish(isMasterObj); + return executor::RemoteCommandResponse{ std::move(isMasterObj), result->getMetadata().getOwned(), finish - start}; @@ -1288,7 +1292,9 @@ DBClientConnection::DBClientConnection(bool _autoReconnect, void DBClientConnection::say(Message& toSend, bool isRetry, string* actualServer) { checkConnection(); try { - port().say(toSend); + auto swm = _compressorManager.compressMessage(toSend); + uassertStatusOK(swm.getStatus()); + port().say(swm.getValue()); } catch (SocketException&) { _failed = true; throw; @@ -1296,12 +1302,18 @@ void DBClientConnection::say(Message& toSend, bool isRetry, string* actualServer } bool DBClientConnection::recv(Message& m) { - if (port().recv(m)) { - return true; + if (!port().recv(m)) { + _failed = true; + return false; } - _failed = true; - return false; + if (m.operation() == dbCompressed) { + auto swm = _compressorManager.decompressMessage(m); + uassertStatusOK(swm.getStatus()); + m = std::move(swm.getValue()); + } + + return true; } bool DBClientConnection::call(Message& toSend, @@ -1314,7 +1326,10 @@ bool DBClientConnection::call(Message& toSend, */ checkConnection(); try { - if (!port().call(toSend, response)) { + auto swm = _compressorManager.compressMessage(toSend); + uassertStatusOK(swm.getStatus()); + + if (!port().call(swm.getValue(), response)) { _failed = true; if (assertOk) uasserted(10278, @@ -1322,6 +1337,12 @@ bool DBClientConnection::call(Message& toSend, << getServerAddress()); return false; } + + if (response.operation() == dbCompressed) { + auto swm = _compressorManager.decompressMessage(response); + uassertStatusOK(swm.getStatus()); + response = std::move(swm.getValue()); + } } catch (SocketException&) { _failed = true; throw; diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index 8eeef420cb0..23387aa932a 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -42,6 +42,7 @@ #include "mongo/rpc/protocol.h" #include "mongo/rpc/unique_message.h" #include "mongo/stdx/functional.h" +#include "mongo/transport/message_compressor_manager.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/message.h" @@ -1119,6 +1120,10 @@ public: uint64_t getSockCreationMicroSec() const; + MessageCompressorManager& getCompressorManager() { + return _compressorManager; + } + protected: int _minWireVersion{0}; int _maxWireVersion{0}; @@ -1162,6 +1167,8 @@ private: // Hook ran on every call to connect() HandshakeValidationHook _hook; + + MessageCompressorManager _compressorManager; }; BSONElement getErrField(const BSONObj& result); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index fea4ff19638..cdd7a006249 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -342,7 +342,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', - '$BUILD_DIR/mongo/transport/message_compressor_registry', + '$BUILD_DIR/mongo/transport/message_compressor', # The dependency on network is a temporary crutch that should go away once the # networking library has separate options '$BUILD_DIR/mongo/util/net/network', diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index cb74b9ec9af..90dd3fb451a 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -296,6 +296,8 @@ public: if (parameter) parameter->append(txn, result, "automationServiceDescriptor"); + txn->getClient()->session()->getCompressorManager().serverNegotiate(cmdObj, &result); + return true; } } cmdismaster; diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index b0a8d946b1e..bfd9330c4fe 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -56,6 +56,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/message_compressor_manager.h" #include "mongo/util/net/message.h" namespace mongo { @@ -161,6 +162,10 @@ private: rpc::ProtocolSet clientProtocols() const; void setServerProtocols(rpc::ProtocolSet protocols); + MessageCompressorManager& getCompressorManager() { + return _compressorManager; + } + private: std::unique_ptr<AsyncStreamInterface> _stream; @@ -169,6 +174,8 @@ private: // Dynamically initialized from [min max]WireVersionOutgoing. // Its expected that isMaster response is checked only on the caller. rpc::ProtocolSet _clientProtocols{rpc::supports::kNone}; + + MessageCompressorManager _compressorManager; }; /** diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index 5dacda44e20..69fb2c0bc10 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -75,6 +75,8 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { bob.append("hostInfo", sb.str()); } + op->connection().getCompressorManager().clientBegin(&bob); + requestBuilder.setCommandArgs(bob.done()); requestBuilder.setMetadata(rpc::makeEmptyMetadata()); @@ -132,6 +134,8 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { op->setOperationProtocol(negotiatedProtocol.getValue()); + op->connection().getCompressorManager().clientFinish(commandReply.data); + if (_hook) { // Run the validation hook. auto validHost = callNoexcept( diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 0ce9a1505de..875bd2e048d 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -204,6 +204,14 @@ ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op, Date_t now, rpc::EgressMetadataHook* metadataHook) { auto& received = _toRecv; + if (received.operation() == dbCompressed) { + auto swm = conn().getCompressorManager().decompressMessage(received); + if (!swm.isOK()) { + return swm.getStatus(); + } + received = std::move(swm.getValue()); + } + switch (_type) { case CommandType::kRPC: { auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook); diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index 7a01fa35e8e..d595df6c6e8 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -171,8 +171,12 @@ Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand, MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(), "Connection should not change over AsyncOp's lifetime"); + auto swm = _connection->getCompressorManager().compressMessage(newCommand); + if (!swm.isOK()) + return swm.getStatus(); + // Construct a new AsyncCommand object for each command. - _command.emplace(_connection.get_ptr(), type, std::move(newCommand), _owner->now(), target); + _command.emplace(_connection.get_ptr(), type, std::move(swm.getValue()), _owner->now(), target); return Status::OK(); } diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp index 391ad28b039..399cf8fd4b3 100644 --- a/src/mongo/s/commands/cluster_is_master_cmd.cpp +++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp @@ -38,6 +38,7 @@ #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/transport/message_compressor_manager.h" #include "mongo/util/map_util.h" namespace mongo { @@ -120,6 +121,8 @@ public: if (parameter) parameter->append(txn, result, "automationServiceDescriptor"); + txn->getClient()->session()->getCompressorManager().serverNegotiate(cmdObj, &result); + return true; } diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index f25260745b2..de35e14d9dd 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -938,6 +938,11 @@ var ReplSetTest = function(opts) { dest: getHostName() + ":" + _unbridgedPorts[n], }); + if (jsTestOptions().networkMessageCompressors) { + bridgeOptions["networkMessageCompressors"] = + jsTestOptions().networkMessageCompressors; + } + this.nodes[n] = new MongoBridge(bridgeOptions); } diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index 83057daa6aa..4e3a12a1fa4 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -446,6 +446,10 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro MongoRunner.savedOptions[opts.runId] = Object.merge(opts, {}); } + if (jsTestOptions().networkMessageCompressors) { + opts.networkMessageCompressors = jsTestOptions().networkMessageCompressors; + } + return opts; }; diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js index 07122f2c16f..6a004ae280c 100644 --- a/src/mongo/shell/shardingtest.js +++ b/src/mongo/shell/shardingtest.js @@ -978,6 +978,11 @@ var ShardingTest = function(params) { otherParams.useBridge = otherParams.useBridge || false; otherParams.bridgeOptions = otherParams.bridgeOptions || {}; + if (jsTestOptions().networkMessageCompressors) { + otherParams.bridgeOptions["networkMessageCompressors"] = + jsTestOptions().networkMessageCompressors; + } + var keyFile = otherParams.keyFile; var hostName = getHostName(); diff --git a/src/mongo/shell/shell_options.cpp b/src/mongo/shell/shell_options.cpp index 12abad3f60a..49037e6e12d 100644 --- a/src/mongo/shell/shell_options.cpp +++ b/src/mongo/shell/shell_options.cpp @@ -233,11 +233,12 @@ bool handlePreValidationMongoShellOptions(const moe::Environment& params, Status storeMongoShellOptions(const moe::Environment& params, const std::vector<std::string>& args) { + Status ret = Status::OK(); if (params.count("quiet")) { mongo::serverGlobalParams.quiet = true; } #ifdef MONGO_CONFIG_SSL - Status ret = storeSSLClientOptions(params); + ret = storeSSLClientOptions(params); if (!ret.isOK()) { return ret; } diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index 27906af8780..1cf2cd1bef7 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -195,6 +195,7 @@ jsTestOptions = function() { // Note: does not support the array version mongosBinVersion: TestData.mongosBinVersion || "", shardMixedBinVersions: TestData.shardMixedBinVersions || false, + networkMessageCompressors: TestData.networkMessageCompressors }); } return _jsTestOptions; diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index d37751fe537..6299e26b81c 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -128,6 +128,7 @@ public: Message request; Message response; + MessageCompressorManager compressorManager; while (true) { try { @@ -138,6 +139,16 @@ public: break; } + if (request.operation() == dbCompressed) { + auto swm = compressorManager.decompressMessage(request); + if (!swm.isOK()) { + error() << "Error decompressing message: " << swm.getStatus(); + _mp->shutdown(); + return; + } + request = std::move(swm.getValue()); + } + std::unique_ptr<rpc::RequestInterface> cmdRequest; if (request.operation() == dbQuery || request.operation() == dbCommand) { cmdRequest = rpc::makeRequest(&request); @@ -245,6 +256,16 @@ public: exhaust = q.queryOptions & QueryOption_Exhaust; } while (exhaust) { + if (response.operation() == dbCompressed) { + auto swm = compressorManager.decompressMessage(response); + if (!swm.isOK()) { + error() << "Error decompressing message: " << swm.getStatus(); + _mp->shutdown(); + return; + } + response = std::move(swm.getValue()); + } + MsgData::View header = response.header(); QueryResult::View qr = header.view2ptr(); if (qr.getCursorId()) { diff --git a/src/mongo/tools/mongobridge_options_init.cpp b/src/mongo/tools/mongobridge_options_init.cpp index 5207fc0b49d..dadd88b24b4 100644 --- a/src/mongo/tools/mongobridge_options_init.cpp +++ b/src/mongo/tools/mongobridge_options_init.cpp @@ -30,6 +30,7 @@ #include <iostream> +#include "mongo/transport/message_compressor_registry.h" #include "mongo/util/exit_code.h" #include "mongo/util/options_parser/startup_option_init.h" #include "mongo/util/options_parser/startup_options.h" @@ -37,6 +38,9 @@ namespace mongo { MONGO_GENERAL_STARTUP_OPTIONS_REGISTER(MongoBridgeOptions)(InitializerContext* context) { + auto ret = addMessageCompressionOptions(&moe::startupOptions, false); + if (!ret.isOK()) + return ret; return addMongoBridgeOptions(&moe::startupOptions); } @@ -58,6 +62,13 @@ MONGO_STARTUP_OPTIONS_STORE(MongoBridgeOptions)(InitializerContext* context) { std::cerr << "try '" << context->args()[0] << " --help' for more information" << std::endl; quickExit(EXIT_BADOPTIONS); } + + ret = storeMessageCompressionOptions(moe::startupOptionsParsed); + if (!ret.isOK()) { + std::cerr << ret.toString() << std::endl; + quickExit(EXIT_BADOPTIONS); + } + return Status::OK(); } } diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 4b41ec5d085..83fb5729e01 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -23,6 +23,7 @@ env.Library( '$BUILD_DIR/mongo/unittest/unittest', '$BUILD_DIR/mongo/util/foundation', '$BUILD_DIR/mongo/util/net/network', + '$BUILD_DIR/mongo/transport/message_compressor', ], ) @@ -99,16 +100,19 @@ env.CppUnitTest( LIBDEPS=[ 'transport_layer_mock', ], +) env.Library( target='message_compressor', source=[ - 'message_compressor_registry.cpp', 'message_compressor_manager.cpp', + 'message_compressor_registry.cpp', + 'message_compressor_snappy.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/util/options_parser/options_parser', + '$BUILD_DIR/third_party/shim_snappy', ] ) diff --git a/src/mongo/transport/message_compressor_base.h b/src/mongo/transport/message_compressor_base.h index c0a2c14b72d..a2c05a6069e 100644 --- a/src/mongo/transport/message_compressor_base.h +++ b/src/mongo/transport/message_compressor_base.h @@ -33,8 +33,17 @@ #include "mongo/base/string_data.h" #include "mongo/platform/atomic_word.h" +#include <type_traits> + namespace mongo { -using MessageCompressorId = uint8_t; +enum class MessageCompressor : uint8_t { + kNoop = 0, + kSnappy = 1, + kExtended = 255, +}; + +StringData getMessageCompressorName(MessageCompressor id); +using MessageCompressorId = std::underlying_type<MessageCompressor>::type; class MessageCompressorBase { MONGO_DISALLOW_COPYING(MessageCompressorBase); @@ -109,8 +118,9 @@ protected: /* * This is called by sub-classes to intialize their ID/name fields. */ - MessageCompressorBase(MessageCompressorId id, StringData name) - : _id{id}, _name{name.toString()} {} + MessageCompressorBase(MessageCompressor id) + : _id{static_cast<MessageCompressorId>(id)}, + _name{getMessageCompressorName(id).toString()} {} /* * Called by sub-classes to bump their bytesIn/bytesOut counters for compression diff --git a/src/mongo/transport/message_compressor_manager.cpp b/src/mongo/transport/message_compressor_manager.cpp index 42f3e9c8c0d..78fa19ad8a3 100644 --- a/src/mongo/transport/message_compressor_manager.cpp +++ b/src/mongo/transport/message_compressor_manager.cpp @@ -59,10 +59,10 @@ struct CompressionHeader { CompressionHeader(int32_t _opcode, int32_t _size, uint8_t _id) : originalOpCode{_opcode}, uncompressedSize{_size}, compressorId{_id} {} - CompressionHeader(ConstDataRangeCursor cursor) { - originalOpCode = cursor.readAndAdvance<LittleEndian<std::int32_t>>().getValue(); - uncompressedSize = cursor.readAndAdvance<LittleEndian<std::int32_t>>().getValue(); - compressorId = cursor.readAndAdvance<LittleEndian<uint8_t>>().getValue(); + CompressionHeader(ConstDataRangeCursor* cursor) { + originalOpCode = cursor->readAndAdvance<LittleEndian<std::int32_t>>().getValue(); + uncompressedSize = cursor->readAndAdvance<LittleEndian<std::int32_t>>().getValue(); + compressorId = cursor->readAndAdvance<LittleEndian<uint8_t>>().getValue(); } static size_t size() { @@ -93,6 +93,8 @@ StatusWith<Message> MessageCompressorManager::compressMessage(const Message& msg inputHeader.getNetworkOp(), inputHeader.dataLen(), compressor->getId()); if (bufferSize > MaxMessageSizeBytes) { + LOG(3) << "Compressed message would be larger than " << MaxMessageSizeBytes + << ", returning original uncompressed message"; return {msg}; } @@ -122,7 +124,7 @@ StatusWith<Message> MessageCompressorManager::compressMessage(const Message& msg StatusWith<Message> MessageCompressorManager::decompressMessage(const Message& msg) { auto inputHeader = msg.header(); ConstDataRangeCursor input(inputHeader.data(), inputHeader.data() + inputHeader.dataLen()); - CompressionHeader compressionHeader(input); + CompressionHeader compressionHeader(&input); auto compressor = _registry->getCompressor(compressionHeader.compressorId); if (!compressor) { @@ -145,6 +147,10 @@ StatusWith<Message> MessageCompressorManager::decompressMessage(const Message& m if (!sws.isOK()) return sws.getStatus(); + if (sws.getValue() != static_cast<std::size_t>(compressionHeader.uncompressedSize)) { + return {ErrorCodes::BadValue, "Decompressing message returned less data than expected"}; + } + outMessage.setLen(sws.getValue() + MsgData::MsgDataHeaderSize); return {Message(outputMessageBuffer)}; @@ -224,8 +230,8 @@ void MessageCompressorManager::serverNegotiate(const BSONObj& input, BSONObjBuil if ((cur = _registry->getCompressor(curName))) { LOG(3) << cur->getName() << " is supported"; _negotiated.push_back(cur); - } else { // Otherwise the compressor is not supported and we skip over it. - LOG(3) << cur->getName() << " is not supported"; + } else { // Otherwise the compressor is not supported and we skip over it. + LOG(3) << curName << " is not supported"; } } diff --git a/src/mongo/transport/message_compressor_manager.h b/src/mongo/transport/message_compressor_manager.h index a3d35027ffa..7af084996db 100644 --- a/src/mongo/transport/message_compressor_manager.h +++ b/src/mongo/transport/message_compressor_manager.h @@ -48,13 +48,16 @@ public: /* * Default constructor. Uses the global MessageCompressorRegistry. */ - explicit MessageCompressorManager(); + MessageCompressorManager(); /* * Constructs a manager from a specific MessageCompressorRegistry - used by the unit tests * to test various registry configurations. */ - MessageCompressorManager(MessageCompressorRegistry* factory); + explicit MessageCompressorManager(MessageCompressorRegistry* factory); + + MessageCompressorManager(MessageCompressorManager&&) = default; + MessageCompressorManager& operator=(MessageCompressorManager&&) = default; /* * Called by a client constructing an isMaster request. This function will append the result diff --git a/src/mongo/transport/message_compressor_manager_test.cpp b/src/mongo/transport/message_compressor_manager_test.cpp index c431f102e23..383bb1e3260 100644 --- a/src/mongo/transport/message_compressor_manager_test.cpp +++ b/src/mongo/transport/message_compressor_manager_test.cpp @@ -30,10 +30,11 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/stdx/memory.h" -#include "mongo/transport/message_compressor_registry.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/message_compressor_noop.h" +#include "mongo/transport/message_compressor_registry.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/net/message.h" #include <string> #include <vector> @@ -49,7 +50,7 @@ MessageCompressorRegistry buildRegistry() { ret.registerImplementation(std::move(compressor)); ret.finalizeSupportedCompressors(); - return std::move(ret); + return ret; } void checkNegotiationResult(const BSONObj& result, const std::vector<std::string>& algos) { @@ -80,7 +81,57 @@ void checkServerNegotiation(const BSONObj& input, const std::vector<std::string> manager.serverNegotiate(input, &serverOutput); checkNegotiationResult(serverOutput.done(), expected); } -} // namespace + +void checkFidelity(const Message& msg, std::unique_ptr<MessageCompressorBase> compressor) { + MessageCompressorRegistry registry; + const auto originalView = msg.singleData(); + const auto compressorName = compressor->getName(); + + std::vector<std::string> compressorList = {compressorName}; + registry.setSupportedCompressors(std::move(compressorList)); + registry.registerImplementation(std::move(compressor)); + registry.finalizeSupportedCompressors(); + + MessageCompressorManager mgr(®istry); + auto negotiator = BSON("isMaster" << 1 << "compression" << BSON_ARRAY(compressorName)); + BSONObjBuilder negotiatorOut; + mgr.serverNegotiate(negotiator, &negotiatorOut); + checkNegotiationResult(negotiatorOut.done(), {compressorName}); + + auto swm = mgr.compressMessage(msg); + ASSERT_OK(swm.getStatus()); + auto compressedMsg = std::move(swm.getValue()); + const auto compressedMsgView = compressedMsg.singleData(); + + ASSERT_EQ(compressedMsgView.getId(), originalView.getId()); + ASSERT_EQ(compressedMsgView.getResponseToMsgId(), originalView.getResponseToMsgId()); + ASSERT_EQ(compressedMsgView.getNetworkOp(), dbCompressed); + + swm = mgr.decompressMessage(compressedMsg); + ASSERT_OK(swm.getStatus()); + auto decompressedMsg = std::move(swm.getValue()); + + const auto decompressedMsgView = decompressedMsg.singleData(); + ASSERT_EQ(decompressedMsgView.getId(), originalView.getId()); + ASSERT_EQ(decompressedMsgView.getResponseToMsgId(), originalView.getResponseToMsgId()); + ASSERT_EQ(decompressedMsgView.getNetworkOp(), originalView.getNetworkOp()); + ASSERT_EQ(decompressedMsgView.getLen(), originalView.getLen()); + + ASSERT_EQ(memcmp(decompressedMsgView.data(), originalView.data(), originalView.dataLen()), 0); +} + +Message buildMessage() { + const auto data = std::string{"Hello, world!"}; + const auto bufferSize = MsgData::MsgDataHeaderSize + data.size(); + auto buf = SharedBuffer::allocate(bufferSize); + MsgData::View testView(buf.get()); + testView.setId(123456); + testView.setResponseToMsgId(654321); + testView.setOperation(dbQuery); + testView.setLen(bufferSize); + memcpy(testView.data(), data.data(), data.size()); + return Message{buf}; +} TEST(MessageCompressorManager, NoCompressionRequested) { auto input = BSON("isMaster" << 1); @@ -120,4 +171,16 @@ TEST(MessageCompressorManager, FullNormalCompression) { clientManager.clientFinish(serverObj); } + +TEST(NoopMessageCompressor, Fidelity) { + auto testMessage = buildMessage(); + checkFidelity(testMessage, stdx::make_unique<NoopMessageCompressor>()); +} + +TEST(SnappyMessageCompressor, Fidelity) { + auto testMessage = buildMessage(); + checkFidelity(testMessage, stdx::make_unique<NoopMessageCompressor>()); +} + } // namespace mongo +} // namespace diff --git a/src/mongo/transport/message_compressor_noop.h b/src/mongo/transport/message_compressor_noop.h index 54a2c795f9d..b0602482b78 100644 --- a/src/mongo/transport/message_compressor_noop.h +++ b/src/mongo/transport/message_compressor_noop.h @@ -26,28 +26,26 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - #include "mongo/transport/message_compressor_base.h" namespace mongo { class NoopMessageCompressor final : public MessageCompressorBase { public: - NoopMessageCompressor() : MessageCompressorBase(0, "noop") {} + NoopMessageCompressor() : MessageCompressorBase(MessageCompressor::kNoop) {} std::size_t getMaxCompressedSize(size_t inputSize) override { return inputSize; } StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) override { - memcpy(const_cast<char*>(output.data()), input.data(), input.length()); + output.write(input); counterHitCompress(input.length(), input.length()); return {input.length()}; } StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) override { - memcpy(const_cast<char*>(output.data()), input.data(), input.length()); + output.write(input); counterHitDecompress(input.length(), input.length()); return {input.length()}; } diff --git a/src/mongo/transport/message_compressor_registry.cpp b/src/mongo/transport/message_compressor_registry.cpp index e64d73b063e..a9cf4cda0b9 100644 --- a/src/mongo/transport/message_compressor_registry.cpp +++ b/src/mongo/transport/message_compressor_registry.cpp @@ -33,6 +33,7 @@ #include "mongo/base/init.h" #include "mongo/stdx/memory.h" #include "mongo/transport/message_compressor_noop.h" +#include "mongo/transport/message_compressor_snappy.h" #include "mongo/util/options_parser/option_section.h" #include <boost/algorithm/string/classification.hpp> @@ -40,6 +41,18 @@ namespace mongo { +StringData getMessageCompressorName(MessageCompressor id) { + switch (id) { + case MessageCompressor::kNoop: + return "noop"_sd; + case MessageCompressor::kSnappy: + return "snappy"_sd; + default: + fassert(40269, "Invalid message compressor ID"); + } + MONGO_UNREACHABLE; +} + MessageCompressorRegistry& MessageCompressorRegistry::get() { static MessageCompressorRegistry globalRegistry; return globalRegistry; @@ -48,7 +61,7 @@ MessageCompressorRegistry& MessageCompressorRegistry::get() { void MessageCompressorRegistry::registerImplementation( std::unique_ptr<MessageCompressorBase> impl) { // It's an error to register a compressor that's already been registered - fassert(40254, + fassert(40270, _compressorsByName.find(impl->getName()) == _compressorsByName.end() && _compressorsByIds[impl->getId()] == nullptr); @@ -61,13 +74,15 @@ void MessageCompressorRegistry::registerImplementation( _compressorsByIds[impl->getId()] = std::move(impl); } -void MessageCompressorRegistry::finalizeSupportedCompressors() { - // Remove compressor names from the compressorNames list if they were never registered. - // This prevents _compressorNames from having totally bogus names specified by users. - std::remove_if( - _compressorNames.begin(), _compressorNames.end(), [this](const std::string& name) { - return _compressorsByName.find(name) == _compressorsByName.end(); - }); +Status MessageCompressorRegistry::finalizeSupportedCompressors() { + for (auto it = _compressorNames.begin(); it != _compressorNames.end(); ++it) { + if (_compressorsByName.find(*it) == _compressorsByName.end()) { + std::stringstream ss; + ss << "Invalid network message compressor specified in configuration: " << *it; + return {ErrorCodes::BadValue, ss.str()}; + } + } + return Status::OK(); } const std::vector<std::string>& MessageCompressorRegistry::getCompressorNames() const { @@ -119,7 +134,7 @@ Status storeMessageCompressionOptions(const moe::Environment& params) { // This instantiates and registers the "noop" compressor. It must happen after option storage // because that's when the configuration of the compressors gets set. MONGO_INITIALIZER_GENERAL(NoopMessageCompressorInit, - ("EndStartupOptionHandling"), + ("EndStartupOptionStorage"), ("AllCompressorsRegistered")) (InitializerContext* context) { auto& compressorRegistry = MessageCompressorRegistry::get(); @@ -131,7 +146,6 @@ MONGO_INITIALIZER_GENERAL(NoopMessageCompressorInit, // any compressor. It must be run after all the compressors have registered themselves with // the global registry. MONGO_INITIALIZER(AllCompressorsRegistered)(InitializerContext* context) { - MessageCompressorRegistry::get().finalizeSupportedCompressors(); - return Status::OK(); + return MessageCompressorRegistry::get().finalizeSupportedCompressors(); } } // namespace mongo diff --git a/src/mongo/transport/message_compressor_registry.h b/src/mongo/transport/message_compressor_registry.h index 721185cfb1c..9d8549ed0e3 100644 --- a/src/mongo/transport/message_compressor_registry.h +++ b/src/mongo/transport/message_compressor_registry.h @@ -107,7 +107,7 @@ public: * calls to registerImplementation. It will remove any compressor names that aren't keys in * the _compressors map. */ - void finalizeSupportedCompressors(); + Status finalizeSupportedCompressors(); private: StringMap<MessageCompressorBase*> _compressorsByName; diff --git a/src/mongo/transport/message_compressor_registry_test.cpp b/src/mongo/transport/message_compressor_registry_test.cpp index b3a766f2f55..a14f067e606 100644 --- a/src/mongo/transport/message_compressor_registry_test.cpp +++ b/src/mongo/transport/message_compressor_registry_test.cpp @@ -73,15 +73,17 @@ TEST(MessageCompressorRegistry, NothingRegistered) { TEST(MessageCompressorRegistry, SetSupported) { MessageCompressorRegistry registry; auto compressor = stdx::make_unique<NoopMessageCompressor>(); - auto compressorPtr = compressor.get(); + auto compressorId = compressor->getId(); + auto compressorName = compressor->getName(); std::vector<std::string> compressorList = {"foobar"}; registry.setSupportedCompressors(std::move(compressorList)); registry.registerImplementation(std::move(compressor)); - registry.finalizeSupportedCompressors(); + auto ret = registry.finalizeSupportedCompressors(); + ASSERT_NOT_OK(ret); - ASSERT_NULL(registry.getCompressor(compressorPtr->getName())); - ASSERT_NULL(registry.getCompressor(compressorPtr->getId())); + ASSERT_NULL(registry.getCompressor(compressorId)); + ASSERT_NULL(registry.getCompressor(compressorName)); } } // namespace } // namespace mongo diff --git a/src/mongo/transport/message_compressor_snappy.cpp b/src/mongo/transport/message_compressor_snappy.cpp new file mode 100644 index 00000000000..db1e0c9dfca --- /dev/null +++ b/src/mongo/transport/message_compressor_snappy.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/message_compressor_registry.h" +#include "mongo/transport/message_compressor_snappy.h" + +#include "third_party/snappy-1.1.3/snappy.h" + +namespace mongo { + +SnappyMessageCompressor::SnappyMessageCompressor() + : MessageCompressorBase(MessageCompressor::kSnappy) {} + +std::size_t SnappyMessageCompressor::getMaxCompressedSize(size_t inputSize) { + return snappy::MaxCompressedLength(inputSize); +} + +StatusWith<std::size_t> SnappyMessageCompressor::compressData(ConstDataRange input, + DataRange output) { + size_t outLength; + snappy::RawCompress(input.data(), input.length(), const_cast<char*>(output.data()), &outLength); + + counterHitCompress(input.length(), outLength); + return {outLength}; +} + +StatusWith<std::size_t> SnappyMessageCompressor::decompressData(ConstDataRange input, + DataRange output) { + bool ret = + snappy::RawUncompress(input.data(), input.length(), const_cast<char*>(output.data())); + + if (!ret) { + return Status{ErrorCodes::BadValue, "Compressed message was invalid or corrupted"}; + } + + counterHitDecompress(input.length(), output.length()); + return output.length(); +} + + +MONGO_INITIALIZER_GENERAL(SnappyMessageCompressorInit, + ("EndStartupOptionHandling"), + ("AllCompressorsRegistered")) +(InitializerContext* context) { + auto& compressorRegistry = MessageCompressorRegistry::get(); + compressorRegistry.registerImplementation(stdx::make_unique<SnappyMessageCompressor>()); + return Status::OK(); +} +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_snappy.h b/src/mongo/transport/message_compressor_snappy.h new file mode 100644 index 00000000000..3521370df09 --- /dev/null +++ b/src/mongo/transport/message_compressor_snappy.h @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/transport/message_compressor_base.h" + +namespace mongo { +class SnappyMessageCompressor final : public MessageCompressorBase { +public: + SnappyMessageCompressor(); + + std::size_t getMaxCompressedSize(size_t inputSize) override; + + StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) override; + + StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) override; +}; + + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index efd43fd9a9b..841851d5ccd 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -127,13 +127,13 @@ ServiceEntryPointTestSuite::MockTLHarness::MockTLHarness() _asyncWait(kDefaultAsyncWait), _end(kDefaultEnd) {} -Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(const Session& session, +Ticket ServiceEntryPointTestSuite::MockTLHarness::sourceMessage(Session& session, Message* message, Date_t expiration) { return _sourceMessage(session, message, expiration); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(const Session& session, +Ticket ServiceEntryPointTestSuite::MockTLHarness::sinkMessage(Session& session, const Message& message, Date_t expiration) { return _sinkMessage(session, message, expiration); @@ -191,19 +191,17 @@ Status ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError(transport:: return _defaultWait(std::move(ticket)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(const Session& s, - Message* m, - Date_t d) { +Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSource(Session& s, Message* m, Date_t d) { return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, m, d)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(const Session& s, +Ticket ServiceEntryPointTestSuite::MockTLHarness::_defaultSink(Session& s, const Message&, Date_t d) { return Ticket(this, stdx::make_unique<ServiceEntryPointTestSuite::MockTicket>(s, d)); } -Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(const Session& s, +Ticket ServiceEntryPointTestSuite::MockTLHarness::_sinkThenErrorOnWait(Session& s, const Message& m, Date_t d) { _wait = stdx::bind(&ServiceEntryPointTestSuite::MockTLHarness::_waitOnceThenError, this, _1); @@ -264,7 +262,7 @@ void ServiceEntryPointTestSuite::halfLifeCycleTest() { // Step 1: SEP gets a ticket to source a Message // Step 2: SEP calls wait() on the ticket and receives a Message // Step 3: SEP gets a ticket to sink a Message - _tl->_sinkMessage = [this](const Session& session, const Message& m, Date_t expiration) { + _tl->_sinkMessage = [this](Session& session, const Message& m, Date_t expiration) { // Step 4: SEP calls wait() on the ticket and receives an error _tl->_wait = diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index 623f0533267..2249c86b9bd 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -120,11 +120,11 @@ private: MockTLHarness(); transport::Ticket sourceMessage( - const transport::Session& session, + transport::Session& session, Message* message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; transport::Ticket sinkMessage( - const transport::Session& session, + transport::Session& session, const Message& message, Date_t expiration = transport::Ticket::kNoExpirationDate) override; Status wait(transport::Ticket&& ticket) override; @@ -141,10 +141,8 @@ private: ServiceEntryPointTestSuite::MockTicket* getMockTicket(const transport::Ticket& ticket); // Mocked method hooks - stdx::function<transport::Ticket(const transport::Session&, Message*, Date_t)> - _sourceMessage; - stdx::function<transport::Ticket(const transport::Session&, const Message&, Date_t)> - _sinkMessage; + stdx::function<transport::Ticket(transport::Session&, Message*, Date_t)> _sourceMessage; + stdx::function<transport::Ticket(transport::Session&, const Message&, Date_t)> _sinkMessage; stdx::function<Status(transport::Ticket)> _wait; stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; stdx::function<void(const transport::Session&)> _end; @@ -154,11 +152,9 @@ private: stdx::function<void(void)> _shutdown = [] {}; // Pre-set hook methods - transport::Ticket _defaultSource(const transport::Session& s, Message* m, Date_t d); - transport::Ticket _defaultSink(const transport::Session& s, const Message&, Date_t d); - transport::Ticket _sinkThenErrorOnWait(const transport::Session& s, - const Message& m, - Date_t d); + transport::Ticket _defaultSource(transport::Session& s, Message* m, Date_t d); + transport::Ticket _defaultSink(transport::Session& s, const Message&, Date_t d); + transport::Ticket _sinkThenErrorOnWait(transport::Session& s, const Message& m, Date_t d); Status _defaultWait(transport::Ticket ticket); Status _waitError(transport::Ticket ticket); diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 8551f607037..c7ec5cd28f0 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/session_id.h" #include "mongo/transport/ticket.h" #include "mongo/util/net/hostandport.h" @@ -152,6 +153,10 @@ public: return _ended; } + MessageCompressorManager& getCompressorManager() { + return _messageCompressorManager; + } + private: bool _ended = false; @@ -163,6 +168,8 @@ private: TagMask _tags; TransportLayer* _tl; + + MessageCompressorManager _messageCompressorManager; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 44995fc375b..3634ec68de5 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -98,7 +98,7 @@ public: * TransportLayer is unable to source a Message, this will be a failed status, * and the passed-in Message buffer may be left in an invalid state. */ - virtual Ticket sourceMessage(const Session& session, + virtual Ticket sourceMessage(Session& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) = 0; @@ -117,7 +117,7 @@ public: * This method does NOT take ownership of the sunk Message, which must be cleaned * up by the caller. */ - virtual Ticket sinkMessage(const Session& session, + virtual Ticket sinkMessage(Session& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) = 0; diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index 8d1f95b6a23..3c004eb793f 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -98,13 +98,19 @@ Status TransportLayerLegacy::start() { TransportLayerLegacy::~TransportLayerLegacy() = default; -Ticket TransportLayerLegacy::sourceMessage(const Session& session, - Message* message, - Date_t expiration) { - auto sourceCb = [message](AbstractMessagingPort* amp) -> Status { +Ticket TransportLayerLegacy::sourceMessage(Session& session, Message* message, Date_t expiration) { + auto& compressorMgr = session.getCompressorManager(); + auto sourceCb = [message, &compressorMgr](AbstractMessagingPort* amp) -> Status { if (!amp->recv(*message)) { return {ErrorCodes::HostUnreachable, "Recv failed"}; } + + if (message->operation() == dbCompressed) { + auto swm = compressorMgr.decompressMessage(*message); + if (!swm.isOK()) + return swm.getStatus(); + *message = swm.getValue(); + } return Status::OK(); }; @@ -137,12 +143,18 @@ TransportLayer::Stats TransportLayerLegacy::sessionStats() { return stats; } -Ticket TransportLayerLegacy::sinkMessage(const Session& session, +Ticket TransportLayerLegacy::sinkMessage(Session& session, const Message& message, Date_t expiration) { - auto sinkCb = [&message](AbstractMessagingPort* amp) -> Status { + auto& compressorMgr = session.getCompressorManager(); + auto sinkCb = [&message, &compressorMgr](AbstractMessagingPort* amp) -> Status { try { - amp->say(message); + auto swm = compressorMgr.compressMessage(message); + if (!swm.isOK()) + return swm.getStatus(); + const auto& compressedMessage = swm.getValue(); + amp->say(compressedMessage); + return Status::OK(); } catch (const SocketException& e) { return {ErrorCodes::HostUnreachable, e.what()}; diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 1ef193b4754..7472212ffc9 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -67,11 +67,11 @@ public: Status setup(); Status start() override; - Ticket sourceMessage(const Session& session, + Ticket sourceMessage(Session& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(const Session& session, + Ticket sinkMessage(Session& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 7ba1797a21f..e513155e5cd 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -43,13 +43,11 @@ namespace transport { TransportLayerManager::TransportLayerManager() = default; -Ticket TransportLayerManager::sourceMessage(const Session& session, - Message* message, - Date_t expiration) { +Ticket TransportLayerManager::sourceMessage(Session& session, Message* message, Date_t expiration) { return session.getTransportLayer()->sourceMessage(session, message, expiration); } -Ticket TransportLayerManager::sinkMessage(const Session& session, +Ticket TransportLayerManager::sinkMessage(Session& session, const Message& message, Date_t expiration) { return session.getTransportLayer()->sinkMessage(session, message, expiration); diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index aeed86edcfe..20d27d6571c 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -54,10 +54,10 @@ class TransportLayerManager final : public TransportLayer { public: TransportLayerManager(); - Ticket sourceMessage(const Session& session, + Ticket sourceMessage(Session& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(const Session& session, + Ticket sinkMessage(Session& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 3f71b5d16e0..e7fa76d2e9b 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -64,9 +64,7 @@ boost::optional<Message*> TransportLayerMock::TicketMock::msg() const { TransportLayerMock::TransportLayerMock() : _shutdown(false) {} -Ticket TransportLayerMock::sourceMessage(const Session& session, - Message* message, - Date_t expiration) { +Ticket TransportLayerMock::sourceMessage(Session& session, Message* message, Date_t expiration) { if (inShutdown()) { return Ticket(TransportLayer::ShutdownStatus); } else if (!owns(session.id())) { @@ -79,7 +77,7 @@ Ticket TransportLayerMock::sourceMessage(const Session& session, stdx::make_unique<TransportLayerMock::TicketMock>(&session, message, expiration)); } -Ticket TransportLayerMock::sinkMessage(const Session& session, +Ticket TransportLayerMock::sinkMessage(Session& session, const Message& message, Date_t expiration) { if (inShutdown()) { diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index f519713e9bc..38ab3eed0f1 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -76,10 +76,10 @@ public: TransportLayerMock(); ~TransportLayerMock(); - Ticket sourceMessage(const Session& session, + Ticket sourceMessage(Session& session, Message* message, Date_t expiration = Ticket::kNoExpirationDate) override; - Ticket sinkMessage(const Session& session, + Ticket sinkMessage(Session& session, const Message& message, Date_t expiration = Ticket::kNoExpirationDate) override; diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h index e4884e75133..10860552a7d 100644 --- a/src/mongo/util/net/abstract_message_port.h +++ b/src/mongo/util/net/abstract_message_port.h @@ -86,7 +86,7 @@ public: /** * Sends the message. */ - virtual void say(Message& toSend, int responseTo = 0) = 0; + virtual void say(Message& toSend, int responseTo) = 0; /** * Sends the message (does not set headers). |