From 236bf5f077eb16f7bf8f0f7c5f103240bd061f12 Mon Sep 17 00:00:00 2001 From: Jason Carey Date: Mon, 30 Apr 2018 15:21:45 -0400 Subject: SERVER-34506 TLASIO test for isJustForContinuation Adding an integration test for transport layer asio which uses a fail point to induce single byte at a time reads and writes. We use this, along with a debug block in the future header, to ensure that the continuation folding in futures is working properly with opportunisticRead/Write chaining. To test the other side of this, adding a new test command called "echo" which returns the command object passed to it (to allow for a large message body on the read and write path more easily). --- jstests/core/views/views_all_commands.js | 1 + .../database_and_shard_versioning_all_commands.js | 1 + .../sharding/safe_secondary_reads_drop_recreate.js | 1 + ...eads_single_migration_suspend_range_deletion.js | 1 + ...condary_reads_single_migration_waitForDelete.js | 1 + src/mongo/db/commands/generic.cpp | 48 ++++++++++++++++++ src/mongo/transport/SConscript | 4 ++ src/mongo/transport/session_asio.h | 41 ++++++++++++++- src/mongo/transport/transport_layer_asio.h | 4 ++ .../transport_layer_asio_integration_test.cpp | 58 ++++++++++++++++++++++ src/mongo/util/future.h | 22 ++++++++ 11 files changed, 180 insertions(+), 2 deletions(-) diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 9d199e747ee..3f94ed4d176 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -227,6 +227,7 @@ } }, dropUser: {skip: isUnrelated}, + echo: {skip: isUnrelated}, emptycapped: { command: {emptycapped: "view"}, expectFailure: true, diff --git a/jstests/sharding/database_and_shard_versioning_all_commands.js b/jstests/sharding/database_and_shard_versioning_all_commands.js index 943f281f29e..65fdb8ab2e4 100644 --- a/jstests/sharding/database_and_shard_versioning_all_commands.js +++ b/jstests/sharding/database_and_shard_versioning_all_commands.js @@ -194,6 +194,7 @@ }, dropRole: {skip: "always targets the config server"}, dropUser: {skip: "always targets the config server"}, + echo: {skip: "does not forward command to primary shard"}, enableSharding: {skip: "does not forward command to primary shard"}, endSessions: {skip: "goes through the cluster write path"}, eval: { diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index a602afa1d5e..5f8d40e27e8 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -147,6 +147,7 @@ dropIndexes: {skip: "primary only"}, dropRole: {skip: "primary only"}, dropUser: {skip: "primary only"}, + echo: {skip: "does not return user data"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, endSessions: {skip: "does not return user data"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index 96be106e933..a3b3d9709f8 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -171,6 +171,7 @@ dropIndexes: {skip: "primary only"}, dropRole: {skip: "primary only"}, dropUser: {skip: "primary only"}, + echo: {skip: "does not return user data"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, endSessions: {skip: "does not return user data"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index ac110e3b7bb..d59eb48b653 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -149,6 +149,7 @@ dropIndexes: {skip: "primary only"}, dropRole: {skip: "primary only"}, dropUser: {skip: "primary only"}, + echo: {skip: "does not return user data"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, endSessions: {skip: "does not return user data"}, diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp index 20a5c03543e..e157ff5a64a 100644 --- a/src/mongo/db/commands/generic.cpp +++ b/src/mongo/db/commands/generic.cpp @@ -116,6 +116,54 @@ public: } } pingCmd; +class EchoCommand final : public TypedCommand { +public: + struct Request { + static constexpr auto kCommandName = "echo"_sd; + static Request parse(const IDLParserErrorContext&, const OpMsgRequest& request) { + return Request{request}; + } + + const OpMsgRequest& request; + }; + + class Invocation final : public MinimalInvocationBase { + public: + using MinimalInvocationBase::MinimalInvocationBase; + + private: + bool supportsWriteConcern() const override { + return false; + } + + void doCheckAuthorization(OperationContext* opCtx) const override {} + + NamespaceString ns() const override { + return NamespaceString(request().request.getDatabase()); + } + + void run(OperationContext* opCtx, CommandReplyBuilder* result) override { + result->append("echo", request().request.body); + } + }; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kAlways; + } + + bool requiresAuth() const override { + return false; + } +}; +constexpr StringData EchoCommand::Request::kCommandName; + +MONGO_INITIALIZER(RegisterEcho)(InitializerContext* context) { + if (getTestCommandsEnabled()) { + new EchoCommand(); + } + return Status::OK(); +} + class ListCommandsCmd : public BasicCommand { public: std::string help() const override { diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index c56be5c9845..4c56abf9262 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -100,8 +100,12 @@ tlEnv.CppIntegrationTest( 'transport_layer_asio_integration_test.cpp', ], LIBDEPS=[ + 'transport_layer_egress_init', '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/async_client', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/util/net/network', + '$BUILD_DIR/mongo/util/version_impl', '$BUILD_DIR/third_party/shim_asio', ], ) diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index ea10ec61af4..5f02ec48eb8 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -36,6 +36,7 @@ #include "mongo/transport/asio_utils.h" #include "mongo/transport/baton.h" #include "mongo/transport/transport_layer_asio.h" +#include "mongo/util/fail_point.h" #include "mongo/util/net/sock.h" #ifdef MONGO_CONFIG_SSL #include "mongo/util/net/ssl_manager.h" @@ -50,6 +51,8 @@ namespace mongo { namespace transport { +MONGO_FP_DECLARE(transportLayerASIOshortOpportunisticReadWrite); + template auto futurize(const std::error_code& ec, SuccessValue&& successValue) { using Result = Future>; @@ -413,7 +416,24 @@ private: const MutableBufferSequence& buffers, const transport::BatonHandle& baton = nullptr) { std::error_code ec; - auto size = asio::read(stream, buffers, ec); + size_t size; + + if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) && + _blockingMode == Async) { + asio::mutable_buffer localBuffer = buffers; + + if (buffers.size()) { + localBuffer = asio::mutable_buffer(buffers.data(), 1); + } + + size = asio::read(stream, localBuffer, ec); + if (!ec && buffers.size() > 1) { + ec = asio::error::would_block; + } + } else { + size = asio::read(stream, buffers, ec); + } + if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) && (_blockingMode == Async)) { // asio::read is a loop internally, so some of buffers may have been read into already. @@ -474,7 +494,24 @@ private: const ConstBufferSequence& buffers, const transport::BatonHandle& baton = nullptr) { std::error_code ec; - auto size = asio::write(stream, buffers, ec); + std::size_t size; + + if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) && + _blockingMode == Async) { + asio::const_buffer localBuffer = buffers; + + if (buffers.size()) { + localBuffer = asio::const_buffer(buffers.data(), 1); + } + + size = asio::write(stream, localBuffer, ec); + if (!ec && buffers.size() > 1) { + ec = asio::error::would_block; + } + } else { + size = asio::write(stream, buffers, ec); + } + if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) && (_blockingMode == Async)) { diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 2e774210c55..fc98263c2ce 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -40,6 +40,7 @@ #include "mongo/stdx/thread.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_mode.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/net/ssl_types.h" @@ -66,6 +67,9 @@ class ServiceEntryPoint; namespace transport { +// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN +MONGO_FP_FORWARD_DECLARE(transportLayerASIOshortOpportunisticReadWrite); + /** * A TransportLayer implementation based on ASIO networking primitives. */ diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index a28eac3b322..cebbdb64650 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -30,9 +30,17 @@ #include "mongo/platform/basic.h" +#include "mongo/client/async_client.h" #include "mongo/client/connection_string.h" +#include "mongo/db/client.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/session.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_layer_asio.h" #include "mongo/unittest/integration_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "asio.hpp" @@ -77,5 +85,55 @@ TEST(TransportLayerASIO, HTTPRequestGetsHTTPError) { #endif } +// This test forces reads and writes to occur one byte at a time, verifying SERVER-34506 (the +// isJustForContinuation optimization works). +// +// Because of the file size limit, it's only an effective check on debug builds (where the future +// implementation checks the length of the future chain). +TEST(TransportLayerASIO, ShortReadsAndWritesWork) { + const auto assertOK = [](executor::RemoteCommandResponse reply) { + ASSERT_OK(reply.status); + ASSERT(reply.data["ok"]) << reply.data; + }; + + auto connectionString = unittest::getFixtureConnectionString(); + auto server = connectionString.getServers().front(); + + auto sc = getGlobalServiceContext(); + auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kEgress); + + stdx::thread thread([&] { reactor->run(); }); + const auto threadGuard = MakeGuard([&] { + reactor->stop(); + thread.join(); + }); + + AsyncDBClient::Handle handle = + AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor).get(); + + handle->initWireVersion(__FILE__, nullptr).get(); + + FailPointEnableBlock fp("transportLayerASIOshortOpportunisticReadWrite"); + + const executor::RemoteCommandRequest ecr{ + server, "admin", BSON("echo" << std::string(1 << 10, 'x')), BSONObj(), nullptr}; + + assertOK(handle->runCommandRequest(ecr).get()); + + auto client = sc->makeClient(__FILE__); + auto opCtx = client->makeOperationContext(); + + if (auto baton = sc->getTransportLayer()->makeBaton(opCtx.get())) { + auto future = handle->runCommandRequest(ecr, baton); + const auto batonGuard = MakeGuard([&] { baton->detach(); }); + + while (!future.isReady()) { + baton->run(nullptr, boost::none); + } + + assertOK(future.get()); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index 2b4d2c9c4f4..7a8fece88ab 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -41,6 +41,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/utility.h" #include "mongo/util/assert_util.h" +#include "mongo/util/debug_util.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/scopeguard.h" @@ -356,6 +357,27 @@ public: dassert(oldState == SSBState::kWaiting); + DEV { + // If you hit this limit one of two things has probably happened + // + // 1. The justForContinuation optimization isn't working. + // 2. You may be creating a variable length chain. + // + // If those statements don't mean anything to you, please ask an editor of this file. + // If they don't work here anymore, I'm sorry. + const size_t kMaxDepth = 32; + + size_t depth = 0; + for (auto ssb = continuation.get(); ssb; + ssb = ssb->state.load(std::memory_order_acquire) == SSBState::kWaiting + ? ssb->continuation.get() + : nullptr) { + depth++; + + invariant(depth < kMaxDepth); + } + } + if (callback) { callback(this); } -- cgit v1.2.1