diff options
-rw-r--r-- | src/mongo/db/commands/async_command_execution_test.cpp | 157 |
1 files changed, 84 insertions, 73 deletions
diff --git a/src/mongo/db/commands/async_command_execution_test.cpp b/src/mongo/db/commands/async_command_execution_test.cpp index a5f1e38aeb6..308bac90967 100644 --- a/src/mongo/db/commands/async_command_execution_test.cpp +++ b/src/mongo/db/commands/async_command_execution_test.cpp @@ -50,99 +50,110 @@ using namespace fmt::literals; class AsyncCommandExecutionTest : public unittest::Test, public ScopedGlobalServiceContextForTest { public: - void runTestForCommand(StringData command) { - BSONObj syncResponse, asyncResponse; - - auto client = getServiceContext()->makeClient("Client"); - auto strand = ClientStrand::make(std::move(client)); - - { - auto ctx = makeExecutionContext(strand, command); - strand->run([&] { syncResponse = getSyncResponse(ctx); }); - } - - { - auto ctx = makeExecutionContext(strand, command); - asyncResponse = getAsyncResponse(strand, ctx); - } - - { - auto ctx = makeExecutionContext(strand, command); - killAsyncCommand(strand, ctx); - } - - ASSERT_BSONOBJ_EQ(syncResponse, asyncResponse); - } - -private: - struct ExecutionContext { - ServiceContext::UniqueOperationContext opCtx; - std::shared_ptr<RequestExecutionContext> rec; - std::shared_ptr<CommandInvocation> invocation; - }; + struct TestState; + void runTestForCommand(StringData command); +}; - ExecutionContext makeExecutionContext(ClientStrandPtr strand, StringData commandName) const { +// Sets up and maintains the environment (e.g., `opCtx`) required for running a test. +struct AsyncCommandExecutionTest::TestState { + TestState(ClientStrandPtr clientStrand, StringData cmdName) : strand(std::move(clientStrand)) { auto guard = strand->bind(); - ExecutionContext ctx; - ctx.opCtx = cc().makeOperationContext(); + opCtx = guard->makeOperationContext(); - auto rec = - std::make_shared<RequestExecutionContext>(ctx.opCtx.get(), mockMessage(commandName)); + auto mockMessage = [&] { + OpMsgBuilder builder; + builder.setBody(BSON(cmdName << 1 << "$db" + << "test")); + return builder.finish(); + }; + + // Setup the execution context + rec = std::make_shared<RequestExecutionContext>(opCtx.get(), mockMessage()); rec->setReplyBuilder(makeReplyBuilder(rpc::protocolForMessage(rec->getMessage()))); rec->setRequest(rpc::opMsgRequestFromAnyProtocol(rec->getMessage())); rec->setCommand(CommandHelpers::findCommand(rec->getRequest().getCommandName())); + // Setup the invocation auto cmd = rec->getCommand(); invariant(cmd); - ctx.invocation = cmd->parse(ctx.opCtx.get(), rec->getRequest()); - ctx.rec = std::move(rec); - return ctx; + invocation = cmd->parse(opCtx.get(), rec->getRequest()); } - BSONObj getSyncResponse(ExecutionContext& ctx) const { - ctx.invocation->run(ctx.rec->getOpCtx(), ctx.rec->getReplyBuilder()); - return ctx.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); + ~TestState() { + // Deleting the `opCtx` will modify the `Client`, so we must bind the strand first. + auto guard = strand->bind(); + opCtx.reset(); } - BSONObj getAsyncResponse(ClientStrandPtr strand, ExecutionContext& ctx) const { - Future<void> future; - { - auto guard = strand->bind(); - FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); - future = ctx.invocation->runAsync(ctx.rec); - ASSERT(!future.isReady()); - } - - ASSERT(future.getNoThrow().isOK()); - - return [&] { - auto guard = strand->bind(); - return ctx.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); - }(); + ClientStrandPtr strand; + ServiceContext::UniqueOperationContext opCtx; + std::shared_ptr<RequestExecutionContext> rec; + std::shared_ptr<CommandInvocation> invocation; +}; + +BSONObj getSyncResponse(AsyncCommandExecutionTest::TestState& state) { + state.invocation->run(state.rec->getOpCtx(), state.rec->getReplyBuilder()); + return state.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); +} + +BSONObj getAsyncResponse(AsyncCommandExecutionTest::TestState& state) { + Future<void> future; + { + auto guard = state.strand->bind(); + FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); + future = state.invocation->runAsync(state.rec); + ASSERT(!future.isReady()); } - void killAsyncCommand(ClientStrandPtr strand, ExecutionContext& ctx) const { - Future<void> future; - { - auto guard = strand->bind(); - FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); - future = ctx.invocation->runAsync(ctx.rec); + ASSERT(future.getNoThrow().isOK()); - auto opCtx = ctx.rec->getOpCtx(); - stdx::lock_guard<Client> lk(*opCtx->getClient()); - opCtx->getServiceContext()->killOperation(lk, opCtx, ErrorCodes::Interrupted); - } + return [&] { + auto guard = state.strand->bind(); + return state.rec->getReplyBuilder()->getBodyBuilder().done().getOwned(); + }(); +} + +void killAsyncCommand(AsyncCommandExecutionTest::TestState& state) { + Future<void> future; + { + auto guard = state.strand->bind(); + FailPointEnableBlock fp("hangBeforeRunningAsyncRequestExecutorTask"); + future = state.invocation->runAsync(state.rec); - ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::Interrupted); + auto opCtx = state.rec->getOpCtx(); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + opCtx->getServiceContext()->killOperation(lk, opCtx, ErrorCodes::Interrupted); } - Message mockMessage(StringData commandName) const { - OpMsgBuilder builder; - builder.setBody(BSON(commandName << 1 << "$db" - << "test")); - return builder.finish(); + ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::Interrupted); +} + +void AsyncCommandExecutionTest::runTestForCommand(StringData command) { + BSONObj syncResponse, asyncResponse; + + auto client = getServiceContext()->makeClient("Client"); + auto strand = ClientStrand::make(std::move(client)); + + { + LOGV2(5399301, "Running the command synchronously", "command"_attr = command); + TestState state(strand, command); + strand->run([&] { syncResponse = getSyncResponse(state); }); } -}; + + { + LOGV2(5399302, "Running the command asynchronously", "command"_attr = command); + TestState state(strand, command); + asyncResponse = getAsyncResponse(state); + } + + { + LOGV2(5399303, "Canceling the command running asynchronously", "command"_attr = command); + TestState state(strand, command); + killAsyncCommand(state); + } + + ASSERT_BSONOBJ_EQ(syncResponse, asyncResponse); +} TEST_F(AsyncCommandExecutionTest, BuildInfo) { runTestForCommand("buildinfo"); |