summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-01-22 11:32:23 -0500
committerJack Mulrow <jack.mulrow@mongodb.com>2018-03-08 17:49:26 -0500
commit66b82f4eb006b71d0c8a63ca982be8e943280941 (patch)
tree3a8b852fd8f9ab11869e2c7b88648125b81d5d7f /src/mongo
parent5bf81cc55555124280d88a54396df2a64bcef8ce (diff)
downloadmongo-66b82f4eb006b71d0c8a63ca982be8e943280941.tar.gz
SERVER-33017 Add metadata hook to update lastCommittedOpTime on mongos per shard
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/s/SConscript12
-rw-r--r--src/mongo/s/client/SConscript1
-rw-r--r--src/mongo/s/client/shard_remote_test.cpp168
-rw-r--r--src/mongo/s/committed_optime_metadata_hook.cpp76
-rw-r--r--src/mongo/s/committed_optime_metadata_hook.h58
-rw-r--r--src/mongo/s/server.cpp8
-rw-r--r--src/mongo/s/sharding_router_test_fixture.cpp17
8 files changed, 304 insertions, 37 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index a1b5347d5b3..cf8c406bd1c 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -424,6 +424,7 @@ if not hygienic:
's/client/sharding_connection_hook',
's/commands/cluster_commands',
's/commands/shared_cluster_commands',
+ 's/committed_optime_metadata_hook',
's/coreshard',
's/is_mongos',
's/sharding_egress_metadata_hook_for_mongos',
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 49ccc44ba4d..7566bec565c 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -175,6 +175,7 @@ env.Library(
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/transport/transport_layer_mock',
'$BUILD_DIR/mongo/util/clock_source_mock',
+ 'committed_optime_metadata_hook',
'sharding_egress_metadata_hook_for_mongos',
'sharding_task_executor',
'sharding_test_fixture_common',
@@ -371,6 +372,17 @@ env.Library(
)
env.Library(
+ target='committed_optime_metadata_hook',
+ source=[
+ 'committed_optime_metadata_hook.cpp'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/rpc/metadata',
+ 'coreshard',
+ ]
+)
+
+env.Library(
target='is_mongos',
source=[
'is_mongos.cpp',
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index 325ccc23842..69a9389e784 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -26,6 +26,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
+ '$BUILD_DIR/mongo/s/query/async_results_merger',
'sharding_client',
],
)
diff --git a/src/mongo/s/client/shard_remote_test.cpp b/src/mongo/s/client/shard_remote_test.cpp
index 078c3e5eda1..fe53c03dcc4 100644
--- a/src/mongo/s/client/shard_remote_test.cpp
+++ b/src/mongo/s/client/shard_remote_test.cpp
@@ -31,56 +31,73 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/logical_time.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_factory.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
+#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/shard_id.h"
+#include "mongo/s/sharding_router_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
-const ShardId dummyShardId("dummyShard");
-const ConnectionString dummyShardCS("rs/dummy1:1234,dummy2:2345,dummy3:3456",
- ConnectionString::SET);
+const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-class ShardRemoteTest : public unittest::Test {
-public:
- ShardFactory* getShardFactory() {
- return _shardFactory.get();
- }
+const std::vector<ShardId> kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
-private:
+class ShardRemoteTest : public ShardingTestFixture {
+protected:
void setUp() {
- auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryMock>();
- auto targeterFactoryPtr = targeterFactory.get();
-
- ShardFactory::BuilderCallable setBuilder =
- [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) {
- return stdx::make_unique<ShardRemote>(
- shardId, connStr, targeterFactoryPtr->create(connStr));
- };
-
- ShardFactory::BuilderCallable masterBuilder =
- [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) {
- return stdx::make_unique<ShardRemote>(
- shardId, connStr, targeterFactoryPtr->create(connStr));
- };
-
- ShardFactory::BuildersMap buildersMap{
- {ConnectionString::SET, std::move(setBuilder)},
- {ConnectionString::MASTER, std::move(masterBuilder)},
- };
-
- _shardFactory =
- stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
+ ShardingTestFixture::setUp();
+
+ configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
+ std::vector<ShardType> shards;
+
+ for (size_t i = 0; i < kTestShardIds.size(); i++) {
+ ShardType shardType;
+ shardType.setName(kTestShardIds[i].toString());
+ shardType.setHost(kTestShardHosts[i].toString());
+ shards.push_back(shardType);
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
+ targeter->setFindHostReturnValue(kTestShardHosts[i]);
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
+ std::move(targeter));
+ }
+
+ setupShards(shards);
}
- std::unique_ptr<ShardFactory> _shardFactory;
+ void runDummyCommandOnShard(ShardId shardId) {
+ auto shard = shardRegistry()->getShardNoReload(shardId);
+ uassertStatusOK(shard->runCommand(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "unusedDb",
+ BSON("unused"
+ << "cmd"),
+ Shard::RetryPolicy::kNoRetry));
+ }
};
+BSONObj makeLastCommittedOpTimeMetadata(LogicalTime time) {
+ return BSON("lastCommittedOpTime" << time.asTimestamp());
+}
+
TEST_F(ShardRemoteTest, GetAndSetLatestLastCommittedOpTime) {
- auto shard = getShardFactory()->createShard(dummyShardId, dummyShardCS);
+ auto shard = shardRegistry()->getShardNoReload(kTestShardIds[0]);
// Shards can store last committed opTimes.
LogicalTime time(Timestamp(10, 2));
@@ -98,5 +115,90 @@ TEST_F(ShardRemoteTest, GetAndSetLatestLastCommittedOpTime) {
ASSERT_EQ(laterTime, shard->getLastCommittedOpTime());
}
+TEST_F(ShardRemoteTest, NetworkReplyWithLastCommittedOpTime) {
+ // Send a request to one shard.
+ auto targetedShard = kTestShardIds[0];
+ auto future = launchAsync([&] { runDummyCommandOnShard(targetedShard); });
+
+ // Mock a find response with a returned lastCommittedOpTime.
+ LogicalTime expectedTime(Timestamp(100, 2));
+ onFindWithMetadataCommand([&](const executor::RemoteCommandRequest& request) {
+ auto result = std::vector<BSONObj>{BSON("_id" << 1)};
+ auto metadata = makeLastCommittedOpTimeMetadata(expectedTime);
+ return std::make_tuple(result, metadata);
+ });
+
+ future.timed_get(kFutureTimeout);
+
+ // Verify the targeted shard has updated its lastCommittedOpTime.
+ ASSERT_EQ(expectedTime,
+ shardRegistry()->getShardNoReload(targetedShard)->getLastCommittedOpTime());
+
+ // Verify shards that were not targeted were not affected.
+ for (auto shardId : kTestShardIds) {
+ if (shardId != targetedShard) {
+ ASSERT_EQ(LogicalTime::kUninitialized,
+ shardRegistry()->getShardNoReload(shardId)->getLastCommittedOpTime());
+ }
+ }
+}
+
+TEST_F(ShardRemoteTest, NetworkReplyWithoutLastCommittedOpTime) {
+ // Send a request to one shard.
+ auto targetedShard = kTestShardIds[0];
+ auto future = launchAsync([&] { runDummyCommandOnShard(targetedShard); });
+
+ // Mock a find response without a returned lastCommittedOpTime.
+ onFindWithMetadataCommand([&](const executor::RemoteCommandRequest& request) {
+ auto result = std::vector<BSONObj>{BSON("_id" << 1)};
+ auto metadata = BSONObj();
+ return std::make_tuple(result, metadata);
+ });
+
+ future.timed_get(kFutureTimeout);
+
+ // Verify the targeted shard has not updated its lastCommittedOpTime.
+ ASSERT_EQ(LogicalTime::kUninitialized,
+ shardRegistry()->getShardNoReload(targetedShard)->getLastCommittedOpTime());
+}
+
+TEST_F(ShardRemoteTest, ScatterGatherRepliesWithLastCommittedOpTime) {
+ // Send requests to several shards.
+ auto nss = NamespaceString("test.foo");
+ auto cmdObj = BSON("find" << nss.coll());
+ std::vector<std::pair<ShardId, BSONObj>> remotes{
+ {kTestShardIds[0], cmdObj}, {kTestShardIds[1], cmdObj}, {kTestShardIds[2], cmdObj}};
+
+ auto future = launchAsync([&] {
+ establishCursors(operationContext(),
+ executor(),
+ nss,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ remotes,
+ false); // allowPartialResults
+ });
+
+ // All remotes respond with a lastCommittedOpTime.
+ LogicalTime expectedTime(Timestamp(50, 1));
+ for (auto remote : remotes) {
+ onCommandWithMetadata([&](const executor::RemoteCommandRequest& request) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1)};
+ CursorResponse cursorResponse(nss, CursorId(123), batch);
+ auto result = cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
+
+ return executor::RemoteCommandResponse(
+ result, makeLastCommittedOpTimeMetadata(expectedTime), Milliseconds(1));
+ });
+ }
+
+ future.timed_get(kFutureTimeout);
+
+ // Verify all shards updated their lastCommittedOpTime.
+ for (auto shardId : kTestShardIds) {
+ ASSERT_EQ(expectedTime,
+ shardRegistry()->getShardNoReload(shardId)->getLastCommittedOpTime());
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/committed_optime_metadata_hook.cpp b/src/mongo/s/committed_optime_metadata_hook.cpp
new file mode 100644
index 00000000000..743598d4ec9
--- /dev/null
+++ b/src/mongo/s/committed_optime_metadata_hook.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/committed_optime_metadata_hook.h"
+
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+namespace rpc {
+
+namespace {
+const char kLastCommittedOpTimeFieldName[] = "lastCommittedOpTime";
+}
+
+CommittedOpTimeMetadataHook::CommittedOpTimeMetadataHook(ServiceContext* service)
+ : _service(service) {}
+
+Status CommittedOpTimeMetadataHook::writeRequestMetadata(OperationContext* opCtx,
+ BSONObjBuilder* metadataBob) {
+ return Status::OK();
+}
+
+Status CommittedOpTimeMetadataHook::readReplyMetadata(OperationContext* opCtx,
+ StringData replySource,
+ const BSONObj& metadataObj) {
+ auto lastCommittedOpTimeField = metadataObj[kLastCommittedOpTimeFieldName];
+ if (lastCommittedOpTimeField.eoo()) {
+ return Status::OK();
+ }
+
+ invariant(lastCommittedOpTimeField.type() == BSONType::bsonTimestamp);
+
+ // replySource is the HostAndPort of a single server, except when this hook is triggered
+ // through DBClientReplicaSet, when it will be a replica set connection string. The
+ // shardRegistry stores connection strings and hosts in its lookup table, in addition to shard
+ // ids, so replySource can be correctly passed on to ShardRegistry::getShardNoReload.
+ auto shard = Grid::get(_service)->shardRegistry()->getShardNoReload(replySource.toString());
+ if (shard) {
+ shard->updateLastCommittedOpTime(LogicalTime(lastCommittedOpTimeField.timestamp()));
+ }
+
+ return Status::OK();
+}
+
+} // namespace rpc
+} // namespace mongo
diff --git a/src/mongo/s/committed_optime_metadata_hook.h b/src/mongo/s/committed_optime_metadata_hook.h
new file mode 100644
index 00000000000..22f53ec8ccd
--- /dev/null
+++ b/src/mongo/s/committed_optime_metadata_hook.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/rpc/metadata/metadata_hook.h"
+
+namespace mongo {
+
+class BSONObj;
+class BSONObjBuilder;
+class OperationContext;
+class ServiceContext;
+class Status;
+
+namespace rpc {
+
+class CommittedOpTimeMetadataHook : public EgressMetadataHook {
+public:
+ explicit CommittedOpTimeMetadataHook(ServiceContext* service);
+
+ Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override;
+
+ Status readReplyMetadata(OperationContext* opCtx,
+ StringData replySource,
+ const BSONObj& metadataObj) override;
+
+private:
+ ServiceContext* _service;
+};
+
+} // namespace rpc
+} // namespace mongo
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index c03b7a920df..449f51389db 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -75,6 +75,7 @@
#include "mongo/s/client/shard_remote.h"
#include "mongo/s/client/sharding_connection_hook.h"
#include "mongo/s/commands/kill_sessions_remote.h"
+#include "mongo/s/committed_optime_metadata_hook.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
@@ -297,6 +298,8 @@ Status initializeSharding(OperationContext* opCtx) {
auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
hookList->addHook(
stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext()));
+ hookList->addHook(
+ stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(opCtx->getServiceContext()));
hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(
opCtx->getServiceContext()));
return hookList;
@@ -353,6 +356,10 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
unshardedHookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
unshardedHookList->addHook(
stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext));
+ // TODO SERVER-33053: readReplyMetadata is not called on hooks added through
+ // ShardingConnectionHook with _shardedConnections=false, so this hook will not run for
+ // connections using globalConnPool.
+ unshardedHookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext));
// Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks
globalConnPool.addHook(new ShardingConnectionHook(false, std::move(unshardedHookList)));
@@ -361,6 +368,7 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
shardedHookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
shardedHookList->addHook(
stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext));
+ shardedHookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext));
shardConnectionPool.addHook(new ShardingConnectionHook(true, std::move(shardedHookList)));
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index 6b95164eaf7..cee379506c8 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -38,6 +38,7 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/collation/collator_factory_mock.h"
@@ -47,6 +48,7 @@
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/s/balancer_configuration.h"
@@ -59,6 +61,7 @@
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
+#include "mongo/s/committed_optime_metadata_hook.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -116,17 +119,23 @@ void ShardingTestFixture::setUp() {
_opCtx = _client->makeOperationContext();
// Set up executor pool used for most operations.
+ auto makeMetadataHookList = [&] {
+ auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
+ hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(service));
+ hookList->addHook(stdx::make_unique<rpc::CommittedOpTimeMetadataHook>(service));
+ hookList->addHook(stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service));
+ return hookList;
+ };
+
auto fixedNet = stdx::make_unique<executor::NetworkInterfaceMock>();
- fixedNet->setEgressMetadataHook(
- stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service));
+ fixedNet->setEgressMetadataHook(makeMetadataHookList());
_mockNetwork = fixedNet.get();
auto fixedExec = makeShardingTestExecutor(std::move(fixedNet));
_networkTestEnv = stdx::make_unique<NetworkTestEnv>(fixedExec.get(), _mockNetwork);
_executor = fixedExec.get();
auto netForPool = stdx::make_unique<executor::NetworkInterfaceMock>();
- netForPool->setEgressMetadataHook(
- stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(service));
+ netForPool->setEgressMetadataHook(makeMetadataHookList());
auto _mockNetworkForPool = netForPool.get();
auto execForPool = makeShardingTestExecutor(std::move(netForPool));
_networkTestEnvForPool =