author    Jess Fan <jess.fan@10gen.com>          2016-08-23 15:42:29 -0400
committer Spencer T Brody <spencer@mongodb.com>  2016-08-23 15:43:20 -0400
commit    8f0665f1f0cb98db7b2c31880208b576fe90a04f (patch)
tree      7be6c0af0ae426702d6aab715364841cefcbab95 /src/mongo/s
parent    eedc87f13014319a6890ee06a908c75a1ac7a13b (diff)
SERVER-25001 Wire in new _configsvrSplitChunk command so mongod no longer runs applyOps directly.
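This view is limited to 'src/mongo/s', so the _configsvrSplitChunk command implementation itself lives outside the tree shown here; what the diff below wires through the catalog client is the read/write-concern policy behind it. A minimal standalone sketch of that policy, using simplified stand-in types rather than the real mongo classes:

    // Sketch only: the config server commits chunk ops on itself, so it may
    // use local read concern and the default write concern; every other
    // caller must pass majority, as enforced by the invariants added below
    // in applyChunkOpsDeprecated, getChunks and logChange.
    #include <cassert>
    #include <string>

    enum class ClusterRole { None, ShardServer, ConfigServer };

    struct Concerns {
        std::string writeConcern;
        std::string readConcern;
    };

    Concerns concernsForChunkOps(ClusterRole role) {
        if (role == ClusterRole::ConfigServer)
            return {"default", "local"};  // applyOps runs on this node
        return {"majority", "majority"};  // remote callers must be durable
    }

    int main() {
        assert(concernsForChunkOps(ClusterRole::ShardServer).writeConcern == "majority");
        return 0;
    }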
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp      | 94
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_client_impl.h        | 17
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp  |  3
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp     | 80
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h       |  5
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp | 40
-rw-r--r-- src/mongo/s/catalog/replset/sharding_catalog_test.cpp             | 51
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_client.h                     | 13
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_client_mock.cpp              | 10
-rw-r--r-- src/mongo/s/catalog/sharding_catalog_client_mock.h                | 10
-rw-r--r-- src/mongo/s/chunk_manager.cpp                                     | 10
-rw-r--r-- src/mongo/s/commands/cluster_move_primary_cmd.cpp                 | 12
-rw-r--r-- src/mongo/s/commands/cluster_remove_shard_cmd.cpp                 |  3
-rw-r--r-- src/mongo/s/config.cpp                                            |  9
-rw-r--r-- src/mongo/s/config_server_test_fixture.cpp                        |  3
-rw-r--r-- src/mongo/s/sharding_test_fixture.h                               |  7
16 files changed, 278 insertions, 89 deletions
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index 6cc5c791eb9..13379820db3 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -236,8 +236,10 @@ Status ShardingCatalogClientImpl::logAction(OperationContext* txn,
const std::string& ns,
const BSONObj& detail) {
if (_actionLogCollectionCreated.load() == 0) {
- Status result = _createCappedConfigCollection(
- txn, kActionLogCollectionName, kActionLogCollectionSizeMB);
+ Status result = _createCappedConfigCollection(txn,
+ kActionLogCollectionName,
+ kActionLogCollectionSizeMB,
+ ShardingCatalogClient::kMajorityWriteConcern);
if (result.isOK()) {
_actionLogCollectionCreated.store(1);
} else {
@@ -246,16 +248,24 @@ Status ShardingCatalogClientImpl::logAction(OperationContext* txn,
}
}
- return _log(txn, kActionLogCollectionName, what, ns, detail);
+ return _log(txn,
+ kActionLogCollectionName,
+ what,
+ ns,
+ detail,
+ ShardingCatalogClient::kMajorityWriteConcern);
}
Status ShardingCatalogClientImpl::logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
- const BSONObj& detail) {
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
+ writeConcern.wMode == WriteConcernOptions::kMajority);
if (_changeLogCollectionCreated.load() == 0) {
Status result = _createCappedConfigCollection(
- txn, kChangeLogCollectionName, kChangeLogCollectionSizeMB);
+ txn, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern);
if (result.isOK()) {
_changeLogCollectionCreated.store(1);
} else {
@@ -264,7 +274,7 @@ Status ShardingCatalogClientImpl::logChange(OperationContext* txn,
}
}
- return _log(txn, kChangeLogCollectionName, what, ns, detail);
+ return _log(txn, kChangeLogCollectionName, what, ns, detail, writeConcern);
}
// static
@@ -355,7 +365,8 @@ Status ShardingCatalogClientImpl::_log(OperationContext* txn,
const StringData& logCollName,
const std::string& what,
const std::string& operationNS,
- const BSONObj& detail) {
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) {
Date_t now = Grid::get(txn)->getNetwork()->now();
const std::string hostName = Grid::get(txn)->getNetwork()->getHostName();
const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen();
@@ -373,8 +384,8 @@ Status ShardingCatalogClientImpl::_log(OperationContext* txn,
log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON);
const NamespaceString nss("config", logCollName);
- Status result = insertConfigDocument(
- txn, nss.ns(), changeLogBSON, ShardingCatalogClient::kMajorityWriteConcern);
+ Status result = insertConfigDocument(txn, nss.ns(), changeLogBSON, writeConcern);
+
if (!result.isOK()) {
warning() << "Error encountered while logging config change with ID [" << changeId
<< "] into collection " << logCollName << ": " << redact(result);
@@ -446,7 +457,11 @@ Status ShardingCatalogClientImpl::shardCollection(OperationContext* txn,
collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1));
- logChange(txn, "shardCollection.start", ns, collectionDetail.obj());
+ logChange(txn,
+ "shardCollection.start",
+ ns,
+ collectionDetail.obj(),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
}
// Construct the collection default collator.
@@ -502,7 +517,11 @@ Status ShardingCatalogClientImpl::shardCollection(OperationContext* txn,
<< dbPrimaryShardId << causedBy(status);
}
- logChange(txn, "shardCollection.end", ns, BSON("version" << manager->getVersion().toString()));
+ logChange(txn,
+ "shardCollection.end",
+ ns,
+ BSON("version" << manager->getVersion().toString()),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
return Status::OK();
}
@@ -558,7 +577,11 @@ StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(Operation
grid.shardRegistry()->reload(txn);
// Record start in changelog
- logChange(txn, "removeShard.start", "", BSON("shard" << name));
+ logChange(txn,
+ "removeShard.start",
+ "",
+ BSON("shard" << name),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
return ShardDrainingStatus::STARTED;
}
@@ -603,7 +626,11 @@ StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(Operation
grid.shardRegistry()->reload(txn);
// Record finish in changelog
- logChange(txn, "removeShard", "", BSON("shard" << name));
+ logChange(txn,
+ "removeShard",
+ "",
+ BSON("shard" << name),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
return ShardDrainingStatus::COMPLETED;
}
@@ -749,7 +776,11 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn,
}
Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const NamespaceString& ns) {
- logChange(txn, "dropCollection.start", ns.ns(), BSONObj());
+ logChange(txn,
+ "dropCollection.start",
+ ns.ns(),
+ BSONObj(),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
auto shardsStatus = getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
@@ -904,7 +935,11 @@ Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const Na
LOG(1) << "dropCollection " << ns << " completed";
- logChange(txn, "dropCollection", ns.ns(), BSONObj());
+ logChange(txn,
+ "dropCollection",
+ ns.ns(),
+ BSONObj(),
+ ShardingCatalogClientImpl::kMajorityWriteConcern);
return Status::OK();
}
@@ -1012,14 +1047,17 @@ Status ShardingCatalogClientImpl::getChunks(OperationContext* txn,
const BSONObj& sort,
boost::optional<int> limit,
vector<ChunkType>* chunks,
- OpTime* opTime) {
+ OpTime* opTime,
+ repl::ReadConcernLevel readConcern) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
+ readConcern == repl::ReadConcernLevel::kMajorityReadConcern);
chunks->clear();
// Convert boost::optional<int> to boost::optional<long long>.
auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none;
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
- repl::ReadConcernLevel::kMajorityReadConcern,
+ readConcern,
NamespaceString(ChunkType::ConfigNS),
query,
sort,
@@ -1264,10 +1302,15 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn,
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
- const ChunkVersion& lastChunkVersion) {
+ const ChunkVersion& lastChunkVersion,
+ const WriteConcernOptions& writeConcern,
+ repl::ReadConcernLevel readConcern) {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||
+ (readConcern == repl::ReadConcernLevel::kMajorityReadConcern &&
+ writeConcern.wMode == WriteConcernOptions::kMajority));
BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition
<< WriteConcernOptions::kWriteConcernField
- << ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ << writeConcern.toBSON());
auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
txn,
@@ -1309,7 +1352,8 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn,
BSONObjBuilder query;
lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod());
query.append(ChunkType::ns(), nss);
- Status chunkStatus = getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr);
+ Status chunkStatus =
+ getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern);
if (!chunkStatus.isOK()) {
warning() << "getChunks function failed, unable to validate chunk operation metadata"
@@ -1546,12 +1590,14 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn,
<< dbName);
}
-Status ShardingCatalogClientImpl::_createCappedConfigCollection(OperationContext* txn,
- StringData collName,
- int cappedSize) {
+Status ShardingCatalogClientImpl::_createCappedConfigCollection(
+ OperationContext* txn,
+ StringData collName,
+ int cappedSize,
+ const WriteConcernOptions& writeConcern) {
BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize
<< WriteConcernOptions::kWriteConcernField
- << ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ << writeConcern.toBSON());
auto result = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
txn,
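The applyChunkOpsDeprecated hunk above also re-reads config.chunks with the caller-supplied readConcern whenever applyOps reports a failure. A simplified sketch of that recovery check (stand-in types, illustrative only): if the expected last chunk version is already present, the batch was in fact applied and the error reply can be disregarded.

    #include <vector>

    struct ChunkVersion {
        unsigned long long major;
        unsigned long long minor;
    };

    // An empty result means the precondition failure was genuine; a match on
    // the expected last version means the ops landed before the error reply.
    bool opWasActuallyApplied(const std::vector<ChunkVersion>& newestChunks,
                              const ChunkVersion& expectedLast) {
        return !newestChunks.empty() &&
               newestChunks.front().major == expectedLast.major &&
               newestChunks.front().minor == expectedLast.minor;
    }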
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index 72e000db216..98a7240c205 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -80,7 +80,8 @@ public:
Status logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
- const BSONObj& detail) override;
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) override;
StatusWith<DistLockManager::ScopedDistLock> distLock(OperationContext* txn,
StringData name,
@@ -120,7 +121,8 @@ public:
const BSONObj& sort,
boost::optional<int> limit,
std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) override;
+ repl::OpTime* opTime,
+ repl::ReadConcernLevel readConcern) override;
Status getTagsForCollection(OperationContext* txn,
const std::string& collectionNs,
@@ -148,7 +150,9 @@ public:
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
- const ChunkVersion& lastChunkVersion) override;
+ const ChunkVersion& lastChunkVersion,
+ const WriteConcernOptions& writeConcern,
+ repl::ReadConcernLevel readConcern) override;
StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override;
@@ -214,7 +218,8 @@ private:
*/
Status _createCappedConfigCollection(OperationContext* txn,
StringData collName,
- int cappedSize);
+ int cappedSize,
+ const WriteConcernOptions& writeConcern);
/**
* Helper method for running a count command against the config server with appropriate
@@ -255,12 +260,14 @@ private:
* @param what E.g. "split", "migrate" (not interpreted)
* @param operationNS To which collection the metadata change is being applied (not interpreted)
* @param detail Additional info about the metadata change (not interpreted)
+ * @param writeConcern Write concern options to use for logging
*/
Status _log(OperationContext* txn,
const StringData& logCollName,
const std::string& what,
const std::string& operationNS,
- const BSONObj& detail);
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern);
//
// All member variables are labeled with one of the following codes indicating the
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp
index fa3aae04361..8ec85408a4b 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp
@@ -171,7 +171,8 @@ protected:
Status log(const std::string& what, const std::string& ns, const BSONObj& detail) {
if (_configCollType == ChangeLog) {
- return catalogClient()->logChange(operationContext(), what, ns, detail);
+ return catalogClient()->logChange(
+ operationContext(), what, ns, detail, ShardingCatalogClient::kMajorityWriteConcern);
} else {
return catalogClient()->logAction(operationContext(), what, ns, detail);
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index cc9222cf5fd..37a43598826 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -71,6 +71,7 @@
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/set_shard_version_request.h"
+#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
@@ -812,7 +813,8 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
shardDetails.append("name", shardType.getName());
shardDetails.append("host", shardConnectionString.toString());
- _catalogClient->logChange(txn, "addShard", "", shardDetails.obj());
+ _catalogClient->logChange(
+ txn, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern);
// Ensure the added shard is visible to this process.
auto shardRegistry = Grid::get(txn)->shardRegistry();
@@ -1061,7 +1063,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
// Acquire GlobalLock in MODE_X twice to prevent yielding.
- // GLobalLock and the following lock on config.chunks are only needed to support
+ // GlobalLock and the following lock on config.chunks are only needed to support
// mixed-mode operation with mongoses from 3.2
// TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
@@ -1110,12 +1112,30 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
BSONArrayBuilder updates;
for (const auto& endKey : newChunkBounds) {
+ // verify that splitPoints are non-equivalent
+ if (endKey.woCompare(startKey) == 0) {
+ return {ErrorCodes::InvalidOptions,
+ str::stream() << "split on lower bound of chunk "
+ << "["
+ << startKey
+ << ", "
+ << endKey
+ << ") is not allowed"};
+ }
+
+ // verify that splits don't create too-big shard keys
+ Status shardKeyStatus = ShardKeyPattern::checkShardKeySize(endKey);
+ if (!shardKeyStatus.isOK()) {
+ return shardKeyStatus;
+ }
+
// splits only update the 'minor' portion of version
currentMaxVersion.incMinor();
// build an update operation against the chunks collection of the config database
- // with
- // upsert true
+ // with upsert true
BSONObjBuilder op;
op.append("op", "u");
op.appendBool("b", true);
@@ -1156,18 +1176,28 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
BSONObjBuilder b;
b.append("ns", ChunkType::ConfigNS);
b.append("q",
- BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
+ BSON("query" << BSON(ChunkType::ns(ns.ns()) << ChunkType::min() << range.getMin()
+ << ChunkType::max()
+ << range.getMax())
+ << "orderby"
<< BSON(ChunkType::DEPRECATED_lastmod() << -1)));
{
BSONObjBuilder bb(b.subobjStart("res"));
- collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ bb.append(ChunkType::DEPRECATED_epoch(), requestEpoch);
+ bb.append(ChunkType::shard(), shardName);
}
preCond.append(b.obj());
}
// apply the batch of updates to remote and local metadata
- Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
- txn, updates.arr(), preCond.arr(), ns.ns(), currentMaxVersion);
+ Status applyOpsStatus =
+ grid.catalogClient(txn)->applyChunkOpsDeprecated(txn,
+ updates.arr(),
+ preCond.arr(),
+ ns.ns(),
+ currentMaxVersion,
+ WriteConcernOptions(),
+ repl::ReadConcernLevel::kLocalReadConcern);
if (!applyOpsStatus.isOK()) {
return applyOpsStatus;
}
@@ -1182,10 +1212,11 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
}
if (newChunks.size() == 2) {
- appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]);
- appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]);
+ _appendShortVersion(logDetail.subobjStart("left"), newChunks[0]);
+ _appendShortVersion(logDetail.subobjStart("right"), newChunks[1]);
- grid.catalogClient(txn)->logChange(txn, "split", ns.ns(), logDetail.obj());
+ grid.catalogClient(txn)->logChange(
+ txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions());
} else {
BSONObj beforeDetailObj = logDetail.obj();
BSONObj firstDetailObj = beforeDetailObj.getOwned();
@@ -1196,9 +1227,10 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
chunkDetail.appendElements(beforeDetailObj);
chunkDetail.append("number", i + 1);
chunkDetail.append("of", newChunksSize);
- appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]);
+ _appendShortVersion(chunkDetail.subobjStart("chunk"), newChunks[i]);
- grid.catalogClient(txn)->logChange(txn, "multi-split", ns.ns(), chunkDetail.obj());
+ grid.catalogClient(txn)->logChange(
+ txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions());
}
}
@@ -1325,8 +1357,14 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
}
// apply the batch of updates to remote and local metadata
- Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
- txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion);
+ Status applyOpsStatus =
+ grid.catalogClient(txn)->applyChunkOpsDeprecated(txn,
+ updates.arr(),
+ preCond.arr(),
+ ns.ns(),
+ mergeVersion,
+ WriteConcernOptions(),
+ repl::ReadConcernLevel::kLocalReadConcern);
if (!applyOpsStatus.isOK()) {
return applyOpsStatus;
}
@@ -1342,11 +1380,21 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
collVersion.addToBSON(logDetail, "prevShardVersion");
mergeVersion.addToBSON(logDetail, "mergedVersion");
- grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj());
+ grid.catalogClient(txn)->logChange(
+ txn, "merge", ns.ns(), logDetail.obj(), WriteConcernOptions());
return applyOpsStatus;
}
+void ShardingCatalogManagerImpl::_appendShortVersion(BufBuilder& b, const ChunkType& chunk) {
+ BSONObjBuilder bb(b);
+ bb.append(ChunkType::min(), chunk.getMin());
+ bb.append(ChunkType::max(), chunk.getMax());
+ if (chunk.isVersionSet())
+ chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ bb.done();
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
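The two checks added at the top of the commitChunkSplit loop generalize to any ordered key space: a split point equal to the chunk's lower bound would produce an empty chunk, and oversized keys are rejected before any metadata is written. A standalone sketch with plain string keys (the real code compares BSON keys via woCompare and delegates the size check to ShardKeyPattern::checkShardKeySize):

    #include <cstddef>
    #include <optional>
    #include <string>
    #include <vector>

    std::optional<std::string> validateSplitPoints(
        const std::vector<std::string>& endKeys,
        const std::string& startKey,
        std::size_t maxKeyBytes) {
        for (const auto& endKey : endKeys) {
            // Splitting on the lower bound would create the empty chunk
            // [startKey, startKey), so reject it up front.
            if (endKey == startKey)
                return "split on lower bound of chunk is not allowed";
            // Mirrors ShardKeyPattern::checkShardKeySize(endKey).
            if (endKey.size() > maxKeyBytes)
                return "shard key too large";
        }
        return std::nullopt;  // every split point is usable
    }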
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 973a062f2e0..397c64e2a83 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -190,6 +190,11 @@ private:
StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn);
/**
+ * Append min, max and version information from chunk to the buffer for logChange purposes.
+ */
+ void _appendShortVersion(BufBuilder& b, const ChunkType& chunk);
+
+ /**
* Callback function used when rescheduling an addShard task after the first attempt failed.
* Checks if the callback has been canceled, and if not, proceeds to call
* _scheduleAddShardTask.
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp
index 0b741d6f255..a5540940976 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp
@@ -142,11 +142,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto lastChunkDoc = lastChunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkMax, lastChunkDoc.getMax());
- {
- // Check for increment on third chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
- }
+ // Check for increment on third chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
}
TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
@@ -165,8 +163,9 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
+ std::vector<BSONObj> splitPoints;
auto chunkSplitPoint = BSON("a" << 5);
- std::vector<BSONObj> splitPoints{chunkSplitPoint};
+ splitPoints.push_back(chunkSplitPoint);
// set up second chunk (chunk2)
auto competingVersion = ChunkVersion(2, 1, collEpoch);
@@ -206,6 +205,35 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
}
+
+TEST_F(SplitChunkTest, PreConditionFailErrors) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ std::vector<BSONObj> splitPoints;
+ auto chunkSplitPoint = BSON("a" << 5);
+ splitPoints.push_back(chunkSplitPoint);
+
+ setupChunks({chunk});
+
+ auto splitStatus = catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ ChunkRange(chunkMin, BSON("a" << 7)),
+ splitPoints,
+ "shard0000");
+ ASSERT_EQ(ErrorCodes::BadValue, splitStatus);
+}
+
TEST_F(SplitChunkTest, NonExistingNamespaceErrors) {
ChunkType chunk;
chunk.setNS("TestDB.TestColl");
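The PreConditionFailErrors test above exercises the stricter precondition built earlier in this patch: the applyOps q/res document now matches the chunk's exact bounds (plus epoch and shard), so a request whose range disagrees with the stored chunk fails before any update is applied. Roughly, with plain ints standing in for shard key values:

    #include <cassert>

    struct ChunkBounds {
        int min;
        int max;
    };

    // The precondition holds only when the requested range matches the chunk
    // document currently stored in config.chunks.
    bool preConditionHolds(const ChunkBounds& stored, const ChunkBounds& requested) {
        return stored.min == requested.min && stored.max == requested.max;
    }

    int main() {
        ChunkBounds stored{1, 10};    // the chunk set up by the test
        ChunkBounds requested{1, 7};  // ChunkRange(chunkMin, BSON("a" << 7))
        assert(!preConditionHolds(stored, requested));  // split is rejected
        return 0;
    }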
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
index 9d0e2e13deb..fab9b690c13 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
@@ -483,7 +483,8 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSWithSortAndLimit) {
BSON(ChunkType::DEPRECATED_lastmod() << -1),
1,
&chunks,
- &opTime));
+ &opTime,
+ repl::ReadConcernLevel::kMajorityReadConcern));
ASSERT_EQ(2U, chunks.size());
ASSERT_EQ(newOpTime, opTime);
@@ -531,8 +532,13 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSNoSortNoLimit) {
auto future = launchAsync([this, &chunksQuery] {
vector<ChunkType> chunks;
- ASSERT_OK(catalogClient()->getChunks(
- operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr));
+ ASSERT_OK(catalogClient()->getChunks(operationContext(),
+ chunksQuery,
+ BSONObj(),
+ boost::none,
+ &chunks,
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern));
ASSERT_EQ(0U, chunks.size());
return chunks;
@@ -571,8 +577,13 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSInvalidChunk) {
auto future = launchAsync([this, &chunksQuery] {
vector<ChunkType> chunks;
- Status status = catalogClient()->getChunks(
- operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr);
+ Status status = catalogClient()->getChunks(operationContext(),
+ chunksQuery,
+ BSONObj(),
+ boost::none,
+ &chunks,
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status);
ASSERT_EQ(0U, chunks.size());
@@ -1396,8 +1407,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessful) {
ChunkVersion lastChunkVersion(0, 0, OID());
auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
- auto status = catalogClient()->applyChunkOpsDeprecated(
- operationContext(), updateOps, preCondition, nss, lastChunkVersion);
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(status);
});
@@ -1434,8 +1451,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessfulWithCheck) {
ChunkVersion lastChunkVersion(0, 0, OID());
auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
- auto status = catalogClient()->applyChunkOpsDeprecated(
- operationContext(), updateOps, preCondition, nss, lastChunkVersion);
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(status);
});
@@ -1476,8 +1499,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedFailedWithCheck) {
ChunkVersion lastChunkVersion(0, 0, OID());
auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
- auto status = catalogClient()->applyChunkOpsDeprecated(
- operationContext(), updateOps, preCondition, nss, lastChunkVersion);
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, status);
});
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 02de97a4f34..cb474382a55 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -244,6 +244,7 @@ public:
* @param optime an out parameter that will contain the opTime of the config server.
* Can be null. Note that chunks can be fetched in multiple batches and each batch
* can have a unique opTime. This opTime will be the one from the last batch.
+ * @param readConcern The readConcern to use while querying for chunks.
*
* Returns a !OK status if an error occurs.
*/
@@ -252,7 +253,8 @@ public:
const BSONObj& sort,
boost::optional<int> limit,
std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) = 0;
+ repl::OpTime* opTime,
+ repl::ReadConcernLevel readConcern) = 0;
/**
* Retrieves all tags for the specified collection.
@@ -309,6 +311,8 @@ public:
* @param nss: namespace string for the chunks collection.
* @param lastChunkVersion: version of the last document being written to the chunks
* collection.
+ * @param writeConcern: writeConcern to use for applying documents.
+ * @param readConcern: readConcern to use for verifying that documents have been applied.
*
* 'nss' and 'lastChunkVersion' uniquely identify the last document being written, which is
* expected to appear in the chunks collection on success. This is important for the
@@ -320,7 +324,9 @@ public:
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
- const ChunkVersion& lastChunkVersion) = 0;
+ const ChunkVersion& lastChunkVersion,
+ const WriteConcernOptions& writeConcern,
+ repl::ReadConcernLevel readConcern) = 0;
/**
* Writes a diagnostic event to the action log.
@@ -336,7 +342,8 @@ public:
virtual Status logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
- const BSONObj& detail) = 0;
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) = 0;
/**
* Reads global sharding settings from the config.settings collection. The key parameter is
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 9206eb1d0e1..4335d4b5eb7 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -118,7 +118,8 @@ Status ShardingCatalogClientMock::getChunks(OperationContext* txn,
const BSONObj& sort,
boost::optional<int> limit,
std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) {
+ repl::OpTime* opTime,
+ repl::ReadConcernLevel readConcern) {
return {ErrorCodes::InternalError, "Method not implemented"};
}
@@ -163,7 +164,9 @@ Status ShardingCatalogClientMock::applyChunkOpsDeprecated(OperationContext* txn,
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
- const ChunkVersion& lastChunkVersion) {
+ const ChunkVersion& lastChunkVersion,
+ const WriteConcernOptions& writeConcern,
+ repl::ReadConcernLevel readConcern) {
return {ErrorCodes::InternalError, "Method not implemented"};
}
@@ -177,7 +180,8 @@ Status ShardingCatalogClientMock::logAction(OperationContext* txn,
Status ShardingCatalogClientMock::logChange(OperationContext* txn,
const string& what,
const string& ns,
- const BSONObj& detail) {
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) {
return {ErrorCodes::InternalError, "Method not implemented"};
}
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 8c37896fbbc..c072c51c6d9 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -88,7 +88,8 @@ public:
const BSONObj& sort,
boost::optional<int> limit,
std::vector<ChunkType>* chunks,
- repl::OpTime* opTime) override;
+ repl::OpTime* opTime,
+ repl::ReadConcernLevel readConcern) override;
Status getTagsForCollection(OperationContext* txn,
const std::string& collectionNs,
@@ -116,7 +117,9 @@ public:
const BSONArray& updateOps,
const BSONArray& preCondition,
const std::string& nss,
- const ChunkVersion& lastChunkVersion) override;
+ const ChunkVersion& lastChunkVersion,
+ const WriteConcernOptions& writeConcern,
+ repl::ReadConcernLevel readConcern) override;
Status logAction(OperationContext* txn,
const std::string& what,
@@ -126,7 +129,8 @@ public:
Status logChange(OperationContext* txn,
const std::string& what,
const std::string& ns,
- const BSONObj& detail) override;
+ const BSONObj& detail,
+ const WriteConcernOptions& writeConcern) override;
StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override;
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index cc7275e138c..4d5868ca8ae 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -282,8 +282,14 @@ bool ChunkManager::_load(OperationContext* txn,
repl::OpTime opTime;
std::vector<ChunkType> chunks;
- uassertStatusOK(grid.catalogClient(txn)->getChunks(
- txn, diffQuery.query, diffQuery.sort, boost::none, &chunks, &opTime));
+ uassertStatusOK(
+ grid.catalogClient(txn)->getChunks(txn,
+ diffQuery.query,
+ diffQuery.sort,
+ boost::none,
+ &chunks,
+ &opTime,
+ repl::ReadConcernLevel::kMajorityReadConcern));
invariant(opTime >= _configOpTime);
_configOpTime = opTime;
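The invariant kept by ChunkManager::_load above is that the cached config optime only moves forward: with majority read concern, the optime returned by getChunks can never be older than one already observed. A simplified stand-in for repl::OpTime:

    #include <cassert>

    struct OpTime {
        long long term;
        long long timestamp;
    };

    bool operator>=(const OpTime& a, const OpTime& b) {
        return a.term != b.term ? a.term > b.term : a.timestamp >= b.timestamp;
    }

    // Mirrors: invariant(opTime >= _configOpTime); _configOpTime = opTime;
    void advanceConfigOpTime(OpTime& cached, const OpTime& observed) {
        assert(observed >= cached);
        cached = observed;
    }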
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index 5db4f4aa666..b4e6ce5e003 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -166,7 +166,11 @@ public:
_buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls);
auto catalogClient = grid.catalogClient(txn);
- catalogClient->logChange(txn, "movePrimary.start", dbname, moveStartDetails);
+ catalogClient->logChange(txn,
+ "movePrimary.start",
+ dbname,
+ moveStartDetails,
+ ShardingCatalogClient::kMajorityWriteConcern);
BSONArrayBuilder barr;
barr.append(shardedColls);
@@ -287,7 +291,11 @@ public:
BSONObj moveFinishDetails =
_buildMoveEntry(dbname, oldPrimary, toShard->toString(), shardedColls);
- catalogClient->logChange(txn, "movePrimary", dbname, moveFinishDetails);
+ catalogClient->logChange(txn,
+ "movePrimary",
+ dbname,
+ moveFinishDetails,
+ ShardingCatalogClient::kMajorityWriteConcern);
return true;
}
diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
index 0a8c4ff318f..03dd30ae787 100644
--- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
+++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp
@@ -142,7 +142,8 @@ public:
BSONObj(),
boost::none, // return all
&chunks,
- nullptr);
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern);
if (!status.isOK()) {
return appendCommandStatus(result, status);
}
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
index a4064b0246f..6459d758beb 100644
--- a/src/mongo/s/config.cpp
+++ b/src/mongo/s/config.cpp
@@ -311,7 +311,8 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn,
BSON(ChunkType::DEPRECATED_lastmod() << -1),
1,
&newestChunk,
- nullptr));
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern));
if (!newestChunk.empty()) {
invariant(newestChunk.size() == 1);
@@ -540,7 +541,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
*/
log() << "DBConfig::dropDatabase: " << _name;
- grid.catalogClient(txn)->logChange(txn, "dropDatabase.start", _name, BSONObj());
+ grid.catalogClient(txn)->logChange(
+ txn, "dropDatabase.start", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern);
// 1
grid.catalogCache()->invalidate(_name);
@@ -618,7 +620,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
LOG(1) << "\t dropped primary db for: " << _name;
- grid.catalogClient(txn)->logChange(txn, "dropDatabase", _name, BSONObj());
+ grid.catalogClient(txn)->logChange(
+ txn, "dropDatabase", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern);
return true;
}
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index b78d95e2e03..8cb106aefcc 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -411,9 +411,8 @@ Status ConfigServerTestFixture::setupChunks(const std::vector<ChunkType>& chunks
const NamespaceString chunkNS(ChunkType::ConfigNS);
for (const auto& chunk : chunks) {
auto insertStatus = insertToConfigCollection(operationContext(), chunkNS, chunk.toBSON());
- if (!insertStatus.isOK()) {
+ if (!insertStatus.isOK())
return insertStatus;
- }
}
return Status::OK();
diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h
index 9365a18dd66..c330aa468dc 100644
--- a/src/mongo/s/sharding_test_fixture.h
+++ b/src/mongo/s/sharding_test_fixture.h
@@ -84,8 +84,6 @@ protected:
ShardingCatalogClient* catalogClient() const;
- ShardingCatalogManager* catalogManager() const;
-
/**
* Prefer catalogClient() method over this as much as possible.
*/
@@ -113,8 +111,6 @@ protected:
* single request + response or find tests.
*/
void onCommand(executor::NetworkTestEnv::OnCommandFunction func);
- // Same as onCommand but run against _addShardNetworkTestEnv
- void onCommandForAddShard(executor::NetworkTestEnv::OnCommandFunction func);
void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func);
void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func);
void onFindWithMetadataCommand(
@@ -221,12 +217,9 @@ private:
executor::NetworkInterfaceMock* _mockNetwork;
executor::TaskExecutor* _executor;
- executor::TaskExecutor* _executorForAddShard;
std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv;
- std::unique_ptr<executor::NetworkTestEnv> _addShardNetworkTestEnv;
DistLockManagerMock* _distLockManager = nullptr;
ShardingCatalogClientImpl* _catalogClient = nullptr;
- ShardingCatalogManagerImpl* _catalogManager = nullptr;
};
} // namespace mongo