summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2015-08-07 15:15:18 -0400
committerRandolph Tan <randolph@10gen.com>2015-08-11 12:08:52 -0400
commit248aea0dad9f9a3d46cb37547a546b0c9fe7e135 (patch)
tree1ff09899731c586f52ab1b20bbf044e7ecab5b20 /src/mongo
parent8117ecc138e9f87ade5a475a2695d1e39f474d8c (diff)
downloadmongo-248aea0dad9f9a3d46cb37547a546b0c9fe7e135.tar.gz
SERVER-19390 Make config server queries do read committed
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/executor/network_test_env.cpp44
-rw-r--r--src/mongo/executor/network_test_env.h7
-rw-r--r--src/mongo/s/catalog/SConscript1
-rw-r--r--src/mongo/s/catalog/catalog_manager.h4
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.cpp4
-rw-r--r--src/mongo/s/catalog/catalog_manager_mock.h4
-rw-r--r--src/mongo/s/catalog/dist_lock_catalog_impl.cpp34
-rw-r--r--src/mongo/s/catalog/dist_lock_catalog_impl.h11
-rw-r--r--src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp96
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp5
-rw-r--r--src/mongo/s/catalog/legacy/catalog_manager_legacy.h4
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp134
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h10
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp23
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp8
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp16
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp251
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp9
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h2
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp11
-rw-r--r--src/mongo/s/client/shard_registry.cpp48
-rw-r--r--src/mongo/s/client/shard_registry.h18
22 files changed, 574 insertions, 170 deletions
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp
index 64241e72c12..21ad1ef1c6b 100644
--- a/src/mongo/executor/network_test_env.cpp
+++ b/src/mongo/executor/network_test_env.cpp
@@ -69,9 +69,22 @@ void NetworkTestEnv::onCommandWithMetadata(OnCommandWithMetadataFunction func) {
const NetworkInterfaceMock::NetworkOperationIterator noi = _mockNetwork->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
- _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), func(request));
- _mockNetwork->runReadyNetworkOperations();
+ const auto cmdResponseStatus = func(request);
+ const auto cmdResponse = cmdResponseStatus.getValue();
+
+ BSONObjBuilder result;
+
+ if (cmdResponseStatus.isOK()) {
+ result.appendElements(cmdResponse.data);
+ }
+
+ Command::appendCommandStatus(result, cmdResponseStatus.getStatus());
+
+ const RemoteCommandResponse response(result.obj(), cmdResponse.metadata, Milliseconds(1));
+
+ _mockNetwork->scheduleResponse(noi, _mockNetwork->now(), response);
+ _mockNetwork->runReadyNetworkOperations();
_mockNetwork->exitNetwork();
}
@@ -97,5 +110,32 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) {
});
}
+void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func) {
+ onCommandWithMetadata(
+ [&func](const RemoteCommandRequest& request) -> StatusWith<RemoteCommandResponse> {
+ const auto& resultStatus = func(request);
+
+ if (!resultStatus.isOK()) {
+ return resultStatus.getStatus();
+ }
+
+ std::vector<BSONObj> result;
+ BSONObj metadata;
+ std::tie(result, metadata) = resultStatus.getValue();
+
+ BSONArrayBuilder arr;
+ for (const auto& obj : result) {
+ arr.append(obj);
+ }
+
+ const NamespaceString nss =
+ NamespaceString(request.dbname, request.cmdObj.firstElement().String());
+ BSONObjBuilder resultBuilder;
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder);
+
+ return RemoteCommandResponse(resultBuilder.obj(), metadata, Milliseconds(1));
+ });
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_test_env.h b/src/mongo/executor/network_test_env.h
index 6ef0f242597..717fbb642ea 100644
--- a/src/mongo/executor/network_test_env.h
+++ b/src/mongo/executor/network_test_env.h
@@ -28,6 +28,7 @@
#pragma once
+#include <tuple>
#include <type_traits>
#include <vector>
@@ -135,6 +136,11 @@ public:
using OnFindCommandFunction =
stdx::function<StatusWith<std::vector<BSONObj>>(const RemoteCommandRequest&)>;
+ // Function that accepts a find request and returns a tuple of resulting documents and response
+ // metadata.
+ using OnFindCommandWithMetadataFunction =
+ stdx::function<StatusWith<std::tuple<std::vector<BSONObj>, BSONObj>>(
+ const RemoteCommandRequest&)>;
/**
* Create a new environment based on the given network.
@@ -149,6 +155,7 @@ public:
void onCommand(OnCommandFunction func);
void onCommandWithMetadata(OnCommandWithMetadataFunction func);
void onFindCommand(OnFindCommandFunction func);
+ void onFindWithMetadataCommand(OnFindCommandWithMetadataFunction func);
private:
// Task executor used for running asynchronous operations.
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index ab3d850e2ad..a0ed81168d8 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -133,6 +133,7 @@ env.Library(
'$BUILD_DIR/mongo/client/remote_command_targeter',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/query/command_request_response',
+ '$BUILD_DIR/mongo/db/repl/read_concern_args',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/s/client/sharding_client',
'$BUILD_DIR/mongo/util/net/hostandport',
diff --git a/src/mongo/s/catalog/catalog_manager.h b/src/mongo/s/catalog/catalog_manager.h
index 366e6c63644..6d5bdf07314 100644
--- a/src/mongo/s/catalog/catalog_manager.h
+++ b/src/mongo/s/catalog/catalog_manager.h
@@ -430,12 +430,12 @@ private:
* NamespaceExists if it exists with the same casing
* DatabaseDifferCase if it exists under different casing.
*/
- virtual Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const = 0;
+ virtual Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) = 0;
/**
* Generates a unique name to be given to a newly added shard.
*/
- virtual StatusWith<std::string> _generateNewShardName() const = 0;
+ virtual StatusWith<std::string> _generateNewShardName() = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog/catalog_manager_mock.cpp b/src/mongo/s/catalog/catalog_manager_mock.cpp
index ab2cf24bfc9..5456cba8b3f 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/catalog_manager_mock.cpp
@@ -168,11 +168,11 @@ DistLockManager* CatalogManagerMock::getDistLockManager() const {
return _mockDistLockMgr.get();
}
-Status CatalogManagerMock::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const {
+Status CatalogManagerMock::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) {
return Status::OK();
}
-StatusWith<std::string> CatalogManagerMock::_generateNewShardName() const {
+StatusWith<std::string> CatalogManagerMock::_generateNewShardName() {
return {ErrorCodes::InternalError, "Method not implemented"};
}
diff --git a/src/mongo/s/catalog/catalog_manager_mock.h b/src/mongo/s/catalog/catalog_manager_mock.h
index 83bcab351e2..b70d6855979 100644
--- a/src/mongo/s/catalog/catalog_manager_mock.h
+++ b/src/mongo/s/catalog/catalog_manager_mock.h
@@ -122,9 +122,9 @@ public:
Status checkAndUpgrade(bool checkOnly) override;
private:
- Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override;
+ Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
- StatusWith<std::string> _generateNewShardName() const override;
+ StatusWith<std::string> _generateNewShardName() override;
std::unique_ptr<DistLockManagerMock> _mockDistLockMgr;
};
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
index 6f02a4aba32..fbfe5e0d34e 100644
--- a/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl.cpp
@@ -39,7 +39,9 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/query/find_and_modify_request.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
@@ -50,12 +52,14 @@
namespace mongo {
using std::string;
+using std::vector;
namespace {
const char kCmdResponseWriteConcernField[] = "writeConcernError";
const char kFindAndModifyResponseResultDocField[] = "value";
const char kLocalTimeField[] = "localTime";
+const BSONObj kReplMetadata = BSON(rpc::kReplicationMetadataFieldName << 1);
const ReadPreferenceSetting kReadPref(ReadPreference::PrimaryOnly, TagSet());
/**
@@ -163,11 +167,11 @@ StatusWith<LockpingsType> DistLockCatalogImpl::getPing(StringData processID) {
return targetStatus.getStatus();
}
- auto findResult = _client->exhaustiveFind(targetStatus.getValue(),
- _lockPingNS,
- BSON(LockpingsType::process() << processID),
- BSONObj(),
- 1);
+ auto findResult = _findOnConfig(targetStatus.getValue(),
+ _lockPingNS,
+ BSON(LockpingsType::process() << processID),
+ BSONObj(),
+ 1);
if (!findResult.isOK()) {
return findResult.getStatus();
@@ -382,7 +386,7 @@ StatusWith<LocksType> DistLockCatalogImpl::getLockByTS(const OID& lockSessionID)
return targetStatus.getStatus();
}
- auto findResult = _client->exhaustiveFind(
+ auto findResult = _findOnConfig(
targetStatus.getValue(), _locksNS, BSON(LocksType::lockID(lockSessionID)), BSONObj(), 1);
if (!findResult.isOK()) {
@@ -414,7 +418,7 @@ StatusWith<LocksType> DistLockCatalogImpl::getLockByName(StringData name) {
return targetStatus.getStatus();
}
- auto findResult = _client->exhaustiveFind(
+ auto findResult = _findOnConfig(
targetStatus.getValue(), _locksNS, BSON(LocksType::name() << name), BSONObj(), 1);
if (!findResult.isOK()) {
@@ -457,4 +461,20 @@ Status DistLockCatalogImpl::stopPing(StringData processId) {
return findAndModifyStatus.getStatus();
}
+StatusWith<vector<BSONObj>> DistLockCatalogImpl::_findOnConfig(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit) {
+ repl::ReadConcernArgs readConcern(boost::none, repl::ReadConcernLevel::kMajorityReadConcern);
+ auto result =
+ _client->exhaustiveFind(host, nss, query, sort, limit, readConcern, kReplMetadata);
+
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+
+ return result.getValue().docs;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl.h b/src/mongo/s/catalog/dist_lock_catalog_impl.h
index 48343790f06..41764dd7089 100644
--- a/src/mongo/s/catalog/dist_lock_catalog_impl.h
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl.h
@@ -28,6 +28,9 @@
#pragma once
+#include <boost/optional.hpp>
+#include <vector>
+
#include "mongo/bson/oid.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
@@ -37,6 +40,8 @@
namespace mongo {
+struct HostAndPort;
+class NamespaceString;
class RemoteCommandTargeter;
class ShardRegistry;
@@ -78,6 +83,12 @@ public:
private:
RemoteCommandTargeter* _targeter();
+ StatusWith<std::vector<BSONObj>> _findOnConfig(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit);
+
ShardRegistry* _client;
// These are not static to avoid initialization order fiasco.
diff --git a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
index be1de29ea4f..4ffb1782fde 100644
--- a/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/dist_lock_catalog_impl_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/find_and_modify_request.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/network_test_env.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
@@ -59,6 +60,7 @@ using executor::NetworkInterfaceMock;
using executor::NetworkTestEnv;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using repl::ReadConcernArgs;
namespace {
@@ -133,6 +135,12 @@ private:
std::unique_ptr<DistLockCatalogImpl> _distLockCatalog;
};
+void checkReadConcern(const BSONObj& findCmd) {
+ auto readConcernElem = findCmd[ReadConcernArgs::kReadConcernFieldName];
+ ASSERT_EQ(Object, readConcernElem.type());
+ ASSERT_EQ(BSON(ReadConcernArgs::kLevelFieldName << "majority"), readConcernElem.Obj());
+}
+
TEST_F(DistLockCatalogFixture, BasicPing) {
auto future = launchAsync([this] {
Date_t ping(dateFromISOString("2014-03-11T09:17:18.098Z").getValue());
@@ -1291,28 +1299,29 @@ TEST_F(DistLockCatalogFixture, BasicGetPing) {
ASSERT_EQUALS(ping, pingDoc.getPing());
});
- onFindCommand([](const RemoteCommandRequest& request) -> StatusWith<vector<BSONObj>> {
- ASSERT_EQUALS(dummyHost, request.target);
- ASSERT_EQUALS("config", request.dbname);
-
- BSONObj expectedCmd(fromjson(R"({
- find: "lockpings",
- filter: { _id: "test" },
- limit: 1
- })"));
+ onFindCommand(
+ [](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(dummyHost, request.target);
+ ASSERT_EQUALS("config", request.dbname);
- ASSERT_EQUALS(expectedCmd, request.cmdObj);
+ const auto& findCmd = request.cmdObj;
+ ASSERT_EQUALS("lockpings", findCmd["find"].str());
+ ASSERT_EQUALS(BSON("_id"
+ << "test"),
+ findCmd["filter"].Obj());
+ ASSERT_EQUALS(1, findCmd["limit"].numberLong());
+ checkReadConcern(findCmd);
- BSONObj pingDoc(fromjson(R"({
- _id: "test",
- ping: { $date: "2015-05-26T13:06:27.293Z" }
- })"));
+ BSONObj pingDoc(fromjson(R"({
+ _id: "test",
+ ping: { $date: "2015-05-26T13:06:27.293Z" }
+ })"));
- std::vector<BSONObj> result;
- result.push_back(pingDoc);
+ std::vector<BSONObj> result;
+ result.push_back(pingDoc);
- return result;
- });
+ return result;
+ });
future.timed_get(kFutureTimeout);
}
@@ -1382,13 +1391,11 @@ TEST_F(DistLockCatalogFixture, BasicGetLockByTS) {
ASSERT_EQUALS(dummyHost, request.target);
ASSERT_EQUALS("config", request.dbname);
- BSONObj expectedCmd(fromjson(R"({
- find: "locks",
- filter: { ts: ObjectId("555f99712c99a78c5b083358") },
- limit: 1
- })"));
-
- ASSERT_EQUALS(expectedCmd, request.cmdObj);
+ const auto& findCmd = request.cmdObj;
+ ASSERT_EQUALS("locks", findCmd["find"].str());
+ ASSERT_EQUALS(BSON("ts" << OID("555f99712c99a78c5b083358")), findCmd["filter"].Obj());
+ ASSERT_EQUALS(1, findCmd["limit"].numberLong());
+ checkReadConcern(findCmd);
BSONObj lockDoc(fromjson(R"({
_id: "test",
@@ -1464,28 +1471,29 @@ TEST_F(DistLockCatalogFixture, BasicGetLockByName) {
ASSERT_EQUALS(ts, lockDoc.getLockID());
});
- onFindCommand([](const RemoteCommandRequest& request) -> StatusWith<vector<BSONObj>> {
- ASSERT_EQUALS(dummyHost, request.target);
- ASSERT_EQUALS("config", request.dbname);
+ onFindCommand(
+ [](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(dummyHost, request.target);
+ ASSERT_EQUALS("config", request.dbname);
- BSONObj expectedCmd(fromjson(R"({
- find: "locks",
- filter: { _id: "abc" },
- limit: 1
- })"));
-
- ASSERT_EQUALS(expectedCmd, request.cmdObj);
+ const auto& findCmd = request.cmdObj;
+ ASSERT_EQUALS("locks", findCmd["find"].str());
+ ASSERT_EQUALS(BSON("_id"
+ << "abc"),
+ findCmd["filter"].Obj());
+ ASSERT_EQUALS(1, findCmd["limit"].numberLong());
+ checkReadConcern(findCmd);
- BSONObj lockDoc(fromjson(R"({
- _id: "abc",
- state: 2,
- ts: ObjectId("555f99712c99a78c5b083358")
- })"));
+ BSONObj lockDoc(fromjson(R"({
+ _id: "abc",
+ state: 2,
+ ts: ObjectId("555f99712c99a78c5b083358")
+ })"));
- std::vector<BSONObj> result;
- result.push_back(lockDoc);
- return result;
- });
+ std::vector<BSONObj> result;
+ result.push_back(lockDoc);
+ return result;
+ });
future.timed_get(kFutureTimeout);
}
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
index 24b710ec14b..93b513aeace 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
@@ -972,8 +972,7 @@ void CatalogManagerLegacy::writeConfigServerDirect(const BatchedCommandRequest&
exec.executeBatch(request, response);
}
-Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName,
- DatabaseType* db) const {
+Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) {
ScopedDbConnection conn(_configServerConnectionString, 30);
BSONObjBuilder b;
@@ -1008,7 +1007,7 @@ Status CatalogManagerLegacy::_checkDbDoesNotExist(const std::string& dbName,
return Status::OK();
}
-StatusWith<string> CatalogManagerLegacy::_generateNewShardName() const {
+StatusWith<string> CatalogManagerLegacy::_generateNewShardName() {
BSONObj o;
{
ScopedDbConnection conn(_configServerConnectionString, 30);
diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
index 068b8555d91..24354cc527f 100644
--- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
+++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.h
@@ -125,9 +125,9 @@ public:
Status checkAndUpgrade(bool checkOnly) override;
private:
- Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override;
+ Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
- StatusWith<std::string> _generateNewShardName() const override;
+ StatusWith<std::string> _generateNewShardName() override;
/**
* Starts the thread that periodically checks data consistency amongst the config servers.
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 1590821c415..7a0db0b085f 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -365,11 +365,11 @@ StatusWith<DatabaseType> CatalogManagerReplicaSet::getDatabase(const std::string
return readHost.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::name(dbName)),
- BSONObj(),
- 1);
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(DatabaseType::ConfigNS),
+ BSON(DatabaseType::name(dbName)),
+ BSONObj(),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -392,12 +392,11 @@ StatusWith<CollectionType> CatalogManagerReplicaSet::getCollection(const std::st
return readHostStatus.getStatus();
}
- auto statusFind =
- grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(),
- NamespaceString(CollectionType::ConfigNS),
- BSON(CollectionType::fullNs(collNs)),
- BSONObj(),
- 1);
+ auto statusFind = _exhaustiveFindOnConfig(readHostStatus.getValue(),
+ NamespaceString(CollectionType::ConfigNS),
+ BSON(CollectionType::fullNs(collNs)),
+ BSONObj(),
+ 1);
if (!statusFind.isOK()) {
return statusFind.getStatus();
}
@@ -428,12 +427,11 @@ Status CatalogManagerReplicaSet::getCollections(const std::string* dbName,
return readHost.getStatus();
}
- auto findStatus =
- grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(CollectionType::ConfigNS),
- b.obj(),
- BSONObj(),
- boost::none); // no limit
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(CollectionType::ConfigNS),
+ b.obj(),
+ BSONObj(),
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -531,11 +529,11 @@ StatusWith<SettingsType> CatalogManagerReplicaSet::getGlobalSettings(const strin
return readHost.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(SettingsType::ConfigNS),
- BSON(SettingsType::key(key)),
- BSONObj(),
- 1);
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(SettingsType::ConfigNS),
+ BSON(SettingsType::key(key)),
+ BSONObj(),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -572,11 +570,11 @@ Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName,
return readHost.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::primary(shardName)),
- BSONObj(),
- boost::none); // no limit
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(DatabaseType::ConfigNS),
+ BSON(DatabaseType::primary(shardName)),
+ BSONObj(),
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -609,7 +607,7 @@ Status CatalogManagerReplicaSet::getChunks(const BSONObj& query,
// Convert boost::optional<int> to boost::optional<long long>.
auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none;
- auto findStatus = grid.shardRegistry()->exhaustiveFind(
+ auto findStatus = _exhaustiveFindOnConfig(
readHostStatus.getValue(), NamespaceString(ChunkType::ConfigNS), query, sort, longLimit);
if (!findStatus.isOK()) {
return findStatus.getStatus();
@@ -641,11 +639,11 @@ Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collect
return readHostStatus.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(),
- NamespaceString(TagsType::ConfigNS),
- BSON(TagsType::ns(collectionNs)),
- BSON(TagsType::min() << 1),
- boost::none); // no limit
+ auto findStatus = _exhaustiveFindOnConfig(readHostStatus.getValue(),
+ NamespaceString(TagsType::ConfigNS),
+ BSON(TagsType::ns(collectionNs)),
+ BSON(TagsType::min() << 1),
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -675,7 +673,7 @@ StatusWith<string> CatalogManagerReplicaSet::getTagForChunk(const std::string& c
BSONObj query =
BSON(TagsType::ns(collectionNs) << TagsType::min() << BSON("$lte" << chunk.getMin())
<< TagsType::max() << BSON("$gte" << chunk.getMax()));
- auto findStatus = grid.shardRegistry()->exhaustiveFind(
+ auto findStatus = _exhaustiveFindOnConfig(
readHostStatus.getValue(), NamespaceString(TagsType::ConfigNS), query, BSONObj(), 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
@@ -705,11 +703,11 @@ Status CatalogManagerReplicaSet::getAllShards(vector<ShardType>* shards) {
return readHost.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- BSONObj(), // no query filter
- BSONObj(), // no sort
- boost::none); // no limit
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ BSONObj(), // no query filter
+ BSONObj(), // no sort
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -807,8 +805,7 @@ void CatalogManagerReplicaSet::writeConfigServerDirect(const BatchedCommandReque
}
}
-Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName,
- DatabaseType* db) const {
+Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName, DatabaseType* db) {
BSONObjBuilder queryBuilder;
queryBuilder.appendRegex(
DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i");
@@ -819,11 +816,11 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName,
return readHost.getStatus();
}
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(DatabaseType::ConfigNS),
- queryBuilder.obj(),
- BSONObj(),
- 1);
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(DatabaseType::ConfigNS),
+ queryBuilder.obj(),
+ BSONObj(),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -854,7 +851,7 @@ Status CatalogManagerReplicaSet::_checkDbDoesNotExist(const string& dbName,
<< " have: " << actualDbName << " want to add: " << dbName);
}
-StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() const {
+StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() {
const auto configShard = grid.shardRegistry()->getShard("config");
const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector);
if (!readHost.isOK()) {
@@ -864,11 +861,11 @@ StatusWith<std::string> CatalogManagerReplicaSet::_generateNewShardName() const
BSONObjBuilder shardNameRegex;
shardNameRegex.appendRegex(ShardType::name(), "^shard");
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- shardNameRegex.obj(),
- BSON(ShardType::name() << -1),
- 1);
+ auto findStatus = _exhaustiveFindOnConfig(readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ shardNameRegex.obj(),
+ BSON(ShardType::name() << -1),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -984,11 +981,11 @@ StatusWith<VersionType> CatalogManagerReplicaSet::_getConfigVersion() {
}
auto readHost = readHostStatus.getValue();
- auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost,
- NamespaceString(VersionType::ConfigNS),
- BSONObj(),
- BSONObj(),
- boost::none /* no limit */);
+ auto findStatus = _exhaustiveFindOnConfig(readHost,
+ NamespaceString(VersionType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none /* no limit */);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -1074,6 +1071,29 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandOnConfigWithNotMasterRe
return response.response;
}
+StatusWith<std::vector<BSONObj>> CatalogManagerReplicaSet::_exhaustiveFindOnConfig(
+ const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit) {
+ repl::ReadConcernArgs readConcern(_getConfigOpTime(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
+
+ auto result = grid.shardRegistry()->exhaustiveFind(
+ host, nss, query, sort, limit, readConcern, kReplMetadata);
+
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+
+ auto response = std::move(result.getValue());
+
+ _updateLastSeenConfigOpTime(response.opTime);
+
+ return std::move(response.docs);
+}
+
repl::OpTime CatalogManagerReplicaSet::_getConfigOpTime() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _configOpTime;
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index 18c2bec94b1..f2c53a924f7 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -128,9 +128,9 @@ public:
Status checkAndUpgrade(bool checkOnly) override;
private:
- Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) const override;
+ Status _checkDbDoesNotExist(const std::string& dbName, DatabaseType* db) override;
- StatusWith<std::string> _generateNewShardName() const override;
+ StatusWith<std::string> _generateNewShardName() override;
bool _runReadCommand(const std::string& dbname,
const BSONObj& cmdObj,
@@ -152,6 +152,12 @@ private:
StatusWith<BSONObj> _runCommandOnConfigWithNotMasterRetries(const std::string& dbName,
BSONObj cmdObj);
+ StatusWith<std::vector<BSONObj>> _exhaustiveFindOnConfig(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit);
+
/**
* Appends a read committed read concern to the request object.
*/
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
index 14c5b87da11..bd7b874ee1b 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
@@ -214,6 +214,7 @@ TEST_F(AddShardTest, AddShardStandalone) {
// in the previous call, in the config server metadata
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(request.target, configHost);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS);
@@ -223,17 +224,23 @@ TEST_F(AddShardTest, AddShardStandalone) {
ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB1")));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB2")));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -321,6 +328,7 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) {
// in the previous call, in the config server metadata
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(request.target, configHost);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS);
@@ -330,10 +338,14 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) {
ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB1")));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), DatabaseType::ConfigNS);
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -341,6 +353,8 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) {
ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name("TestDB2")));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -350,13 +364,16 @@ TEST_F(AddShardTest, AddShardStandaloneGenerateName) {
existingShard.setMaxSizeMB(100);
// New name is being generated for the new shard
- onFindCommand([&existingShard](const RemoteCommandRequest& request) {
+ onFindCommand([this, &existingShard](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), ShardType::ConfigNS);
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
ASSERT_EQ(query->ns(), ShardType::ConfigNS);
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{existingShard.toBSON()};
});
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
index dc12eeb5a7d..74245c53fa0 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_remove_shard_test.cpp
@@ -171,6 +171,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) {
// Respond to request to reload information about existing shards
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -179,6 +181,8 @@ TEST_F(RemoveShardTest, RemoveShardStartDraining) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
ShardType remainingShard;
remainingShard.setHost("host1");
remainingShard.setName("shard0");
@@ -343,6 +347,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) {
// Respond to request to reload information about existing shards
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -351,6 +357,8 @@ TEST_F(RemoveShardTest, RemoveShardCompletion) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
ShardType remainingShard;
remainingShard.setHost("host1");
remainingShard.setName("shard0");
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
index 3c0a59a68ec..98d144fc961 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp
@@ -80,6 +80,8 @@ public:
void expectGetDatabase(const DatabaseType& expectedDb) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
@@ -91,6 +93,8 @@ public:
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_EQ(1, query->getLimit().get());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{expectedDb.toBSON()};
});
}
@@ -146,6 +150,8 @@ public:
void expectReloadChunks(const std::string& ns, const vector<ChunkType>& chunks) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
@@ -160,6 +166,8 @@ public:
ASSERT_EQ(expectedSort, query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
vector<BSONObj> chunksToReturn;
std::transform(chunks.begin(),
@@ -202,6 +210,8 @@ public:
void expectReloadCollection(const CollectionType& collection) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
@@ -217,6 +227,8 @@ public:
}
ASSERT_EQ(BSONObj(), query->getSort());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{collection.toBSON()};
});
}
@@ -224,6 +236,8 @@ public:
void expectLoadNewestChunk(const string& ns, const ChunkType& chunk) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
@@ -237,6 +251,8 @@ public:
ASSERT_EQ(expectedSort, query->getSort());
ASSERT_EQ(1, query->getLimit().get());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{chunk.toBSON()};
});
}
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
index 8d295a29e6c..9473bd7e26c 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test.cpp
@@ -86,7 +86,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) {
return assertGet(catalogManager()->getCollection(expectedColl.getNs().ns()));
});
- onFindCommand([&expectedColl](const RemoteCommandRequest& request) {
+ onFindCommand([this, &expectedColl](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
@@ -98,6 +100,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) {
ASSERT_EQ(query->getSort(), BSONObj());
ASSERT_EQ(query->getLimit().get(), 1);
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{expectedColl.toBSON()};
});
@@ -132,10 +136,12 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) {
return assertGet(catalogManager()->getDatabase(expectedDb.getName()));
});
- onFindCommand([&expectedDb](const RemoteCommandRequest& request) {
+ onFindCommand([this, &expectedDb](const RemoteCommandRequest& request) {
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
@@ -143,6 +149,8 @@ TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) {
ASSERT_EQ(query->getSort(), BSONObj());
ASSERT_EQ(query->getLimit().get(), 1);
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{expectedDb.toBSON()};
});
@@ -297,7 +305,9 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) {
return shards;
});
- onFindCommand([&s1, &s2, &s3](const RemoteCommandRequest& request) {
+ onFindCommand([this, &s1, &s2, &s3](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), ShardType::ConfigNS);
@@ -308,6 +318,8 @@ TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) {
ASSERT_EQ(query->getSort(), BSONObj());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{s1.toBSON(), s2.toBSON(), s3.toBSON()};
});
@@ -383,7 +395,9 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) {
return chunks;
});
- onFindCommand([&chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) {
+ onFindCommand([this, &chunksQuery, chunkA, chunkB](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
@@ -394,6 +408,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) {
ASSERT_EQ(query->getSort(), BSON(ChunkType::version() << -1));
ASSERT_EQ(query->getLimit().get(), 1);
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()};
});
@@ -421,7 +437,9 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) {
return chunks;
});
- onFindCommand([&chunksQuery](const RemoteCommandRequest& request) {
+ onFindCommand([this, &chunksQuery](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
@@ -432,6 +450,8 @@ TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) {
ASSERT_EQ(query->getSort(), BSONObj());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -697,7 +717,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsBalancerDoc) {
return assertGet(catalogManager()->getGlobalSettings(SettingsType::BalancerDocKey));
});
- onFindCommand([st1](const RemoteCommandRequest& request) {
+ onFindCommand([this, st1](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), SettingsType::ConfigNS);
@@ -706,6 +728,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsBalancerDoc) {
ASSERT_EQ(query->ns(), SettingsType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::BalancerDocKey)));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{st1.toBSON()};
});
@@ -725,7 +749,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsChunkSizeDoc) {
return assertGet(catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey));
});
- onFindCommand([st1](const RemoteCommandRequest& request) {
+ onFindCommand([this, st1](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), SettingsType::ConfigNS);
@@ -734,6 +760,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsChunkSizeDoc) {
ASSERT_EQ(query->ns(), SettingsType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey)));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{st1.toBSON()};
});
@@ -750,7 +778,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsInvalidDoc) {
ASSERT_EQ(balSettings.getStatus(), ErrorCodes::FailedToParse);
});
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), SettingsType::ConfigNS);
@@ -759,6 +789,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsInvalidDoc) {
ASSERT_EQ(query->ns(), SettingsType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(SettingsType::key("invalidKey")));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{
BSON("invalidKey"
<< "some value") // invalid settings document -- key is required
@@ -778,7 +810,9 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsNonExistent) {
ASSERT_EQ(chunkSizeSettings.getStatus(), ErrorCodes::NoMatchingDocument);
});
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), SettingsType::ConfigNS);
@@ -787,6 +821,8 @@ TEST_F(CatalogManagerReplSetTest, GetGlobalSettingsNonExistent) {
ASSERT_EQ(query->ns(), SettingsType::ConfigNS);
ASSERT_EQ(query->getFilter(), BSON(SettingsType::key(SettingsType::ChunkSizeDocKey)));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -829,7 +865,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) {
return collections;
});
- onFindCommand([coll1, coll2, coll3](const RemoteCommandRequest& request) {
+ onFindCommand([this, coll1, coll2, coll3](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
@@ -839,6 +877,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) {
ASSERT_EQ(query->getFilter(), BSONObj());
ASSERT_EQ(query->getSort(), BSONObj());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()};
});
@@ -876,7 +916,9 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) {
return collections;
});
- onFindCommand([coll1, coll2](const RemoteCommandRequest& request) {
+ onFindCommand([this, coll1, coll2](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
@@ -889,6 +931,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) {
ASSERT_EQ(query->getFilter(), b.obj());
}
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{coll1.toBSON(), coll2.toBSON()};
});
@@ -919,10 +963,12 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) {
validColl.setKeyPattern(KeyPattern{BSON("_id" << 1)});
ASSERT_OK(validColl.validate());
- onFindCommand([validColl](const RemoteCommandRequest& request) {
+ onFindCommand([this, validColl](const RemoteCommandRequest& request) {
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
@@ -932,6 +978,8 @@ TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) {
ASSERT_EQ(query->getFilter(), b.obj());
}
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{
validColl.toBSON(),
BSONObj() // empty document is invalid
@@ -960,7 +1008,9 @@ TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardValid) {
return dbs;
});
- onFindCommand([dbt1, dbt2](const RemoteCommandRequest& request) {
+ onFindCommand([this, dbt1, dbt2](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
@@ -970,6 +1020,8 @@ TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardValid) {
ASSERT_EQ(query->getFilter(), BSON(DatabaseType::primary(dbt1.getPrimary())));
ASSERT_EQ(query->getSort(), BSONObj());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{dbt1.toBSON(), dbt2.toBSON()};
});
@@ -1028,7 +1080,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagsForCollection) {
return tags;
});
- onFindCommand([tagA, tagB](const RemoteCommandRequest& request) {
+ onFindCommand([this, tagA, tagB](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
@@ -1038,6 +1092,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagsForCollection) {
ASSERT_EQ(query->getFilter(), BSON(TagsType::ns("TestDB.TestColl")));
ASSERT_EQ(query->getSort(), BSON(TagsType::min() << 1));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{tagA.toBSON(), tagB.toBSON()};
});
@@ -1108,7 +1164,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkOneTagFound) {
auto future = launchAsync(
[this, chunk] { return assertGet(catalogManager()->getTagForChunk("test.coll", chunk)); });
- onFindCommand([chunk](const RemoteCommandRequest& request) {
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
@@ -1120,6 +1178,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkOneTagFound) {
<< TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
<< BSON("$gte" << chunk.getMax())));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
TagsType tt;
tt.setNS("test.coll");
tt.setTag("tag");
@@ -1148,7 +1208,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkNoTagFound) {
auto future = launchAsync(
[this, chunk] { return assertGet(catalogManager()->getTagForChunk("test.coll", chunk)); });
- onFindCommand([chunk](const RemoteCommandRequest& request) {
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
@@ -1160,6 +1222,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkNoTagFound) {
<< TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
<< BSON("$gte" << chunk.getMax())));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -1185,7 +1249,9 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkInvalidTagDoc) {
ASSERT_EQ(ErrorCodes::FailedToParse, tagResult.getStatus());
});
- onFindCommand([chunk](const RemoteCommandRequest& request) {
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
@@ -1197,6 +1263,8 @@ TEST_F(CatalogManagerReplSetTest, GetTagForChunkInvalidTagDoc) {
<< TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
<< BSON("$gte" << chunk.getMax())));
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
// Return a tag document missing the min key
return vector<BSONObj>{BSON(TagsType::ns("test.mycol") << TagsType::tag("tag")
<< TagsType::max(BSON("a" << 20)))};
@@ -1364,6 +1432,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -1372,6 +1442,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
});
@@ -1404,7 +1476,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
return vector<BSONObj>{};
});
@@ -1414,6 +1488,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
ASSERT_EQUALS("admin", request.dbname);
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -1426,6 +1501,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
ASSERT_EQUALS("admin", request.dbname);
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -1514,7 +1590,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExists) {
ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
});
- onFindCommand([dbname](const RemoteCommandRequest& request) {
+ onFindCommand([this, dbname](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -1524,6 +1602,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExists) {
ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
ASSERT_EQ(queryBuilder.obj(), query->getFilter());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
return vector<BSONObj>{BSON("_id" << dbname)};
});
@@ -1550,7 +1629,9 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExistsDifferentCase) {
ASSERT_EQUALS(ErrorCodes::DatabaseDifferCase, status);
});
- onFindCommand([dbname, dbnameDiffCase](const RemoteCommandRequest& request) {
+ onFindCommand([this, dbname, dbnameDiffCase](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -1560,6 +1641,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDBExistsDifferentCase) {
ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
ASSERT_EQ(queryBuilder.obj(), query->getFilter());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
return vector<BSONObj>{BSON("_id" << dbnameDiffCase)};
});
@@ -1586,14 +1668,17 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseNoShards) {
});
// Report no databases with the same name already exist
- onFindCommand([dbname](const RemoteCommandRequest& request) {
+ onFindCommand([this, dbname](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
return vector<BSONObj>{};
});
// Report no shards exist
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -1602,6 +1687,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseNoShards) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -1630,6 +1717,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));
@@ -1638,6 +1726,8 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
});
@@ -1669,8 +1759,10 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
// Report no databases with the same name already exist
onFindCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
return vector<BSONObj>{};
});
@@ -1680,6 +1772,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
ASSERT_EQUALS("admin", request.dbname);
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -1692,6 +1785,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
ASSERT_EQUALS("admin", request.dbname);
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -1704,6 +1798,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
ASSERT_EQUALS("admin", request.dbname);
string cmdName = request.cmdObj.firstElement().fieldName();
ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(rpc::makeEmptyMetadata(), request.metadata);
@@ -1714,6 +1809,7 @@ TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQUALS(configHost, request.target);
ASSERT_EQUALS("config", request.dbname);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
@@ -1773,7 +1869,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
});
// Query to find if db already exists in config.
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(DatabaseType::ConfigNS, nss.toString());
@@ -1788,6 +1885,8 @@ TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
ASSERT_EQ(BSONObj(), query->getSort());
ASSERT_EQ(1, query->getLimit().get());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
return vector<BSONObj>{};
});
@@ -2109,5 +2208,113 @@ TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
future3.timed_get(kFutureTimeout);
}
+TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future1 = launchAsync(
+ [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); });
+
+ repl::OpTime highestOpTime;
+ const repl::OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onFindWithMetadataCommand(
+ [this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, newOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ SettingsType settings;
+ settings.setKey("chunksize");
+ settings.setChunkSizeMB(2);
+
+ return std::make_tuple(vector<BSONObj>{settings.toBSON()}, builder.obj());
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const repl::OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ return BSON("ok" << 1);
+ });
+
+ future2.timed_get(kFutureTimeout);
+}
+
+TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ // Initialize the internal config OpTime
+ auto future1 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(catalogManager()->runReadCommand("test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ repl::OpTime highestOpTime;
+ const repl::OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(12, newOpTime, 100, 3);
+ BSONObjBuilder builder;
+ BSONObjBuilder replBuilder(builder.subobjStart(rpc::kReplicationMetadataFieldName));
+ metadata.writeToMetadata(&replBuilder);
+ replBuilder.done();
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync(
+ [this] { ASSERT_OK(catalogManager()->getGlobalSettings("chunksize").getStatus()); });
+
+ const repl::OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onFindCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
+ ASSERT_EQ(string("find"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ SettingsType settings;
+ settings.setKey("chunksize");
+ settings.setChunkSizeMB(2);
+
+ return vector<BSONObj>{settings.toBSON()};
+ });
+
+ future2.timed_get(kFutureTimeout);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
index 3b0036418a5..36eb4bbac64 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.cpp
@@ -189,6 +189,11 @@ void CatalogManagerReplSetTestFixture::onFindCommand(NetworkTestEnv::OnFindComma
_networkTestEnv->onFindCommand(func);
}
+void CatalogManagerReplSetTestFixture::onFindWithMetadataCommand(
+ NetworkTestEnv::OnFindCommandWithMetadataFunction func) {
+ _networkTestEnv->onFindWithMetadataCommand(func);
+}
+
void CatalogManagerReplSetTestFixture::setupShards(const std::vector<ShardType>& shards) {
auto future = launchAsync([this] { shardRegistry()->reload(); });
@@ -198,7 +203,7 @@ void CatalogManagerReplSetTestFixture::setupShards(const std::vector<ShardType>&
}
void CatalogManagerReplSetTestFixture::expectGetShards(const std::vector<ShardType>& shards) {
- onFindCommand([&shards](const RemoteCommandRequest& request) {
+ onFindCommand([this, &shards](const RemoteCommandRequest& request) {
const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
ASSERT_EQ(nss.toString(), ShardType::ConfigNS);
@@ -212,6 +217,8 @@ void CatalogManagerReplSetTestFixture::expectGetShards(const std::vector<ShardTy
ASSERT_EQ(query->getSort(), BSONObj());
ASSERT_FALSE(query->getLimit().is_initialized());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), 0);
+
vector<BSONObj> shardsToReturn;
std::transform(shards.begin(),
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
index 67d8d678637..651049bb5f7 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h
@@ -96,6 +96,8 @@ protected:
void onCommand(executor::NetworkTestEnv::OnCommandFunction func);
void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func);
void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func);
+ void onFindWithMetadataCommand(
+ executor::NetworkTestEnv::OnFindCommandWithMetadataFunction func);
/**
* Setup the shard registry to contain the given shards until the next reload.
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
index c20e98a4ef9..57d0694e5cb 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_upgrade_test.cpp
@@ -53,12 +53,15 @@ TEST_F(CatalogManagerReplSetTestFixture, UpgradeNotNeeded) {
auto future = launchAsync([this] { ASSERT_OK(catalogManager()->checkAndUpgrade(true)); });
- onFindCommand([](const RemoteCommandRequest& request) {
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(BSON(rpc::kReplicationMetadataFieldName << 1), request.metadata);
+
ASSERT_EQ(HostAndPort("config:123"), request.target);
ASSERT_EQ("config", request.dbname);
- ASSERT_EQ(BSON("find"
- << "version"),
- request.cmdObj);
+
+ const auto& findCmd = request.cmdObj;
+ ASSERT_EQ("version", findCmd["find"].str());
+ checkReadConcern(findCmd, Timestamp(0, 0), 0);
BSONObj versionDoc(fromjson(R"({
_id: 1,
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 6eaa45c8c50..921849312ce 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -269,28 +269,43 @@ shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) {
return nullptr;
}
-StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host,
- const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<long long> limit) {
+StatusWith<ShardRegistry::QueryResponse> ShardRegistry::exhaustiveFind(
+ const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit,
+ boost::optional<repl::ReadConcernArgs> readConcern,
+ const BSONObj& metadata) {
// If for some reason the callback never gets invoked, we will return this status
Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
- vector<BSONObj> results;
+ QueryResponse response;
- auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction) {
+ auto fetcherCallback = [&status, &response](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction) {
// Throw out any accumulated results on error
if (!dataStatus.isOK()) {
status = dataStatus.getStatus();
- results.clear();
+ response.docs.clear();
return;
}
auto& data = dataStatus.getValue();
+ if (auto replField = data.otherFields.metadata[rpc::kReplicationMetadataFieldName]) {
+ auto replParseStatus = rpc::ReplSetMetadata::readFromMetadata(replField.Obj());
+
+ if (!replParseStatus.isOK()) {
+ status = replParseStatus.getStatus();
+ response.docs.clear();
+ return;
+ }
+
+ response.opTime = replParseStatus.getValue().getLastCommittedOptime();
+ }
+
for (const BSONObj& doc : data.documents) {
- results.push_back(std::move(doc.getOwned()));
+ response.docs.push_back(std::move(doc.getOwned()));
}
status = Status::OK();
@@ -304,7 +319,16 @@ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort
boost::none, // skip
limit);
- QueryFetcher fetcher(_executor.get(), host, nss, lpq->asFindCommand(), fetcherCallback);
+ BSONObjBuilder findCmdBuilder;
+ lpq->asFindCommand(&findCmdBuilder);
+
+ if (readConcern) {
+ BSONObjBuilder builder;
+ readConcern->appendInfo(&findCmdBuilder);
+ }
+
+ QueryFetcher fetcher(
+ _executor.get(), host, nss, findCmdBuilder.done(), fetcherCallback, metadata);
Status scheduleStatus = fetcher.schedule();
if (!scheduleStatus.isOK()) {
@@ -317,7 +341,7 @@ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort
return status;
}
- return results;
+ return response;
}
StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 634bcdf2340..b465b065e8e 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -36,6 +36,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/s/client/shard.h"
#include "mongo/stdx/mutex.h"
@@ -72,6 +73,11 @@ public:
repl::OpTime opTime;
};
+ struct QueryResponse {
+ std::vector<BSONObj> docs;
+ repl::OpTime opTime;
+ };
+
/**
* Instantiates a new shard registry.
*
@@ -150,11 +156,13 @@ public:
*
* Note: should never be used outside of CatalogManagerReplicaSet or DistLockCatalogImpl.
*/
- StatusWith<std::vector<BSONObj>> exhaustiveFind(const HostAndPort& host,
- const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& sort,
- boost::optional<long long> limit);
+ StatusWith<QueryResponse> exhaustiveFind(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit,
+ boost::optional<repl::ReadConcernArgs> readConcern,
+ const BSONObj& metadata);
/**
* Runs a command against the specified host and returns the result. It is the responsibility