diff options
author | Blake Oler <blake.oler@mongodb.com> | 2018-10-23 17:25:26 -0400 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2018-10-24 17:45:16 -0400 |
commit | c23ef8117d14b50d560202c7d0c4ed0c8cb9948d (patch) | |
tree | a6d1551a469fb5b05275117189c9540b601b479f | |
parent | 3c2cd0967e421ff1cbe6345d8755c3995bebedcc (diff) | |
download | mongo-c23ef8117d14b50d560202c7d0c4ed0c8cb9948d.tar.gz |
SERVER-37735 Ensure the full logical session id is included in commands sent by the ShardingTaskExecutor
-rw-r--r-- | src/mongo/db/s/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_task_executor_test.cpp | 150 |
3 files changed, 191 insertions, 5 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d1816c7d94d..4c8efa6c89f 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -32,6 +32,18 @@ env.Library( ], ) +env.CppUnitTest( + target='sharding_task_executor_test', + source=[ + 'sharding_task_executor_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/s/shard_server_test_fixture', + 'sharding_task_executor', + ], +) + env.Library( target='migration_types', source=[ diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp index be43f7080fc..b86eb4da271 100644 --- a/src/mongo/db/s/sharding_task_executor.cpp +++ b/src/mongo/db/s/sharding_task_executor.cpp @@ -119,18 +119,41 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom return _executor->scheduleRemoteCommand(request, cb); } - boost::optional<RemoteCommandRequest> newRequest; + boost::optional<RemoteCommandRequest> requestWithFixedLsid = [&] { + boost::optional<RemoteCommandRequest> newRequest; + + if (!request.opCtx->getLogicalSessionId()) { + return newRequest; + } + + if (request.cmdObj.hasField("lsid")) { + auto cmdObjLsid = + LogicalSessionFromClient::parse("lsid"_sd, request.cmdObj["lsid"].Obj()); + + if (cmdObjLsid.getUid()) { + invariant(*cmdObjLsid.getUid() == request.opCtx->getLogicalSessionId()->getUid()); + return newRequest; + } + + newRequest.emplace(request); + newRequest->cmdObj = newRequest->cmdObj.removeField("lsid"); + } + + if (!newRequest) { + newRequest.emplace(request); + } - if (request.opCtx->getLogicalSessionId() && !request.cmdObj.hasField("lsid")) { - newRequest.emplace(request); BSONObjBuilder bob(std::move(newRequest->cmdObj)); { BSONObjBuilder subbob(bob.subobjStart("lsid")); request.opCtx->getLogicalSessionId()->serialize(&subbob); + subbob.done(); } newRequest->cmdObj = bob.obj(); - } + + return newRequest; + }(); std::shared_ptr<OperationTimeTracker> timeTracker = OperationTimeTracker::get(request.opCtx); @@ -190,7 +213,8 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom } }; - return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb); + return _executor->scheduleRemoteCommand(requestWithFixedLsid ? *requestWithFixedLsid : request, + shardingCb); } void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { diff --git a/src/mongo/db/s/sharding_task_executor_test.cpp b/src/mongo/db/s/sharding_task_executor_test.cpp new file mode 100644 index 00000000000..94c3e5194dd --- /dev/null +++ b/src/mongo/db/s/sharding_task_executor_test.cpp @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_task_executor.h" + +#include "mongo/executor/network_interface.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/task_executor_test_common.h" +#include "mongo/executor/task_executor_test_fixture.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/shard_server_test_fixture.h" + +namespace mongo { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::TaskExecutor; + +class ShardingTaskExecutorTest : public ShardServerTestFixture { +private: + void setUp() final; + void tearDown() final; + +protected: + LogicalSessionId constructFullLsid(); + void assertOpCtxLsidEqualsCmdObjLsid(const BSONObj& cmdObj); + + executor::NetworkInterfaceMock* _network{nullptr}; + + std::unique_ptr<executor::ThreadPoolTaskExecutor> _threadPool; +}; + +void ShardingTaskExecutorTest::setUp() { + ShardServerTestFixture::setUp(); + + auto netForFixedTaskExecutor = std::make_unique<executor::NetworkInterfaceMock>(); + _network = netForFixedTaskExecutor.get(); + + _threadPool = makeThreadPoolTestExecutor(std::move(netForFixedTaskExecutor)); +} + +void ShardingTaskExecutorTest::tearDown() { + ShardServerTestFixture::tearDown(); +} + +LogicalSessionId ShardingTaskExecutorTest::constructFullLsid() { + LogicalSessionId lsid; + lsid.setId(UUID::gen()); + lsid.setUid(SHA256Block{}); + return lsid; +} + +void ShardingTaskExecutorTest::assertOpCtxLsidEqualsCmdObjLsid(const BSONObj& cmdObj) { + auto opCtxLsid = operationContext()->getLogicalSessionId(); + + ASSERT(opCtxLsid); + + auto cmdObjLsid = LogicalSessionFromClient::parse("lsid"_sd, cmdObj["lsid"].Obj()); + + ASSERT_EQ(opCtxLsid->getId(), cmdObjLsid.getId()); + ASSERT_EQ(opCtxLsid->getUid(), *cmdObjLsid.getUid()); +} + +TEST_F(ShardingTaskExecutorTest, MissingLsidAddsLsidInCommand) { + operationContext()->setLogicalSessionId(constructFullLsid()); + ASSERT(operationContext()->getLogicalSessionId()); + + executor::ShardingTaskExecutor executor(std::move(_threadPool)); + executor.startup(); + _network->enterNetwork(); + + const RemoteCommandRequest request(HostAndPort("localhost", 27017), + "mydb", + BSON("whatsUp" + << "doc"), + operationContext()); + + TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( + request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs) -> void {})); + + ASSERT(_network->hasReadyRequests()); + NetworkInterfaceMock::NetworkOperationIterator noi = _network->getNextReadyRequest(); + auto cmdObj = noi->getRequest().cmdObj; + + assertOpCtxLsidEqualsCmdObjLsid(cmdObj); +} + +TEST_F(ShardingTaskExecutorTest, IncompleteLsidAddsLsidInCommand) { + operationContext()->setLogicalSessionId(constructFullLsid()); + ASSERT(operationContext()->getLogicalSessionId()); + + executor::ShardingTaskExecutor executor(std::move(_threadPool)); + executor.startup(); + _network->enterNetwork(); + + BSONObjBuilder bob; + bob.append("whatsUp", "doc"); + { + BSONObjBuilder subbob(bob.subobjStart("lsid")); + subbob << "id" << operationContext()->getLogicalSessionId()->getId(); + subbob.done(); + } + + const RemoteCommandRequest request( + HostAndPort("localhost", 27017), "mydb", bob.obj(), operationContext()); + + TaskExecutor::CallbackHandle cbHandle = unittest::assertGet(executor.scheduleRemoteCommand( + request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs) -> void {})); + + ASSERT(_network->hasReadyRequests()); + NetworkInterfaceMock::NetworkOperationIterator noi = _network->getNextReadyRequest(); + auto cmdObj = noi->getRequest().cmdObj; + + assertOpCtxLsidEqualsCmdObjLsid(cmdObj); +} + +} // namespace +} // namespace mongo |