summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-04-30 15:21:45 -0400
committerJason Carey <jcarey@argv.me>2018-05-04 12:18:23 -0400
commit236bf5f077eb16f7bf8f0f7c5f103240bd061f12 (patch)
treebbf85db3916634483f8123fe60c2ccc1ab3eed63
parent3539922e751fc79f536a43aae6da422e5a4a03d0 (diff)
downloadmongo-236bf5f077eb16f7bf8f0f7c5f103240bd061f12.tar.gz
SERVER-34506 TLASIO test for isJustForContinuation
Adding an integration test for transport layer asio which uses a fail point to induce single byte at a time reads and writes. We use this, along with a debug block in the future header, to ensure that the continuation folding in futures is working properly with opportunisticRead/Write chaining. To test the other side of this, adding a new test command called "echo" which returns the command object passed to it (to allow for a large message body on the read and write path more easily).
-rw-r--r--jstests/core/views/views_all_commands.js1
-rw-r--r--jstests/sharding/database_and_shard_versioning_all_commands.js1
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js1
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js1
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js1
-rw-r--r--src/mongo/db/commands/generic.cpp48
-rw-r--r--src/mongo/transport/SConscript4
-rw-r--r--src/mongo/transport/session_asio.h41
-rw-r--r--src/mongo/transport/transport_layer_asio.h4
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp58
-rw-r--r--src/mongo/util/future.h22
11 files changed, 180 insertions, 2 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 9d199e747ee..3f94ed4d176 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -227,6 +227,7 @@
}
},
dropUser: {skip: isUnrelated},
+ echo: {skip: isUnrelated},
emptycapped: {
command: {emptycapped: "view"},
expectFailure: true,
diff --git a/jstests/sharding/database_and_shard_versioning_all_commands.js b/jstests/sharding/database_and_shard_versioning_all_commands.js
index 943f281f29e..65fdb8ab2e4 100644
--- a/jstests/sharding/database_and_shard_versioning_all_commands.js
+++ b/jstests/sharding/database_and_shard_versioning_all_commands.js
@@ -194,6 +194,7 @@
},
dropRole: {skip: "always targets the config server"},
dropUser: {skip: "always targets the config server"},
+ echo: {skip: "does not forward command to primary shard"},
enableSharding: {skip: "does not forward command to primary shard"},
endSessions: {skip: "goes through the cluster write path"},
eval: {
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index a602afa1d5e..5f8d40e27e8 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -147,6 +147,7 @@
dropIndexes: {skip: "primary only"},
dropRole: {skip: "primary only"},
dropUser: {skip: "primary only"},
+ echo: {skip: "does not return user data"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
endSessions: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 96be106e933..a3b3d9709f8 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -171,6 +171,7 @@
dropIndexes: {skip: "primary only"},
dropRole: {skip: "primary only"},
dropUser: {skip: "primary only"},
+ echo: {skip: "does not return user data"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
endSessions: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index ac110e3b7bb..d59eb48b653 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -149,6 +149,7 @@
dropIndexes: {skip: "primary only"},
dropRole: {skip: "primary only"},
dropUser: {skip: "primary only"},
+ echo: {skip: "does not return user data"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
endSessions: {skip: "does not return user data"},
diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp
index 20a5c03543e..e157ff5a64a 100644
--- a/src/mongo/db/commands/generic.cpp
+++ b/src/mongo/db/commands/generic.cpp
@@ -116,6 +116,54 @@ public:
}
} pingCmd;
+class EchoCommand final : public TypedCommand<EchoCommand> {
+public:
+ struct Request {
+ static constexpr auto kCommandName = "echo"_sd;
+ static Request parse(const IDLParserErrorContext&, const OpMsgRequest& request) {
+ return Request{request};
+ }
+
+ const OpMsgRequest& request;
+ };
+
+ class Invocation final : public MinimalInvocationBase {
+ public:
+ using MinimalInvocationBase::MinimalInvocationBase;
+
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {}
+
+ NamespaceString ns() const override {
+ return NamespaceString(request().request.getDatabase());
+ }
+
+ void run(OperationContext* opCtx, CommandReplyBuilder* result) override {
+ result->append("echo", request().request.body);
+ }
+ };
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kAlways;
+ }
+
+ bool requiresAuth() const override {
+ return false;
+ }
+};
+constexpr StringData EchoCommand::Request::kCommandName;
+
+MONGO_INITIALIZER(RegisterEcho)(InitializerContext* context) {
+ if (getTestCommandsEnabled()) {
+ new EchoCommand();
+ }
+ return Status::OK();
+}
+
class ListCommandsCmd : public BasicCommand {
public:
std::string help() const override {
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index c56be5c9845..4c56abf9262 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -100,8 +100,12 @@ tlEnv.CppIntegrationTest(
'transport_layer_asio_integration_test.cpp',
],
LIBDEPS=[
+ 'transport_layer_egress_init',
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/async_client',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/util/net/network',
+ '$BUILD_DIR/mongo/util/version_impl',
'$BUILD_DIR/third_party/shim_asio',
],
)
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index ea10ec61af4..5f02ec48eb8 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -36,6 +36,7 @@
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer_asio.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/net/sock.h"
#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl_manager.h"
@@ -50,6 +51,8 @@
namespace mongo {
namespace transport {
+MONGO_FP_DECLARE(transportLayerASIOshortOpportunisticReadWrite);
+
template <typename SuccessValue>
auto futurize(const std::error_code& ec, SuccessValue&& successValue) {
using Result = Future<std::decay_t<SuccessValue>>;
@@ -413,7 +416,24 @@ private:
const MutableBufferSequence& buffers,
const transport::BatonHandle& baton = nullptr) {
std::error_code ec;
- auto size = asio::read(stream, buffers, ec);
+ size_t size;
+
+ if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) &&
+ _blockingMode == Async) {
+ asio::mutable_buffer localBuffer = buffers;
+
+ if (buffers.size()) {
+ localBuffer = asio::mutable_buffer(buffers.data(), 1);
+ }
+
+ size = asio::read(stream, localBuffer, ec);
+ if (!ec && buffers.size() > 1) {
+ ec = asio::error::would_block;
+ }
+ } else {
+ size = asio::read(stream, buffers, ec);
+ }
+
if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
(_blockingMode == Async)) {
// asio::read is a loop internally, so some of buffers may have been read into already.
@@ -474,7 +494,24 @@ private:
const ConstBufferSequence& buffers,
const transport::BatonHandle& baton = nullptr) {
std::error_code ec;
- auto size = asio::write(stream, buffers, ec);
+ std::size_t size;
+
+ if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) &&
+ _blockingMode == Async) {
+ asio::const_buffer localBuffer = buffers;
+
+ if (buffers.size()) {
+ localBuffer = asio::const_buffer(buffers.data(), 1);
+ }
+
+ size = asio::write(stream, localBuffer, ec);
+ if (!ec && buffers.size() > 1) {
+ ec = asio::error::would_block;
+ }
+ } else {
+ size = asio::write(stream, buffers, ec);
+ }
+
if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
(_blockingMode == Async)) {
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 2e774210c55..fc98263c2ce 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -40,6 +40,7 @@
#include "mongo/stdx/thread.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_mode.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/net/ssl_types.h"
@@ -66,6 +67,9 @@ class ServiceEntryPoint;
namespace transport {
+// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN
+MONGO_FP_FORWARD_DECLARE(transportLayerASIOshortOpportunisticReadWrite);
+
/**
* A TransportLayer implementation based on ASIO networking primitives.
*/
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index a28eac3b322..cebbdb64650 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -30,9 +30,17 @@
#include "mongo/platform/basic.h"
+#include "mongo/client/async_client.h"
#include "mongo/client/connection_string.h"
+#include "mongo/db/client.h"
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_asio.h"
#include "mongo/unittest/integration_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "asio.hpp"
@@ -77,5 +85,55 @@ TEST(TransportLayerASIO, HTTPRequestGetsHTTPError) {
#endif
}
+// This test forces reads and writes to occur one byte at a time, verifying SERVER-34506 (the
+// isJustForContinuation optimization works).
+//
+// Because of the file size limit, it's only an effective check on debug builds (where the future
+// implementation checks the length of the future chain).
+TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
+ const auto assertOK = [](executor::RemoteCommandResponse reply) {
+ ASSERT_OK(reply.status);
+ ASSERT(reply.data["ok"]) << reply.data;
+ };
+
+ auto connectionString = unittest::getFixtureConnectionString();
+ auto server = connectionString.getServers().front();
+
+ auto sc = getGlobalServiceContext();
+ auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kEgress);
+
+ stdx::thread thread([&] { reactor->run(); });
+ const auto threadGuard = MakeGuard([&] {
+ reactor->stop();
+ thread.join();
+ });
+
+ AsyncDBClient::Handle handle =
+ AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor).get();
+
+ handle->initWireVersion(__FILE__, nullptr).get();
+
+ FailPointEnableBlock fp("transportLayerASIOshortOpportunisticReadWrite");
+
+ const executor::RemoteCommandRequest ecr{
+ server, "admin", BSON("echo" << std::string(1 << 10, 'x')), BSONObj(), nullptr};
+
+ assertOK(handle->runCommandRequest(ecr).get());
+
+ auto client = sc->makeClient(__FILE__);
+ auto opCtx = client->makeOperationContext();
+
+ if (auto baton = sc->getTransportLayer()->makeBaton(opCtx.get())) {
+ auto future = handle->runCommandRequest(ecr, baton);
+ const auto batonGuard = MakeGuard([&] { baton->detach(); });
+
+ while (!future.isReady()) {
+ baton->run(nullptr, boost::none);
+ }
+
+ assertOK(future.get());
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index 2b4d2c9c4f4..7a8fece88ab 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -41,6 +41,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/utility.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/debug_util.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/scopeguard.h"
@@ -356,6 +357,27 @@ public:
dassert(oldState == SSBState::kWaiting);
+ DEV {
+ // If you hit this limit one of two things has probably happened
+ //
+ // 1. The justForContinuation optimization isn't working.
+ // 2. You may be creating a variable length chain.
+ //
+ // If those statements don't mean anything to you, please ask an editor of this file.
+ // If they don't work here anymore, I'm sorry.
+ const size_t kMaxDepth = 32;
+
+ size_t depth = 0;
+ for (auto ssb = continuation.get(); ssb;
+ ssb = ssb->state.load(std::memory_order_acquire) == SSBState::kWaiting
+ ? ssb->continuation.get()
+ : nullptr) {
+ depth++;
+
+ invariant(depth < kMaxDepth);
+ }
+ }
+
if (callback) {
callback(this);
}