From 248aea0dad9f9a3d46cb37547a546b0c9fe7e135 Mon Sep 17 00:00:00 2001 From: Randolph Tan Date: Fri, 7 Aug 2015 15:15:18 -0400 Subject: SERVER-19390 Make config server queries do read committed --- src/mongo/executor/network_test_env.cpp | 44 +++- src/mongo/executor/network_test_env.h | 7 + src/mongo/s/catalog/SConscript | 1 + src/mongo/s/catalog/catalog_manager.h | 4 +- src/mongo/s/catalog/catalog_manager_mock.cpp | 4 +- src/mongo/s/catalog/catalog_manager_mock.h | 4 +- src/mongo/s/catalog/dist_lock_catalog_impl.cpp | 34 ++- src/mongo/s/catalog/dist_lock_catalog_impl.h | 11 + .../s/catalog/dist_lock_catalog_impl_test.cpp | 96 ++++---- .../s/catalog/legacy/catalog_manager_legacy.cpp | 5 +- .../s/catalog/legacy/catalog_manager_legacy.h | 4 +- .../replset/catalog_manager_replica_set.cpp | 134 ++++++----- .../catalog/replset/catalog_manager_replica_set.h | 10 +- .../catalog_manager_replica_set_add_shard_test.cpp | 23 +- ...talog_manager_replica_set_remove_shard_test.cpp | 8 + ...g_manager_replica_set_shard_collection_test.cpp | 16 ++ .../replset/catalog_manager_replica_set_test.cpp | 251 +++++++++++++++++++-- .../catalog_manager_replica_set_test_fixture.cpp | 9 +- .../catalog_manager_replica_set_test_fixture.h | 2 + .../catalog_manager_replica_set_upgrade_test.cpp | 11 +- src/mongo/s/client/shard_registry.cpp | 48 +++- src/mongo/s/client/shard_registry.h | 18 +- 22 files changed, 574 insertions(+), 170 deletions(-) (limited to 'src/mongo') diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp index 64241e72c12..21ad1ef1c6b 100644 --- a/src/mongo/executor/network_test_env.cpp +++ b/src/mongo/executor/network_test_env.cpp @@ -69,9 +69,22 @@ void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) { const NetworkInterfaceMock::NetworkOperationIterator noi = _mockNetwork->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); - _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), func(request)); - _mockNetwork->runReadyNetworkOperations(); + const auto cmdResponseStatus = func(request); + const auto cmdResponse = cmdResponseStatus.getValue(); + + BSONObjBuilder result; + + if (cmdResponseStatus.isOK()) { + result.appendElements(cmdResponse.data); + } + + Command::appendCommandStatus(result, cmdResponseStatus.getStatus()); + + const RemoteCommandResponse response(result.obj(), cmdResponse.metadata, Milliseconds(1)); + + _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response); + _mockNetwork->runReadyNetworkOperations(); _mockNetwork->exitNetwork(); } @@ -97,5 +110,32 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) { }); } +void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func) { + onCommandWithMetadata( + [&func](const RemoteCommandRequest& request) -> StatusWith { + const auto& resultStatus = func(request); + + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } + + std::vector result; + BSONObj metadata; + std::tie(result, metadata) = resultStatus.getValue(); + + BSONArrayBuilder arr; + for (const auto& obj : result) { + arr.append(obj); + } + + const NamespaceString nss = + NamespaceString(request.dbname, request.cmdObj.firstElement().String()); + BSONObjBuilder resultBuilder; + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder); + + return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1)); + }); +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h index 6ef0f242597..717fbb642ea 100644 --- a/src/mongo/executor/network_test_env.h +++ b/src/mongo/executor/network_test_env.h @@ -28,6 +28,7 @@ #pragma once +#include #include #include @@ -135,6 +136,11 @@ public: using OnFindCommandFunction = stdx::function>(const RemoteCommandRequest&)>; + // Function that accepts a find request and returns a tuple of resulting documents and response + // metadata. + using OnFindCommandWithMetadataFunction = + stdx::function, BSONObj>>( + const RemoteCommandRequest&)>; /** * Create a new environment based on the given network. @@ -149,6 +155,7 @@ public: void onCommand(OnCommandFunction func); void onCommandWithMetadata(OnCommandWithMetadataFunction func); void onFindCommand(OnFindCommandFunction func); + void onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func); private: // Task executor used for running asynchronous operations. diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index ab3d850e2ad..a0ed81168d8 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -133,6 +133,7 @@ env.Library( '$BUILD_DIR/mongo/client/remote_command_targeter', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/query/command_request_response', + '$BUILD_DIR/mongo/db/repl/read_concern_args', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/s/client/sharding_client', '$BUILD_DIR/mongo/util/net/hostandport', diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h index 366e6c63644..6d5bdf07314 100644 --- a/src/mongo/s/catalog/catalog_manager.h +++ b/src/mongo/s/catalog/catalog_manager.h @@ -430,12 +430,12 @@ private: * NamespaceExists if it exists with the same casing * DatabaseDifferCase if it exists under different casing. */ - virtual Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const = 0; + virtual Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) = 0; /** * Generates a unique name to be given to a newly added shard. */ - virtual StatusWith _generateNewShardName() const = 0; + virtual StatusWith _generateNewShardName() = 0; }; } // namespace mongo diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp index ab2cf24bfc9..5456cba8b3f 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/catalog_manager_mock.cpp @@ -168,11 +168,11 @@ DistLockManager* CatalogManagerMock::getDistLockManager() const { return _mockDistLockMgr.get(); } -Status CatalogManagerMock::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const { +Status CatalogManagerMock::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) { return Status::OK(); } -StatusWith CatalogManagerMock::_generateNewShardName() const { +StatusWith CatalogManagerMock::_generateNewShardName() { return {ErrorCodes::InternalError, "Method not implemented"}; } diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h index 83bcab351e2..b70d6855979 100644 --- a/src/mongo/s/catalog/catalog_manager_mock.h +++ b/src/mongo/s/catalog/catalog_manager_mock.h @@ -122,9 +122,9 @@ public: Status checkAndUpgrade(bool checkOnly) override; private: - Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override; + Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override; - StatusWith _generateNewShardName() const override; + StatusWith _generateNewShardName() override; std::unique_ptr _mockDistLockMgr; }; diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp index 6f02a4aba32..fbfe5e0d34e 100644 --- a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp +++ b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp @@ -39,7 +39,9 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/lasterror.h" #include "mongo/db/query/find_and_modify_request.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" @@ -50,12 +52,14 @@ namespace mongo { using std::string; +using std::vector; namespace { const char kCmdResponseWriteConcernField[] = "writeConcernError"; const char kFindAndModifyResponseResultDocField[] = "value"; const char kLocalTimeField[] = "localTime"; +const BSONObj kReplMetadata = BSON(rpc::kReplicationMetadataFieldName << 1); const ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly, TagSet()); /** @@ -163,11 +167,11 @@ StatusWith DistLockCatalogImpl::getPing(StringData processID) { return targetStatus.getStatus(); } - auto findResult = _client->exhaustiveFind(targetStatus.getValue(), - _lockPingNS, - BSON(LockpingsType::process() << processID), - BSONObj(), - 1); + auto findResult = _findOnConfig(targetStatus.getValue(), + _lockPingNS, + BSON(LockpingsType::process() << processID), + BSONObj(), + 1); if (!findResult.isOK()) { return findResult.getStatus(); @@ -382,7 +386,7 @@ StatusWith DistLockCatalogImpl::getLockByTS(const OID& lockSessionID) return targetStatus.getStatus(); } - auto findResult = _client->exhaustiveFind( + auto findResult = _findOnConfig( targetStatus.getValue(), _locksNS, BSON(LocksType::lockID(lockSessionID)), BSONObj(), 1); if (!findResult.isOK()) { @@ -414,7 +418,7 @@ StatusWith DistLockCatalogImpl::getLockByName(StringData name) { return targetStatus.getStatus(); } - auto findResult = _client->exhaustiveFind( + auto findResult = _findOnConfig( targetStatus.getValue(), _locksNS, BSON(LocksType::name() << name), BSONObj(), 1); if (!findResult.isOK()) { @@ -457,4 +461,20 @@ Status DistLockCatalogImpl::stopPing(StringData processId) { return findAndModifyStatus.getStatus(); } +StatusWith> DistLockCatalogImpl::_findOnConfig(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit) { + repl::ReadConcernArgs readConcern(boost::none, repl::ReadConcernLevel::kMajorityReadConcern); + auto result = + _client->exhaustiveFind(host, nss, query, sort, limit, readConcern, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + return result.getValue().docs; +} + } // namespace mongo diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.h b/src/mongo/s/catalog/dist_lock_catalog_impl.h index 48343790f06..41764dd7089 100644 --- a/src/mongo/s/catalog/dist_lock_catalog_impl.h +++ b/src/mongo/s/catalog/dist_lock_catalog_impl.h @@ -28,6 +28,9 @@ #pragma once +#include +#include + #include "mongo/bson/oid.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" @@ -37,6 +40,8 @@ namespace mongo { +struct HostAndPort; +class NamespaceString; class RemoteCommandTargeter; class ShardRegistry; @@ -78,6 +83,12 @@ public: private: RemoteCommandTargeter* _targeter(); + StatusWith> _findOnConfig(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit); + ShardRegistry* _client; // These are not static to avoid initialization order fiasco. diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp index be1de29ea4f..4ffb1782fde 100644 --- a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp +++ b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/find_and_modify_request.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/network_test_env.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" @@ -59,6 +60,7 @@ using executor::NetworkInterfaceMock; using executor::NetworkTestEnv; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using repl::ReadConcernArgs; namespace { @@ -133,6 +135,12 @@ private: std::unique_ptr _distLockCatalog; }; +void checkReadConcern(const BSONObj& findCmd) { + auto readConcernElem = findCmd[ReadConcernArgs::kReadConcernFieldName]; + ASSERT_EQ(Object, readConcernElem.type()); + ASSERT_EQ(BSON(ReadConcernArgs::kLevelFieldName << "majority"), readConcernElem.Obj()); +} + TEST_F(DistLockCatalogFixture, BasicPing) { auto future = launchAsync([this] { Date_t ping(dateFromISOString("2014-03-11T09:17:18.098Z").getValue()); @@ -1291,28 +1299,29 @@ TEST_F(DistLockCatalogFixture, BasicGetPing) { ASSERT_EQUALS(ping, pingDoc.getPing()); }); - onFindCommand([](const RemoteCommandRequest& request) -> StatusWith> { - ASSERT_EQUALS(dummyHost, request.target); - ASSERT_EQUALS("config", request.dbname); - - BSONObj expectedCmd(fromjson(R"({ - find: "lockpings", - filter: { _id: "test" }, - limit: 1 - })")); + onFindCommand( + [](const RemoteCommandRequest& request) { + ASSERT_EQUALS(dummyHost, request.target); + ASSERT_EQUALS("config", request.dbname); - ASSERT_EQUALS(expectedCmd, request.cmdObj); + const auto& findCmd = request.cmdObj; + ASSERT_EQUALS("lockpings", findCmd["find"].str()); + ASSERT_EQUALS(BSON("_id" + << "test"), + findCmd["filter"].Obj()); + ASSERT_EQUALS(1, findCmd["limit"].numberLong()); + checkReadConcern(findCmd); - BSONObj pingDoc(fromjson(R"({ - _id: "test", - ping: { $date: "2015-05-26T13:06:27.293Z" } - })")); + BSONObj pingDoc(fromjson(R"({ + _id: "test", + ping: { $date: "2015-05-26T13:06:27.293Z" } + })")); - std::vector result; - result.push_back(pingDoc); + std::vector result; + result.push_back(pingDoc); - return result; - }); + return result; + }); future.timed_get(kFutureTimeout); } @@ -1382,13 +1391,11 @@ TEST_F(DistLockCatalogFixture, BasicGetLockByTS) { ASSERT_EQUALS(dummyHost, request.target); ASSERT_EQUALS("config", request.dbname); - BSONObj expectedCmd(fromjson(R"({ - find: "locks", - filter: { ts: ObjectId("555f99712c99a78c5b083358") }, - limit: 1 - })")); - - ASSERT_EQUALS(expectedCmd, request.cmdObj); + const auto& findCmd = request.cmdObj; + ASSERT_EQUALS("locks", findCmd["find"].str()); + ASSERT_EQUALS(BSON("ts" << OID("555f99712c99a78c5b083358")), findCmd["filter"].Obj()); + ASSERT_EQUALS(1, findCmd["limit"].numberLong()); + checkReadConcern(findCmd); BSONObj lockDoc(fromjson(R"({ _id: "test", @@ -1464,28 +1471,29 @@ TEST_F(DistLockCatalogFixture, BasicGetLockByName) { ASSERT_EQUALS(ts, lockDoc.getLockID()); }); - onFindCommand([](const RemoteCommandRequest& request) -> StatusWith> { - ASSERT_EQUALS(dummyHost, request.target); - ASSERT_EQUALS("config", request.dbname); + onFindCommand( + [](const RemoteCommandRequest& request) { + ASSERT_EQUALS(dummyHost, request.target); + ASSERT_EQUALS("config", request.dbname); - BSONObj expectedCmd(fromjson(R"({ - find: "locks", - filter: { _id: "abc" }, - limit: 1 - })")); - - ASSERT_EQUALS(expectedCmd, request.cmdObj); + const auto& findCmd = request.cmdObj; + ASSERT_EQUALS("locks", findCmd["find"].str()); + ASSERT_EQUALS(BSON("_id" + << "abc"), + findCmd["filter"].Obj()); + ASSERT_EQUALS(1, findCmd["limit"].numberLong()); + checkReadConcern(findCmd); - BSONObj lockDoc(fromjson(R"({ - _id: "abc", - state: 2, - ts: ObjectId("555f99712c99a78c5b083358") - })")); + BSONObj lockDoc(fromjson(R"({ + _id: "abc", + state: 2, + ts: ObjectId("555f99712c99a78c5b083358") + })")); - std::vector result; - result.push_back(lockDoc); - return result; - }); + std::vector result; + result.push_back(lockDoc); + return result; + }); future.timed_get(kFutureTimeout); } diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 24b710ec14b..93b513aeace 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -972,8 +972,7 @@ void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest& exec.executeBatch(request, response); } -Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, - DatabaseType* db) const { +Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) { ScopedDbConnection conn(_configServerConnectionString, 30); BSONObjBuilder b; @@ -1008,7 +1007,7 @@ Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, return Status::OK(); } -StatusWith CatalogManagerLegacy::_generateNewShardName() const { +StatusWith CatalogManagerLegacy::_generateNewShardName() { BSONObj o; { ScopedDbConnection conn(_configServerConnectionString, 30); diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h index 068b8555d91..24354cc527f 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h @@ -125,9 +125,9 @@ public: Status checkAndUpgrade(bool checkOnly) override; private: - Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override; + Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override; - StatusWith _generateNewShardName() const override; + StatusWith _generateNewShardName() override; /** * Starts the thread that periodically checks data consistency amongst the config servers. diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 1590821c415..7a0db0b085f 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -365,11 +365,11 @@ StatusWith CatalogManagerReplicaSet::getDatabase(const std::string return readHost.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::name(dbName)), - BSONObj(), - 1); + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::name(dbName)), + BSONObj(), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -392,12 +392,11 @@ StatusWith CatalogManagerReplicaSet::getCollection(const std::st return readHostStatus.getStatus(); } - auto statusFind = - grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), - NamespaceString(CollectionType::ConfigNS), - BSON(CollectionType::fullNs(collNs)), - BSONObj(), - 1); + auto statusFind = _exhaustiveFindOnConfig(readHostStatus.getValue(), + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(collNs)), + BSONObj(), + 1); if (!statusFind.isOK()) { return statusFind.getStatus(); } @@ -428,12 +427,11 @@ Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, return readHost.getStatus(); } - auto findStatus = - grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(CollectionType::ConfigNS), - b.obj(), - BSONObj(), - boost::none); // no limit + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(CollectionType::ConfigNS), + b.obj(), + BSONObj(), + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -531,11 +529,11 @@ StatusWith CatalogManagerReplicaSet::getGlobalSettings(const strin return readHost.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(SettingsType::ConfigNS), - BSON(SettingsType::key(key)), - BSONObj(), - 1); + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(SettingsType::ConfigNS), + BSON(SettingsType::key(key)), + BSONObj(), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -572,11 +570,11 @@ Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, return readHost.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::primary(shardName)), - BSONObj(), - boost::none); // no limit + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::primary(shardName)), + BSONObj(), + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -609,7 +607,7 @@ Status CatalogManagerReplicaSet::getChunks(const BSONObj& query, // Convert boost::optional to boost::optional. auto longLimit = limit ? boost::optional(*limit) : boost::none; - auto findStatus = grid.shardRegistry()->exhaustiveFind( + auto findStatus = _exhaustiveFindOnConfig( readHostStatus.getValue(), NamespaceString(ChunkType::ConfigNS), query, sort, longLimit); if (!findStatus.isOK()) { return findStatus.getStatus(); @@ -641,11 +639,11 @@ Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collect return readHostStatus.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), - NamespaceString(TagsType::ConfigNS), - BSON(TagsType::ns(collectionNs)), - BSON(TagsType::min() << 1), - boost::none); // no limit + auto findStatus = _exhaustiveFindOnConfig(readHostStatus.getValue(), + NamespaceString(TagsType::ConfigNS), + BSON(TagsType::ns(collectionNs)), + BSON(TagsType::min() << 1), + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -675,7 +673,7 @@ StatusWith CatalogManagerReplicaSet::getTagForChunk(const std::string& c BSONObj query = BSON(TagsType::ns(collectionNs) << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() << BSON("$gte" << chunk.getMax())); - auto findStatus = grid.shardRegistry()->exhaustiveFind( + auto findStatus = _exhaustiveFindOnConfig( readHostStatus.getValue(), NamespaceString(TagsType::ConfigNS), query, BSONObj(), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); @@ -705,11 +703,11 @@ Status CatalogManagerReplicaSet::getAllShards(vector* shards) { return readHost.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSONObj(), // no query filter - BSONObj(), // no sort - boost::none); // no limit + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSONObj(), // no query filter + BSONObj(), // no sort + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -807,8 +805,7 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandReque } } -Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, - DatabaseType* db) const { +Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, DatabaseType* db) { BSONObjBuilder queryBuilder; queryBuilder.appendRegex( DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); @@ -819,11 +816,11 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, return readHost.getStatus(); } - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - queryBuilder.obj(), - BSONObj(), - 1); + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + queryBuilder.obj(), + BSONObj(), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -854,7 +851,7 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, << " have: " << actualDbName << " want to add: " << dbName); } -StatusWith CatalogManagerReplicaSet::_generateNewShardName() const { +StatusWith CatalogManagerReplicaSet::_generateNewShardName() { const auto configShard = grid.shardRegistry()->getShard("config"); const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHost.isOK()) { @@ -864,11 +861,11 @@ StatusWith CatalogManagerReplicaSet::_generateNewShardName() const BSONObjBuilder shardNameRegex; shardNameRegex.appendRegex(ShardType::name(), "^shard"); - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - shardNameRegex.obj(), - BSON(ShardType::name() << -1), - 1); + auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + shardNameRegex.obj(), + BSON(ShardType::name() << -1), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -984,11 +981,11 @@ StatusWith CatalogManagerReplicaSet::_getConfigVersion() { } auto readHost = readHostStatus.getValue(); - auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost, - NamespaceString(VersionType::ConfigNS), - BSONObj(), - BSONObj(), - boost::none /* no limit */); + auto findStatus = _exhaustiveFindOnConfig(readHost, + NamespaceString(VersionType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none /* no limit */); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -1074,6 +1071,29 @@ StatusWith CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRe return response.response; } +StatusWith> CatalogManagerReplicaSet::_exhaustiveFindOnConfig( + const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit) { + repl::ReadConcernArgs readConcern(_getConfigOpTime(), + repl::ReadConcernLevel::kMajorityReadConcern); + + auto result = grid.shardRegistry()->exhaustiveFind( + host, nss, query, sort, limit, readConcern, kReplMetadata); + + if (!result.isOK()) { + return result.getStatus(); + } + + auto response = std::move(result.getValue()); + + _updateLastSeenConfigOpTime(response.opTime); + + return std::move(response.docs); +} + repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() { stdx::lock_guard lk(_mutex); return _configOpTime; diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index 18c2bec94b1..f2c53a924f7 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -128,9 +128,9 @@ public: Status checkAndUpgrade(bool checkOnly) override; private: - Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override; + Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override; - StatusWith _generateNewShardName() const override; + StatusWith _generateNewShardName() override; bool _runReadCommand(const std::string& dbname, const BSONObj& cmdObj, @@ -152,6 +152,12 @@ private: StatusWith _runCommandOnConfigWithNotMasterRetries(const std::string& dbName, BSONObj cmdObj); + StatusWith> _exhaustiveFindOnConfig(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit); + /** * Appends a read committed read concern to the request object. */ diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index 14c5b87da11..bd7b874ee1b 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -214,6 +214,7 @@ TEST_F(AddShardTest, AddShardStandalone) { // in the previous call, in the config server metadata onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(request.target, configHost); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); @@ -223,17 +224,23 @@ TEST_F(AddShardTest, AddShardStandalone) { ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB1"))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB2"))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -321,6 +328,7 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { // in the previous call, in the config server metadata onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(request.target, configHost); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); @@ -330,10 +338,14 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB1"))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -341,6 +353,8 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB2"))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -350,13 +364,16 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) { existingShard.setMaxSizeMB(100); // New name is being generated for the new shard - onFindCommand([&existingShard](const RemoteCommandRequest& request) { + onFindCommand([this, &existingShard](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), ShardType::ConfigNS); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); ASSERT_EQ(query->ns(), ShardType::ConfigNS); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{existingShard.toBSON()}; }); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp index dc12eeb5a7d..74245c53fa0 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp @@ -171,6 +171,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) { // Respond to request to reload information about existing shards onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -179,6 +181,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + ShardType remainingShard; remainingShard.setHost("host1"); remainingShard.setName("shard0"); @@ -343,6 +347,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) { // Respond to request to reload information about existing shards onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -351,6 +357,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + ShardType remainingShard; remainingShard.setHost("host1"); remainingShard.setName("shard0"); diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp index 3c0a59a68ec..98d144fc961 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp @@ -80,6 +80,8 @@ public: void expectGetDatabase(const DatabaseType& expectedDb) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(DatabaseType::ConfigNS, nss.ns()); @@ -91,6 +93,8 @@ public: ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_EQ(1, query->getLimit().get()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{expectedDb.toBSON()}; }); } @@ -146,6 +150,8 @@ public: void expectReloadChunks(const std::string& ns, const vector& chunks) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), ChunkType::ConfigNS); @@ -160,6 +166,8 @@ public: ASSERT_EQ(expectedSort, query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + vector chunksToReturn; std::transform(chunks.begin(), @@ -202,6 +210,8 @@ public: void expectReloadCollection(const CollectionType& collection) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), CollectionType::ConfigNS); @@ -217,6 +227,8 @@ public: } ASSERT_EQ(BSONObj(), query->getSort()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{collection.toBSON()}; }); } @@ -224,6 +236,8 @@ public: void expectLoadNewestChunk(const string& ns, const ChunkType& chunk) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), ChunkType::ConfigNS); @@ -237,6 +251,8 @@ public: ASSERT_EQ(expectedSort, query->getSort()); ASSERT_EQ(1, query->getLimit().get()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{chunk.toBSON()}; }); } diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp index 8d295a29e6c..9473bd7e26c 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp @@ -86,7 +86,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) { return assertGet(catalogManager()->getCollection(expectedColl.getNs().ns())); }); - onFindCommand([&expectedColl](const RemoteCommandRequest& request) { + onFindCommand([this, &expectedColl](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), CollectionType::ConfigNS); @@ -98,6 +100,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) { ASSERT_EQ(query->getSort(), BSONObj()); ASSERT_EQ(query->getLimit().get(), 1); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{expectedColl.toBSON()}; }); @@ -132,10 +136,12 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) { return assertGet(catalogManager()->getDatabase(expectedDb.getName())); }); - onFindCommand([&expectedDb](const RemoteCommandRequest& request) { + onFindCommand([this, &expectedDb](const RemoteCommandRequest& request) { const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); ASSERT_EQ(query->ns(), DatabaseType::ConfigNS); @@ -143,6 +149,8 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) { ASSERT_EQ(query->getSort(), BSONObj()); ASSERT_EQ(query->getLimit().get(), 1); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{expectedDb.toBSON()}; }); @@ -297,7 +305,9 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) { return shards; }); - onFindCommand([&s1, &s2, &s3](const RemoteCommandRequest& request) { + onFindCommand([this, &s1, &s2, &s3](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), ShardType::ConfigNS); @@ -308,6 +318,8 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) { ASSERT_EQ(query->getSort(), BSONObj()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{s1.toBSON(), s2.toBSON(), s3.toBSON()}; }); @@ -383,7 +395,9 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) { return chunks; }); - onFindCommand([&chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) { + onFindCommand([this, &chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), ChunkType::ConfigNS); @@ -394,6 +408,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) { ASSERT_EQ(query->getSort(), BSON(ChunkType::version() << -1)); ASSERT_EQ(query->getLimit().get(), 1); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{chunkA.toBSON(), chunkB.toBSON()}; }); @@ -421,7 +437,9 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) { return chunks; }); - onFindCommand([&chunksQuery](const RemoteCommandRequest& request) { + onFindCommand([this, &chunksQuery](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), ChunkType::ConfigNS); @@ -432,6 +450,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) { ASSERT_EQ(query->getSort(), BSONObj()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -697,7 +717,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsBalancerDoc) { return assertGet(catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey)); }); - onFindCommand([st1](const RemoteCommandRequest& request) { + onFindCommand([this, st1](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), SettingsType::ConfigNS); @@ -706,6 +728,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsBalancerDoc) { ASSERT_EQ(query->ns(), SettingsType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::BalancerDocKey))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{st1.toBSON()}; }); @@ -725,7 +749,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsChunkSizeDoc) { return assertGet(catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey)); }); - onFindCommand([st1](const RemoteCommandRequest& request) { + onFindCommand([this, st1](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), SettingsType::ConfigNS); @@ -734,6 +760,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsChunkSizeDoc) { ASSERT_EQ(query->ns(), SettingsType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{st1.toBSON()}; }); @@ -750,7 +778,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsInvalidDoc) { ASSERT_EQ(balSettings.getStatus(), ErrorCodes::FailedToParse); }); - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), SettingsType::ConfigNS); @@ -759,6 +789,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsInvalidDoc) { ASSERT_EQ(query->ns(), SettingsType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(SettingsType::key("invalidKey"))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{ BSON("invalidKey" << "some value") // invalid settings document -- key is required @@ -778,7 +810,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsNonExistent) { ASSERT_EQ(chunkSizeSettings.getStatus(), ErrorCodes::NoMatchingDocument); }); - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), SettingsType::ConfigNS); @@ -787,6 +821,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsNonExistent) { ASSERT_EQ(query->ns(), SettingsType::ConfigNS); ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -829,7 +865,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) { return collections; }); - onFindCommand([coll1, coll2, coll3](const RemoteCommandRequest& request) { + onFindCommand([this, coll1, coll2, coll3](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), CollectionType::ConfigNS); @@ -839,6 +877,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) { ASSERT_EQ(query->getFilter(), BSONObj()); ASSERT_EQ(query->getSort(), BSONObj()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()}; }); @@ -876,7 +916,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) { return collections; }); - onFindCommand([coll1, coll2](const RemoteCommandRequest& request) { + onFindCommand([this, coll1, coll2](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), CollectionType::ConfigNS); @@ -889,6 +931,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) { ASSERT_EQ(query->getFilter(), b.obj()); } + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{coll1.toBSON(), coll2.toBSON()}; }); @@ -919,10 +963,12 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) { validColl.setKeyPattern(KeyPattern{BSON("_id" << 1)}); ASSERT_OK(validColl.validate()); - onFindCommand([validColl](const RemoteCommandRequest& request) { + onFindCommand([this, validColl](const RemoteCommandRequest& request) { const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), CollectionType::ConfigNS); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); ASSERT_EQ(query->ns(), CollectionType::ConfigNS); @@ -932,6 +978,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) { ASSERT_EQ(query->getFilter(), b.obj()); } + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{ validColl.toBSON(), BSONObj() // empty document is invalid @@ -960,7 +1008,9 @@ TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardValid) { return dbs; }); - onFindCommand([dbt1, dbt2](const RemoteCommandRequest& request) { + onFindCommand([this, dbt1, dbt2](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS); @@ -970,6 +1020,8 @@ TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardValid) { ASSERT_EQ(query->getFilter(), BSON(DatabaseType::primary(dbt1.getPrimary()))); ASSERT_EQ(query->getSort(), BSONObj()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{dbt1.toBSON(), dbt2.toBSON()}; }); @@ -1028,7 +1080,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagsForCollection) { return tags; }); - onFindCommand([tagA, tagB](const RemoteCommandRequest& request) { + onFindCommand([this, tagA, tagB](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), TagsType::ConfigNS); @@ -1038,6 +1092,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagsForCollection) { ASSERT_EQ(query->getFilter(), BSON(TagsType::ns("TestDB.TestColl"))); ASSERT_EQ(query->getSort(), BSON(TagsType::min() << 1)); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{tagA.toBSON(), tagB.toBSON()}; }); @@ -1108,7 +1164,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkOneTagFound) { auto future = launchAsync( [this, chunk] { return assertGet(catalogManager()->getTagForChunk("test.coll", chunk)); }); - onFindCommand([chunk](const RemoteCommandRequest& request) { + onFindCommand([this, chunk](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), TagsType::ConfigNS); @@ -1120,6 +1178,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkOneTagFound) { << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() << BSON("$gte" << chunk.getMax()))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + TagsType tt; tt.setNS("test.coll"); tt.setTag("tag"); @@ -1148,7 +1208,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkNoTagFound) { auto future = launchAsync( [this, chunk] { return assertGet(catalogManager()->getTagForChunk("test.coll", chunk)); }); - onFindCommand([chunk](const RemoteCommandRequest& request) { + onFindCommand([this, chunk](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), TagsType::ConfigNS); @@ -1160,6 +1222,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkNoTagFound) { << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() << BSON("$gte" << chunk.getMax()))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -1185,7 +1249,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkInvalidTagDoc) { ASSERT_EQ(ErrorCodes::FailedToParse, tagResult.getStatus()); }); - onFindCommand([chunk](const RemoteCommandRequest& request) { + onFindCommand([this, chunk](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.ns(), TagsType::ConfigNS); @@ -1197,6 +1263,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkInvalidTagDoc) { << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() << BSON("$gte" << chunk.getMax()))); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + // Return a tag document missing the min key return vector{BSON(TagsType::ns("test.mycol") << TagsType::tag("tag") << TagsType::max(BSON("a" << 20)))}; @@ -1364,6 +1432,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -1372,6 +1442,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{s0.toBSON(), s1.toBSON(), s2.toBSON()}; }); @@ -1404,7 +1476,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); ASSERT_EQ(DatabaseType::ConfigNS, nss.ns()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); return vector{}; }); @@ -1414,6 +1488,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { ASSERT_EQUALS("admin", request.dbname); string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); @@ -1426,6 +1501,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) { ASSERT_EQUALS("admin", request.dbname); string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); @@ -1514,7 +1590,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExists) { ASSERT_EQUALS(ErrorCodes::NamespaceExists, status); }); - onFindCommand([dbname](const RemoteCommandRequest& request) { + onFindCommand([this, dbname](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -1524,6 +1602,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExists) { ASSERT_EQ(DatabaseType::ConfigNS, query->ns()); ASSERT_EQ(queryBuilder.obj(), query->getFilter()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); return vector{BSON("_id" << dbname)}; }); @@ -1550,7 +1629,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExistsDifferentCase) { ASSERT_EQUALS(ErrorCodes::DatabaseDifferCase, status); }); - onFindCommand([dbname, dbnameDiffCase](const RemoteCommandRequest& request) { + onFindCommand([this, dbname, dbnameDiffCase](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -1560,6 +1641,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExistsDifferentCase) { ASSERT_EQ(DatabaseType::ConfigNS, query->ns()); ASSERT_EQ(queryBuilder.obj(), query->getFilter()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); return vector{BSON("_id" << dbnameDiffCase)}; }); @@ -1586,14 +1668,17 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseNoShards) { }); // Report no databases with the same name already exist - onFindCommand([dbname](const RemoteCommandRequest& request) { + onFindCommand([this, dbname](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(DatabaseType::ConfigNS, nss.ns()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); return vector{}; }); // Report no shards exist - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -1602,6 +1687,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseNoShards) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -1630,6 +1717,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false)); @@ -1638,6 +1726,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{s0.toBSON(), s1.toBSON(), s2.toBSON()}; }); @@ -1669,8 +1759,10 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { // Report no databases with the same name already exist onFindCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(DatabaseType::ConfigNS, nss.ns()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); return vector{}; }); @@ -1680,6 +1772,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQUALS("admin", request.dbname); string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); @@ -1692,6 +1785,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQUALS("admin", request.dbname); string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); @@ -1704,6 +1798,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { ASSERT_EQUALS("admin", request.dbname); string cmdName = request.cmdObj.firstElement().fieldName(); ASSERT_EQUALS("listDatabases", cmdName); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata); @@ -1714,6 +1809,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) { onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQUALS(configHost, request.target); ASSERT_EQUALS("config", request.dbname); + ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); @@ -1773,7 +1869,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { }); // Query to find if db already exists in config. - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(DatabaseType::ConfigNS, nss.toString()); @@ -1788,6 +1885,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) { ASSERT_EQ(BSONObj(), query->getSort()); ASSERT_EQ(1, query->getLimit().get()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + return vector{}; }); @@ -2109,5 +2208,113 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) { future3.timed_get(kFutureTimeout); } +TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + auto future1 = launchAsync( + [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); }); + + repl::OpTime highestOpTime; + const repl::OpTime newOpTime(Timestamp(7, 6), 5); + + onFindWithMetadataCommand( + [this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + SettingsType settings; + settings.setKey("chunksize"); + settings.setChunkSizeMB(2); + + return std::make_tuple(vector{settings.toBSON()}, builder.obj()); + }); + + future1.timed_get(kFutureTimeout); + + highestOpTime = newOpTime; + + // Return an older OpTime + auto future2 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + + onCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + return BSON("ok" << 1); + }); + + future2.timed_get(kFutureTimeout); +} + +TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) { + configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); + + // Initialize the internal config OpTime + auto future1 = launchAsync([this] { + BSONObjBuilder responseBuilder; + ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder)); + }); + + repl::OpTime highestOpTime; + const repl::OpTime newOpTime(Timestamp(7, 6), 5); + + onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS("test", request.dbname); + + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + ReplSetMetadata metadata(12, newOpTime, 100, 3); + BSONObjBuilder builder; + BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName)); + metadata.writeToMetadata(&replBuilder); + replBuilder.done(); + + return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1)); + }); + + future1.timed_get(kFutureTimeout); + + highestOpTime = newOpTime; + + // Return an older OpTime + auto future2 = launchAsync( + [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); }); + + const repl::OpTime oldOpTime(Timestamp(3, 10), 5); + + onFindCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + + ASSERT_EQ(string("find"), request.cmdObj.firstElementFieldName()); + checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm()); + + SettingsType settings; + settings.setKey("chunksize"); + settings.setChunkSizeMB(2); + + return vector{settings.toBSON()}; + }); + + future2.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp index 3b0036418a5..36eb4bbac64 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp @@ -189,6 +189,11 @@ void CatalogManagerReplSetTestFixture::onFindCommand(NetworkTestEnv::OnFindComma _networkTestEnv->onFindCommand(func); } +void CatalogManagerReplSetTestFixture::onFindWithMetadataCommand( + NetworkTestEnv::OnFindCommandWithMetadataFunction func) { + _networkTestEnv->onFindWithMetadataCommand(func); +} + void CatalogManagerReplSetTestFixture::setupShards(const std::vector& shards) { auto future = launchAsync([this] { shardRegistry()->reload(); }); @@ -198,7 +203,7 @@ void CatalogManagerReplSetTestFixture::setupShards(const std::vector& } void CatalogManagerReplSetTestFixture::expectGetShards(const std::vector& shards) { - onFindCommand([&shards](const RemoteCommandRequest& request) { + onFindCommand([this, &shards](const RemoteCommandRequest& request) { const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); ASSERT_EQ(nss.toString(), ShardType::ConfigNS); @@ -212,6 +217,8 @@ void CatalogManagerReplSetTestFixture::expectGetShards(const std::vectorgetSort(), BSONObj()); ASSERT_FALSE(query->getLimit().is_initialized()); + checkReadConcern(request.cmdObj, Timestamp(0, 0), 0); + vector shardsToReturn; std::transform(shards.begin(), diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h index 67d8d678637..651049bb5f7 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h @@ -96,6 +96,8 @@ protected: void onCommand(executor::NetworkTestEnv::OnCommandFunction func); void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func); void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func); + void onFindWithMetadataCommand( + executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func); /** * Setup the shard registry to contain the given shards until the next reload. diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp index c20e98a4ef9..57d0694e5cb 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp @@ -53,12 +53,15 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNotNeeded) { auto future = launchAsync([this] { ASSERT_OK(catalogManager()->checkAndUpgrade(true)); }); - onFindCommand([](const RemoteCommandRequest& request) { + onFindCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata); + ASSERT_EQ(HostAndPort("config:123"), request.target); ASSERT_EQ("config", request.dbname); - ASSERT_EQ(BSON("find" - << "version"), - request.cmdObj); + + const auto& findCmd = request.cmdObj; + ASSERT_EQ("version", findCmd["find"].str()); + checkReadConcern(findCmd, Timestamp(0, 0), 0); BSONObj versionDoc(fromjson(R"({ _id: 1, diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 6eaa45c8c50..921849312ce 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -269,28 +269,43 @@ shared_ptr ShardRegistry::_findUsingLookUp(const ShardId& shardId) { return nullptr; } -StatusWith> ShardRegistry::exhaustiveFind(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - const BSONObj& sort, - boost::optional limit) { +StatusWith ShardRegistry::exhaustiveFind( + const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit, + boost::optional readConcern, + const BSONObj& metadata) { // If for some reason the callback never gets invoked, we will return this status Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); - vector results; + QueryResponse response; - auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction) { + auto fetcherCallback = [&status, &response](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction) { // Throw out any accumulated results on error if (!dataStatus.isOK()) { status = dataStatus.getStatus(); - results.clear(); + response.docs.clear(); return; } auto& data = dataStatus.getValue(); + if (auto replField = data.otherFields.metadata[rpc::kReplicationMetadataFieldName]) { + auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(replField.Obj()); + + if (!replParseStatus.isOK()) { + status = replParseStatus.getStatus(); + response.docs.clear(); + return; + } + + response.opTime = replParseStatus.getValue().getLastCommittedOptime(); + } + for (const BSONObj& doc : data.documents) { - results.push_back(std::move(doc.getOwned())); + response.docs.push_back(std::move(doc.getOwned())); } status = Status::OK(); @@ -304,7 +319,16 @@ StatusWith> ShardRegistry::exhaustiveFind(const HostAndPort boost::none, // skip limit); - QueryFetcher fetcher(_executor.get(), host, nss, lpq->asFindCommand(), fetcherCallback); + BSONObjBuilder findCmdBuilder; + lpq->asFindCommand(&findCmdBuilder); + + if (readConcern) { + BSONObjBuilder builder; + readConcern->appendInfo(&findCmdBuilder); + } + + QueryFetcher fetcher( + _executor.get(), host, nss, findCmdBuilder.done(), fetcherCallback, metadata); Status scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { @@ -317,7 +341,7 @@ StatusWith> ShardRegistry::exhaustiveFind(const HostAndPort return status; } - return results; + return response; } StatusWith ShardRegistry::runCommand(const HostAndPort& host, diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 634bcdf2340..b465b065e8e 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -36,6 +36,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/s/client/shard.h" #include "mongo/stdx/mutex.h" @@ -72,6 +73,11 @@ public: repl::OpTime opTime; }; + struct QueryResponse { + std::vector docs; + repl::OpTime opTime; + }; + /** * Instantiates a new shard registry. * @@ -150,11 +156,13 @@ public: * * Note: should never be used outside of CatalogManagerReplicaSet or DistLockCatalogImpl. */ - StatusWith> exhaustiveFind(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - const BSONObj& sort, - boost::optional limit); + StatusWith exhaustiveFind(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional limit, + boost::optional readConcern, + const BSONObj& metadata); /** * Runs a command against the specified host and returns the result. It is the responsibility -- cgit v1.2.1