diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-01-22 11:32:23 -0500 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-03-08 17:49:26 -0500 |
commit | 66b82f4eb006b71d0c8a63ca982be8e943280941 (patch) | |
tree | 3a8b852fd8f9ab11869e2c7b88648125b81d5d7f /src/mongo | |
parent | 5bf81cc55555124280d88a54396df2a64bcef8ce (diff) | |
download | mongo-66b82f4eb006b71d0c8a63ca982be8e943280941.tar.gz |
SERVER-33017 Add metadata hook to update lastCommittedOpTime on mongos per shard
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/s/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote_test.cpp | 168 | ||||
-rw-r--r-- | src/mongo/s/committed_optime_metadata_hook.cpp | 76 | ||||
-rw-r--r-- | src/mongo/s/committed_optime_metadata_hook.h | 58 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_router_test_fixture.cpp | 17 |
8 files changed, 304 insertions, 37 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index a1b5347d5b3..cf8c406bd1c 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -424,6 +424,7 @@ if not hygienic: 's/client/sharding_connection_hook', 's/commands/cluster_commands', 's/commands/shared_cluster_commands', + 's/committed_optime_metadata_hook', 's/coreshard', 's/is_mongos', 's/sharding_egress_metadata_hook_for_mongos', diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 49ccc44ba4d..7566bec565c 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -175,6 +175,7 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/transport/transport_layer_mock', '$BUILD_DIR/mongo/util/clock_source_mock', + 'committed_optime_metadata_hook', 'sharding_egress_metadata_hook_for_mongos', 'sharding_task_executor', 'sharding_test_fixture_common', @@ -371,6 +372,17 @@ env.Library( ) env.Library( + target='committed_optime_metadata_hook', + source=[ + 'committed_optime_metadata_hook.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/rpc/metadata', + 'coreshard', + ] +) + +env.Library( target='is_mongos', source=[ 'is_mongos.cpp', diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 325ccc23842..69a9389e784 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -26,6 +26,7 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/s/sharding_router_test_fixture', + '$BUILD_DIR/mongo/s/query/async_results_merger', 'sharding_client', ], ) diff --git a/src/mongo/s/client/shard_remote_test.cpp b/src/mongo/s/client/shard_remote_test.cpp index 078c3e5eda1..fe53c03dcc4 100644 --- a/src/mongo/s/client/shard_remote_test.cpp +++ b/src/mongo/s/client/shard_remote_test.cpp @@ -31,56 +31,73 @@ #include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_mock.h" +#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/logical_time.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_factory.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" +#include "mongo/s/query/establish_cursors.h" #include "mongo/s/shard_id.h" +#include "mongo/s/sharding_router_test_fixture.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { -const ShardId dummyShardId("dummyShard"); -const ConnectionString dummyShardCS("rs/dummy1:1234,dummy2:2345,dummy3:3456", - ConnectionString::SET); +const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); -class ShardRemoteTest : public unittest::Test { -public: - ShardFactory* getShardFactory() { - return _shardFactory.get(); - } +const std::vector<ShardId> kTestShardIds = { + ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; +const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), + HostAndPort("FakeShard2Host", 12345), + HostAndPort("FakeShard3Host", 12345)}; -private: +class ShardRemoteTest : public ShardingTestFixture { +protected: void setUp() { - auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryMock>(); - auto targeterFactoryPtr = targeterFactory.get(); - - ShardFactory::BuilderCallable setBuilder = - [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { - return stdx::make_unique<ShardRemote>( - shardId, connStr, targeterFactoryPtr->create(connStr)); - }; - - ShardFactory::BuilderCallable masterBuilder = - [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { - return stdx::make_unique<ShardRemote>( - shardId, connStr, targeterFactoryPtr->create(connStr)); - }; - - ShardFactory::BuildersMap buildersMap{ - {ConnectionString::SET, std::move(setBuilder)}, - {ConnectionString::MASTER, std::move(masterBuilder)}, - }; - - _shardFactory = - stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); + ShardingTestFixture::setUp(); + + configTargeter()->setFindHostReturnValue(kTestConfigShardHost); + + std::vector<ShardType> shards; + + for (size_t i = 0; i < kTestShardIds.size(); i++) { + ShardType shardType; + shardType.setName(kTestShardIds[i].toString()); + shardType.setHost(kTestShardHosts[i].toString()); + shards.push_back(shardType); + + std::unique_ptr<RemoteCommandTargeterMock> targeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); + targeter->setFindHostReturnValue(kTestShardHosts[i]); + + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), + std::move(targeter)); + } + + setupShards(shards); } - std::unique_ptr<ShardFactory> _shardFactory; + void runDummyCommandOnShard(ShardId shardId) { + auto shard = shardRegistry()->getShardNoReload(shardId); + uassertStatusOK(shard->runCommand(operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "unusedDb", + BSON("unused" + << "cmd"), + Shard::RetryPolicy::kNoRetry)); + } }; +BSONObj makeLastCommittedOpTimeMetadata(LogicalTime time) { + return BSON("lastCommittedOpTime" << time.asTimestamp()); +} + TEST_F(ShardRemoteTest, GetAndSetLatestLastCommittedOpTime) { - auto shard = getShardFactory()->createShard(dummyShardId, dummyShardCS); + auto shard = shardRegistry()->getShardNoReload(kTestShardIds[0]); // Shards can store last committed opTimes. LogicalTime time(Timestamp(10, 2)); @@ -98,5 +115,90 @@ TEST_F(ShardRemoteTest, GetAndSetLatestLastCommittedOpTime) { ASSERT_EQ(laterTime, shard->getLastCommittedOpTime()); } +TEST_F(ShardRemoteTest, NetworkReplyWithLastCommittedOpTime) { + // Send a request to one shard. + auto targetedShard = kTestShardIds[0]; + auto future = launchAsync([&] { runDummyCommandOnShard(targetedShard); }); + + // Mock a find response with a returned lastCommittedOpTime. + LogicalTime expectedTime(Timestamp(100, 2)); + onFindWithMetadataCommand([&](const executor::RemoteCommandRequest& request) { + auto result = std::vector<BSONObj>{BSON("_id" << 1)}; + auto metadata = makeLastCommittedOpTimeMetadata(expectedTime); + return std::make_tuple(result, metadata); + }); + + future.timed_get(kFutureTimeout); + + // Verify the targeted shard has updated its lastCommittedOpTime. + ASSERT_EQ(expectedTime, + shardRegistry()->getShardNoReload(targetedShard)->getLastCommittedOpTime()); + + // Verify shards that were not targeted were not affected. + for (auto shardId : kTestShardIds) { + if (shardId != targetedShard) { + ASSERT_EQ(LogicalTime::kUninitialized, + shardRegistry()->getShardNoReload(shardId)->getLastCommittedOpTime()); + } + } +} + +TEST_F(ShardRemoteTest, NetworkReplyWithoutLastCommittedOpTime) { + // Send a request to one shard. + auto targetedShard = kTestShardIds[0]; + auto future = launchAsync([&] { runDummyCommandOnShard(targetedShard); }); + + // Mock a find response without a returned lastCommittedOpTime. + onFindWithMetadataCommand([&](const executor::RemoteCommandRequest& request) { + auto result = std::vector<BSONObj>{BSON("_id" << 1)}; + auto metadata = BSONObj(); + return std::make_tuple(result, metadata); + }); + + future.timed_get(kFutureTimeout); + + // Verify the targeted shard has not updated its lastCommittedOpTime. + ASSERT_EQ(LogicalTime::kUninitialized, + shardRegistry()->getShardNoReload(targetedShard)->getLastCommittedOpTime()); +} + +TEST_F(ShardRemoteTest, ScatterGatherRepliesWithLastCommittedOpTime) { + // Send requests to several shards. + auto nss = NamespaceString("test.foo"); + auto cmdObj = BSON("find" << nss.coll()); + std::vector<std::pair<ShardId, BSONObj>> remotes{ + {kTestShardIds[0], cmdObj}, {kTestShardIds[1], cmdObj}, {kTestShardIds[2], cmdObj}}; + + auto future = launchAsync([&] { + establishCursors(operationContext(), + executor(), + nss, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + remotes, + false); // allowPartialResults + }); + + // All remotes respond with a lastCommittedOpTime. + LogicalTime expectedTime(Timestamp(50, 1)); + for (auto remote : remotes) { + onCommandWithMetadata([&](const executor::RemoteCommandRequest& request) { + std::vector<BSONObj> batch = {BSON("_id" << 1)}; + CursorResponse cursorResponse(nss, CursorId(123), batch); + auto result = cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); + + return executor::RemoteCommandResponse( + result, makeLastCommittedOpTimeMetadata(expectedTime), Milliseconds(1)); + }); + } + + future.timed_get(kFutureTimeout); + + // Verify all shards updated their lastCommittedOpTime. + for (auto shardId : kTestShardIds) { + ASSERT_EQ(expectedTime, + shardRegistry()->getShardNoReload(shardId)->getLastCommittedOpTime()); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/committed_optime_metadata_hook.cpp b/src/mongo/s/committed_optime_metadata_hook.cpp new file mode 100644 index 00000000000..743598d4ec9 --- /dev/null +++ b/src/mongo/s/committed_optime_metadata_hook.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/committed_optime_metadata_hook.h" + +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" + +namespace mongo { + +namespace rpc { + +namespace { +const char kLastCommittedOpTimeFieldName[] = "lastCommittedOpTime"; +} + +CommittedOpTimeMetadataHook::CommittedOpTimeMetadataHook(ServiceContext* service) + : _service(service) {} + +Status CommittedOpTimeMetadataHook::writeRequestMetadata(OperationContext* opCtx, + BSONObjBuilder* metadataBob) { + return Status::OK(); +} + +Status CommittedOpTimeMetadataHook::readReplyMetadata(OperationContext* opCtx, + StringData replySource, + const BSONObj& metadataObj) { + auto lastCommittedOpTimeField = metadataObj[kLastCommittedOpTimeFieldName]; + if (lastCommittedOpTimeField.eoo()) { + return Status::OK(); + } + + invariant(lastCommittedOpTimeField.type() == BSONType::bsonTimestamp); + + // replySource is the HostAndPort of a single server, except when this hook is triggered + // through DBClientReplicaSet, when it will be a replica set connection string. The + // shardRegistry stores connection strings and hosts in its lookup table, in addition to shard + // ids, so replySource can be correctly passed on to ShardRegistry::getShardNoReload. + auto shard = Grid::get(_service)->shardRegistry()->getShardNoReload(replySource.toString()); + if (shard) { + shard->updateLastCommittedOpTime(LogicalTime(lastCommittedOpTimeField.timestamp())); + } + + return Status::OK(); +} + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/s/committed_optime_metadata_hook.h b/src/mongo/s/committed_optime_metadata_hook.h new file mode 100644 index 00000000000..22f53ec8ccd --- /dev/null +++ b/src/mongo/s/committed_optime_metadata_hook.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/rpc/metadata/metadata_hook.h" + +namespace mongo { + +class BSONObj; +class BSONObjBuilder; +class OperationContext; +class ServiceContext; +class Status; + +namespace rpc { + +class CommittedOpTimeMetadataHook : public EgressMetadataHook { +public: + explicit CommittedOpTimeMetadataHook(ServiceContext* service); + + Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override; + + Status readReplyMetadata(OperationContext* opCtx, + StringData replySource, + const BSONObj& metadataObj) override; + +private: + ServiceContext* _service; +}; + +} // namespace rpc +} // namespace mongo diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index c03b7a920df..449f51389db 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -75,6 +75,7 @@ #include "mongo/s/client/shard_remote.h" #include "mongo/s/client/sharding_connection_hook.h" #include "mongo/s/commands/kill_sessions_remote.h" +#include "mongo/s/committed_optime_metadata_hook.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" @@ -297,6 +298,8 @@ Status initializeSharding(OperationContext* opCtx) { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); hookList->addHook( stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext())); + hookList->addHook( + stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(opCtx->getServiceContext())); hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>( opCtx->getServiceContext())); return hookList; @@ -353,6 +356,10 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { unshardedHookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); unshardedHookList->addHook( stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext)); + // TODO SERVER-33053: readReplyMetadata is not called on hooks added through + // ShardingConnectionHook with _shardedConnections=false, so this hook will not run for + // connections using globalConnPool. + unshardedHookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext)); // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks globalConnPool.addHook(new ShardingConnectionHook(false, std::move(unshardedHookList))); @@ -361,6 +368,7 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { shardedHookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext)); shardedHookList->addHook( stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext)); + shardedHookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext)); shardConnectionPool.addHook(new ShardingConnectionHook(true, std::move(shardedHookList))); diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp index 6b95164eaf7..cee379506c8 100644 --- a/src/mongo/s/sharding_router_test_fixture.cpp +++ b/src/mongo/s/sharding_router_test_fixture.cpp @@ -38,6 +38,7 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/collation/collator_factory_mock.h" @@ -47,6 +48,7 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/s/balancer_configuration.h" @@ -59,6 +61,7 @@ #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" +#include "mongo/s/committed_optime_metadata_hook.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -116,17 +119,23 @@ void ShardingTestFixture::setUp() { _opCtx = _client->makeOperationContext(); // Set up executor pool used for most operations. + auto makeMetadataHookList = [&] { + auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(service)); + hookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(service)); + hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service)); + return hookList; + }; + auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>(); - fixedNet->setEgressMetadataHook( - stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service)); + fixedNet->setEgressMetadataHook(makeMetadataHookList()); _mockNetwork = fixedNet.get(); auto fixedExec = makeShardingTestExecutor(std::move(fixedNet)); _networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork); _executor = fixedExec.get(); auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>(); - netForPool->setEgressMetadataHook( - stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service)); + netForPool->setEgressMetadataHook(makeMetadataHookList()); auto _mockNetworkForPool = netForPool.get(); auto execForPool = makeShardingTestExecutor(std::move(netForPool)); _networkTestEnvForPool = |