diff options
author | Jess Fan <jess.fan@10gen.com> | 2016-08-23 15:42:29 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-08-23 15:43:20 -0400 |
commit | 8f0665f1f0cb98db7b2c31880208b576fe90a04f (patch) | |
tree | 7be6c0af0ae426702d6aab715364841cefcbab95 /src/mongo/s | |
parent | eedc87f13014319a6890ee06a908c75a1ac7a13b (diff) | |
download | mongo-8f0665f1f0cb98db7b2c31880208b576fe90a04f.tar.gz |
SERVER-25001 Wire in new _configsvrSplitChunk command so mongod no longer runs applyOps directly.
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp | 94 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_client_impl.h | 17 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 80 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp | 40 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_test.cpp | 51 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 13 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.h | 10 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_move_primary_cmd.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_remove_shard_cmd.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/config.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/config_server_test_fixture.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/sharding_test_fixture.h | 7 |
16 files changed, 278 insertions, 89 deletions
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp index 6cc5c791eb9..13379820db3 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp @@ -236,8 +236,10 @@ Status ShardingCatalogClientImpl::logAction(OperationContext* txn, const std::string& ns, const BSONObj& detail) { if (_actionLogCollectionCreated.load() == 0) { - Status result = _createCappedConfigCollection( - txn, kActionLogCollectionName, kActionLogCollectionSizeMB); + Status result = _createCappedConfigCollection(txn, + kActionLogCollectionName, + kActionLogCollectionSizeMB, + ShardingCatalogClient::kMajorityWriteConcern); if (result.isOK()) { _actionLogCollectionCreated.store(1); } else { @@ -246,16 +248,24 @@ Status ShardingCatalogClientImpl::logAction(OperationContext* txn, } } - return _log(txn, kActionLogCollectionName, what, ns, detail); + return _log(txn, + kActionLogCollectionName, + what, + ns, + detail, + ShardingCatalogClient::kMajorityWriteConcern); } Status ShardingCatalogClientImpl::logChange(OperationContext* txn, const std::string& what, const std::string& ns, - const BSONObj& detail) { + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + writeConcern.wMode == WriteConcernOptions::kMajority); if (_changeLogCollectionCreated.load() == 0) { Status result = _createCappedConfigCollection( - txn, kChangeLogCollectionName, kChangeLogCollectionSizeMB); + txn, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern); if (result.isOK()) { _changeLogCollectionCreated.store(1); } else { @@ -264,7 +274,7 @@ Status ShardingCatalogClientImpl::logChange(OperationContext* txn, } } - return _log(txn, kChangeLogCollectionName, what, ns, detail); + return _log(txn, kChangeLogCollectionName, what, ns, detail, writeConcern); } // static @@ -355,7 +365,8 @@ Status ShardingCatalogClientImpl::_log(OperationContext* txn, const StringData& logCollName, const std::string& what, const std::string& operationNS, - const BSONObj& detail) { + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { Date_t now = Grid::get(txn)->getNetwork()->now(); const std::string hostName = Grid::get(txn)->getNetwork()->getHostName(); const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen(); @@ -373,8 +384,8 @@ Status ShardingCatalogClientImpl::_log(OperationContext* txn, log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON); const NamespaceString nss("config", logCollName); - Status result = insertConfigDocument( - txn, nss.ns(), changeLogBSON, ShardingCatalogClient::kMajorityWriteConcern); + Status result = insertConfigDocument(txn, nss.ns(), changeLogBSON, writeConcern); + if (!result.isOK()) { warning() << "Error encountered while logging config change with ID [" << changeId << "] into collection " << logCollName << ": " << redact(result); @@ -446,7 +457,11 @@ Status ShardingCatalogClientImpl::shardCollection(OperationContext* txn, collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); - logChange(txn, "shardCollection.start", ns, collectionDetail.obj()); + logChange(txn, + "shardCollection.start", + ns, + collectionDetail.obj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); } // Construct the collection default collator. @@ -502,7 +517,11 @@ Status ShardingCatalogClientImpl::shardCollection(OperationContext* txn, << dbPrimaryShardId << causedBy(status); } - logChange(txn, "shardCollection.end", ns, BSON("version" << manager->getVersion().toString())); + logChange(txn, + "shardCollection.end", + ns, + BSON("version" << manager->getVersion().toString()), + ShardingCatalogClientImpl::kMajorityWriteConcern); return Status::OK(); } @@ -558,7 +577,11 @@ StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(Operation grid.shardRegistry()->reload(txn); // Record start in changelog - logChange(txn, "removeShard.start", "", BSON("shard" << name)); + logChange(txn, + "removeShard.start", + "", + BSON("shard" << name), + ShardingCatalogClientImpl::kMajorityWriteConcern); return ShardDrainingStatus::STARTED; } @@ -603,7 +626,11 @@ StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(Operation grid.shardRegistry()->reload(txn); // Record finish in changelog - logChange(txn, "removeShard", "", BSON("shard" << name)); + logChange(txn, + "removeShard", + "", + BSON("shard" << name), + ShardingCatalogClientImpl::kMajorityWriteConcern); return ShardDrainingStatus::COMPLETED; } @@ -749,7 +776,11 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn, } Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const NamespaceString& ns) { - logChange(txn, "dropCollection.start", ns.ns(), BSONObj()); + logChange(txn, + "dropCollection.start", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); auto shardsStatus = getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { @@ -904,7 +935,11 @@ Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const Na LOG(1) << "dropCollection " << ns << " completed"; - logChange(txn, "dropCollection", ns.ns(), BSONObj()); + logChange(txn, + "dropCollection", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); return Status::OK(); } @@ -1012,14 +1047,17 @@ Status ShardingCatalogClientImpl::getChunks(OperationContext* txn, const BSONObj& sort, boost::optional<int> limit, vector<ChunkType>* chunks, - OpTime* opTime) { + OpTime* opTime, + repl::ReadConcernLevel readConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + readConcern == repl::ReadConcernLevel::kMajorityReadConcern); chunks->clear(); // Convert boost::optional<int> to boost::optional<long long>. auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none; auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, + readConcern, NamespaceString(ChunkType::ConfigNS), query, sort, @@ -1264,10 +1302,15 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn, const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, - const ChunkVersion& lastChunkVersion) { + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + (readConcern == repl::ReadConcernLevel::kMajorityReadConcern && + writeConcern.wMode == WriteConcernOptions::kMajority)); BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition << WriteConcernOptions::kWriteConcernField - << ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + << writeConcern.toBSON()); auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand( txn, @@ -1309,7 +1352,8 @@ Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn, BSONObjBuilder query; lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod()); query.append(ChunkType::ns(), nss); - Status chunkStatus = getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr); + Status chunkStatus = + getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern); if (!chunkStatus.isOK()) { warning() << "getChunks function failed, unable to validate chunk operation metadata" @@ -1546,12 +1590,14 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn, << dbName); } -Status ShardingCatalogClientImpl::_createCappedConfigCollection(OperationContext* txn, - StringData collName, - int cappedSize) { +Status ShardingCatalogClientImpl::_createCappedConfigCollection( + OperationContext* txn, + StringData collName, + int cappedSize, + const WriteConcernOptions& writeConcern) { BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize << WriteConcernOptions::kWriteConcernField - << ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + << writeConcern.toBSON()); auto result = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand( txn, diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h index 72e000db216..98a7240c205 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h @@ -80,7 +80,8 @@ public: Status logChange(OperationContext* txn, const std::string& what, const std::string& ns, - const BSONObj& detail) override; + const BSONObj& detail, + const WriteConcernOptions& writeConcern) override; StatusWith<DistLockManager::ScopedDistLock> distLock(OperationContext* txn, StringData name, @@ -120,7 +121,8 @@ public: const BSONObj& sort, boost::optional<int> limit, std::vector<ChunkType>* chunks, - repl::OpTime* opTime) override; + repl::OpTime* opTime, + repl::ReadConcernLevel readConcern) override; Status getTagsForCollection(OperationContext* txn, const std::string& collectionNs, @@ -148,7 +150,9 @@ public: const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, - const ChunkVersion& lastChunkVersion) override; + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) override; StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override; @@ -214,7 +218,8 @@ private: */ Status _createCappedConfigCollection(OperationContext* txn, StringData collName, - int cappedSize); + int cappedSize, + const WriteConcernOptions& writeConcern); /** * Helper method for running a count command against the config server with appropriate @@ -255,12 +260,14 @@ private: * @param what E.g. "split", "migrate" (not interpreted) * @param operationNS To which collection the metadata change is being applied (not interpreted) * @param detail Additional info about the metadata change (not interpreted) + * @param writeConcern Write concern options to use for logging */ Status _log(OperationContext* txn, const StringData& logCollName, const std::string& what, const std::string& operationNS, - const BSONObj& detail); + const BSONObj& detail, + const WriteConcernOptions& writeConcern); // // All member variables are labeled with one of the following codes indicating the diff --git a/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp index fa3aae04361..8ec85408a4b 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_log_change_test.cpp @@ -171,7 +171,8 @@ protected: Status log(const std::string& what, const std::string& ns, const BSONObj& detail) { if (_configCollType == ChangeLog) { - return catalogClient()->logChange(operationContext(), what, ns, detail); + return catalogClient()->logChange( + operationContext(), what, ns, detail, ShardingCatalogClient::kMajorityWriteConcern); } else { return catalogClient()->logAction(operationContext(), what, ns, detail); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index cc9222cf5fd..37a43598826 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -71,6 +71,7 @@ #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/set_shard_version_request.h" +#include "mongo/s/shard_key_pattern.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" @@ -812,7 +813,8 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( shardDetails.append("name", shardType.getName()); shardDetails.append("host", shardConnectionString.toString()); - _catalogClient->logChange(txn, "addShard", "", shardDetails.obj()); + _catalogClient->logChange( + txn, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern); // Ensure the added shard is visible to this process. auto shardRegistry = Grid::get(txn)->shardRegistry(); @@ -1061,7 +1063,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); // Acquire GlobalLock in MODE_X twice to prevent yielding. - // GLobalLock and the following lock on config.chunks are only needed to support + // GlobalLock and the following lock on config.chunks are only needed to support // mixed-mode operation with mongoses from 3.2 // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); @@ -1110,12 +1112,30 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, BSONArrayBuilder updates; for (const auto& endKey : newChunkBounds) { + // verify that splitPoints are non-equivalent + if (endKey.woCompare(startKey) == 0) { + return {ErrorCodes::InvalidOptions, + str::stream() << "split on lower bound " + " of chunk " + << "[" + << startKey + << "," + << endKey + << ")" + << "is not allowed"}; + } + + // verify that splits don't create too-big shard keys + Status shardKeyStatus = ShardKeyPattern::checkShardKeySize(endKey); + if (!shardKeyStatus.isOK()) { + return shardKeyStatus; + } + // splits only update the 'minor' portion of version currentMaxVersion.incMinor(); // build an update operation against the chunks collection of the config database - // with - // upsert true + // with upsert true BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", true); @@ -1156,18 +1176,28 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, BSONObjBuilder b; b.append("ns", ChunkType::ConfigNS); b.append("q", - BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby" + BSON("query" << BSON(ChunkType::ns(ns.ns()) << ChunkType::min() << range.getMin() + << ChunkType::max() + << range.getMax()) + << "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1))); { BSONObjBuilder bb(b.subobjStart("res")); - collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + bb.append(ChunkType::DEPRECATED_epoch(), requestEpoch); + bb.append(ChunkType::shard(), shardName); } preCond.append(b.obj()); } // apply the batch of updates to remote and local metadata - Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated( - txn, updates.arr(), preCond.arr(), ns.ns(), currentMaxVersion); + Status applyOpsStatus = + grid.catalogClient(txn)->applyChunkOpsDeprecated(txn, + updates.arr(), + preCond.arr(), + ns.ns(), + currentMaxVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern); if (!applyOpsStatus.isOK()) { return applyOpsStatus; } @@ -1182,10 +1212,11 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, } if (newChunks.size() == 2) { - appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); - appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); + _appendShortVersion(logDetail.subobjStart("left"), newChunks[0]); + _appendShortVersion(logDetail.subobjStart("right"), newChunks[1]); - grid.catalogClient(txn)->logChange(txn, "split", ns.ns(), logDetail.obj()); + grid.catalogClient(txn)->logChange( + txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions()); } else { BSONObj beforeDetailObj = logDetail.obj(); BSONObj firstDetailObj = beforeDetailObj.getOwned(); @@ -1196,9 +1227,10 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, chunkDetail.appendElements(beforeDetailObj); chunkDetail.append("number", i + 1); chunkDetail.append("of", newChunksSize); - appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); + _appendShortVersion(chunkDetail.subobjStart("chunk"), newChunks[i]); - grid.catalogClient(txn)->logChange(txn, "multi-split", ns.ns(), chunkDetail.obj()); + grid.catalogClient(txn)->logChange( + txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions()); } } @@ -1325,8 +1357,14 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, } // apply the batch of updates to remote and local metadata - Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated( - txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion); + Status applyOpsStatus = + grid.catalogClient(txn)->applyChunkOpsDeprecated(txn, + updates.arr(), + preCond.arr(), + ns.ns(), + mergeVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern); if (!applyOpsStatus.isOK()) { return applyOpsStatus; } @@ -1342,11 +1380,21 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, collVersion.addToBSON(logDetail, "prevShardVersion"); mergeVersion.addToBSON(logDetail, "mergedVersion"); - grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj()); + grid.catalogClient(txn)->logChange( + txn, "merge", ns.ns(), logDetail.obj(), WriteConcernOptions()); return applyOpsStatus; } +void ShardingCatalogManagerImpl::_appendShortVersion(BufBuilder& b, const ChunkType& chunk) { + BSONObjBuilder bb(b); + bb.append(ChunkType::min(), chunk.getMin()); + bb.append(ChunkType::max(), chunk.getMax()); + if (chunk.isVersionSet()) + chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + bb.done(); +} + void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { _executorForAddShard->appendConnectionStats(stats); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index 973a062f2e0..397c64e2a83 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -190,6 +190,11 @@ private: StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn); /** + * Append min, max and version information from chunk to the buffer for logChange purposes. + */ + void _appendShortVersion(BufBuilder& b, const ChunkType& chunk); + + /** * Callback function used when rescheduling an addShard task after the first attempt failed. * Checks if the callback has been canceled, and if not, proceeds to call * _scheduleAddShardTask. diff --git a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp index 0b741d6f255..a5540940976 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp @@ -142,11 +142,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { auto lastChunkDoc = lastChunkDocStatus.getValue(); ASSERT_BSONOBJ_EQ(chunkMax, lastChunkDoc.getMax()); - { - // Check for increment on third chunkDoc's minor version - ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion()); - ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion()); - } + // Check for increment on third chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion()); } TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) { @@ -165,8 +163,9 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) { chunk.setMin(chunkMin); chunk.setMax(chunkMax); + std::vector<BSONObj> splitPoints; auto chunkSplitPoint = BSON("a" << 5); - std::vector<BSONObj> splitPoints{chunkSplitPoint}; + splitPoints.push_back(chunkSplitPoint); // set up second chunk (chunk2) auto competingVersion = ChunkVersion(2, 1, collEpoch); @@ -206,6 +205,35 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) { ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion()); ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion()); } + +TEST_F(SplitChunkTest, PreConditionFailErrors) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + std::vector<BSONObj> splitPoints; + auto chunkSplitPoint = BSON("a" << 5); + splitPoints.push_back(chunkSplitPoint); + + setupChunks({chunk}); + + auto splitStatus = catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + ChunkRange(chunkMin, BSON("a" << 7)), + splitPoints, + "shard0000"); + ASSERT_EQ(ErrorCodes::BadValue, splitStatus); +} + TEST_F(SplitChunkTest, NonExisingNamespaceErrors) { ChunkType chunk; chunk.setNS("TestDB.TestColl"); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp index 9d0e2e13deb..fab9b690c13 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp @@ -483,7 +483,8 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSWithSortAndLimit) { BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, &chunks, - &opTime)); + &opTime, + repl::ReadConcernLevel::kMajorityReadConcern)); ASSERT_EQ(2U, chunks.size()); ASSERT_EQ(newOpTime, opTime); @@ -531,8 +532,13 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSNoSortNoLimit) { auto future = launchAsync([this, &chunksQuery] { vector<ChunkType> chunks; - ASSERT_OK(catalogClient()->getChunks( - operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr)); + ASSERT_OK(catalogClient()->getChunks(operationContext(), + chunksQuery, + BSONObj(), + boost::none, + &chunks, + nullptr, + repl::ReadConcernLevel::kMajorityReadConcern)); ASSERT_EQ(0U, chunks.size()); return chunks; @@ -571,8 +577,13 @@ TEST_F(ShardingCatalogClientTest, GetChunksForNSInvalidChunk) { auto future = launchAsync([this, &chunksQuery] { vector<ChunkType> chunks; - Status status = catalogClient()->getChunks( - operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr); + Status status = catalogClient()->getChunks(operationContext(), + chunksQuery, + BSONObj(), + boost::none, + &chunks, + nullptr, + repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::FailedToParse, status); ASSERT_EQ(0U, chunks.size()); @@ -1396,8 +1407,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessful) { ChunkVersion lastChunkVersion(0, 0, OID()); auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] { - auto status = catalogClient()->applyChunkOpsDeprecated( - operationContext(), updateOps, preCondition, nss, lastChunkVersion); + auto status = + catalogClient()->applyChunkOpsDeprecated(operationContext(), + updateOps, + preCondition, + nss, + lastChunkVersion, + ShardingCatalogClient::kMajorityWriteConcern, + repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(status); }); @@ -1434,8 +1451,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessfulWithCheck) { ChunkVersion lastChunkVersion(0, 0, OID()); auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] { - auto status = catalogClient()->applyChunkOpsDeprecated( - operationContext(), updateOps, preCondition, nss, lastChunkVersion); + auto status = + catalogClient()->applyChunkOpsDeprecated(operationContext(), + updateOps, + preCondition, + nss, + lastChunkVersion, + ShardingCatalogClient::kMajorityWriteConcern, + repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(status); }); @@ -1476,8 +1499,14 @@ TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedFailedWithCheck) { ChunkVersion lastChunkVersion(0, 0, OID()); auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] { - auto status = catalogClient()->applyChunkOpsDeprecated( - operationContext(), updateOps, preCondition, nss, lastChunkVersion); + auto status = + catalogClient()->applyChunkOpsDeprecated(operationContext(), + updateOps, + preCondition, + nss, + lastChunkVersion, + ShardingCatalogClient::kMajorityWriteConcern, + repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, status); }); diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index 02de97a4f34..cb474382a55 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -244,6 +244,7 @@ public: * @param optime an out parameter that will contain the opTime of the config server. * Can be null. Note that chunks can be fetched in multiple batches and each batch * can have a unique opTime. This opTime will be the one from the last batch. + * @param readConcern The readConcern to use while querying for chunks. * * Returns a !OK status if an error occurs. */ @@ -252,7 +253,8 @@ public: const BSONObj& sort, boost::optional<int> limit, std::vector<ChunkType>* chunks, - repl::OpTime* opTime) = 0; + repl::OpTime* opTime, + repl::ReadConcernLevel readConcern) = 0; /** * Retrieves all tags for the specified collection. @@ -309,6 +311,8 @@ public: * @param nss: namespace string for the chunks collection. * @param lastChunkVersion: version of the last document being written to the chunks * collection. + * @param writeConcern: writeConcern to use for applying documents. + * @param readConcern: readConcern to use for verifying that documents have been applied. * * 'nss' and 'lastChunkVersion' uniquely identify the last document being written, which is * expected to appear in the chunks collection on success. This is important for the @@ -320,7 +324,9 @@ public: const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, - const ChunkVersion& lastChunkVersion) = 0; + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) = 0; /** * Writes a diagnostic event to the action log. @@ -336,7 +342,8 @@ public: virtual Status logChange(OperationContext* txn, const std::string& what, const std::string& ns, - const BSONObj& detail) = 0; + const BSONObj& detail, + const WriteConcernOptions& writeConcern) = 0; /** * Reads global sharding settings from the confing.settings collection. The key parameter is diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 9206eb1d0e1..4335d4b5eb7 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -118,7 +118,8 @@ Status ShardingCatalogClientMock::getChunks(OperationContext* txn, const BSONObj& sort, boost::optional<int> limit, std::vector<ChunkType>* chunks, - repl::OpTime* opTime) { + repl::OpTime* opTime, + repl::ReadConcernLevel readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } @@ -163,7 +164,9 @@ Status ShardingCatalogClientMock::applyChunkOpsDeprecated(OperationContext* txn, const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, - const ChunkVersion& lastChunkVersion) { + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } @@ -177,7 +180,8 @@ Status ShardingCatalogClientMock::logAction(OperationContext* txn, Status ShardingCatalogClientMock::logChange(OperationContext* txn, const string& what, const string& ns, - const BSONObj& detail) { + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 8c37896fbbc..c072c51c6d9 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -88,7 +88,8 @@ public: const BSONObj& sort, boost::optional<int> limit, std::vector<ChunkType>* chunks, - repl::OpTime* opTime) override; + repl::OpTime* opTime, + repl::ReadConcernLevel readConcern) override; Status getTagsForCollection(OperationContext* txn, const std::string& collectionNs, @@ -116,7 +117,9 @@ public: const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, - const ChunkVersion& lastChunkVersion) override; + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) override; Status logAction(OperationContext* txn, const std::string& what, @@ -126,7 +129,8 @@ public: Status logChange(OperationContext* txn, const std::string& what, const std::string& ns, - const BSONObj& detail) override; + const BSONObj& detail, + const WriteConcernOptions& writeConcern) override; StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override; diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index cc7275e138c..4d5868ca8ae 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -282,8 +282,14 @@ bool ChunkManager::_load(OperationContext* txn, repl::OpTime opTime; std::vector<ChunkType> chunks; - uassertStatusOK(grid.catalogClient(txn)->getChunks( - txn, diffQuery.query, diffQuery.sort, boost::none, &chunks, &opTime)); + uassertStatusOK( + grid.catalogClient(txn)->getChunks(txn, + diffQuery.query, + diffQuery.sort, + boost::none, + &chunks, + &opTime, + repl::ReadConcernLevel::kMajorityReadConcern)); invariant(opTime >= _configOpTime); _configOpTime = opTime; diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index 5db4f4aa666..b4e6ce5e003 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -166,7 +166,11 @@ public: _buildMoveEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls); auto catalogClient = grid.catalogClient(txn); - catalogClient->logChange(txn, "movePrimary.start", dbname, moveStartDetails); + catalogClient->logChange(txn, + "movePrimary.start", + dbname, + moveStartDetails, + ShardingCatalogClient::kMajorityWriteConcern); BSONArrayBuilder barr; barr.append(shardedColls); @@ -287,7 +291,11 @@ public: BSONObj moveFinishDetails = _buildMoveEntry(dbname, oldPrimary, toShard->toString(), shardedColls); - catalogClient->logChange(txn, "movePrimary", dbname, moveFinishDetails); + catalogClient->logChange(txn, + "movePrimary", + dbname, + moveFinishDetails, + ShardingCatalogClient::kMajorityWriteConcern); return true; } diff --git a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp index 0a8c4ff318f..03dd30ae787 100644 --- a/src/mongo/s/commands/cluster_remove_shard_cmd.cpp +++ b/src/mongo/s/commands/cluster_remove_shard_cmd.cpp @@ -142,7 +142,8 @@ public: BSONObj(), boost::none, // return all &chunks, - nullptr); + nullptr, + repl::ReadConcernLevel::kMajorityReadConcern); if (!status.isOK()) { return appendCommandStatus(result, status); } diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index a4064b0246f..6459d758beb 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -311,7 +311,8 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn, BSON(ChunkType::DEPRECATED_lastmod() << -1), 1, &newestChunk, - nullptr)); + nullptr, + repl::ReadConcernLevel::kMajorityReadConcern)); if (!newestChunk.empty()) { invariant(newestChunk.size() == 1); @@ -540,7 +541,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) { */ log() << "DBConfig::dropDatabase: " << _name; - grid.catalogClient(txn)->logChange(txn, "dropDatabase.start", _name, BSONObj()); + grid.catalogClient(txn)->logChange( + txn, "dropDatabase.start", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); // 1 grid.catalogCache()->invalidate(_name); @@ -618,7 +620,8 @@ bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) { LOG(1) << "\t dropped primary db for: " << _name; - grid.catalogClient(txn)->logChange(txn, "dropDatabase", _name, BSONObj()); + grid.catalogClient(txn)->logChange( + txn, "dropDatabase", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern); return true; } diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index b78d95e2e03..8cb106aefcc 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -411,9 +411,8 @@ Status ConfigServerTestFixture::setupChunks(const std::vector<ChunkType>& chunks const NamespaceString chunkNS(ChunkType::ConfigNS); for (const auto& chunk : chunks) { auto insertStatus = insertToConfigCollection(operationContext(), chunkNS, chunk.toBSON()); - if (!insertStatus.isOK()) { + if (!insertStatus.isOK()) return insertStatus; - } } return Status::OK(); diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index 9365a18dd66..c330aa468dc 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -84,8 +84,6 @@ protected: ShardingCatalogClient* catalogClient() const; - ShardingCatalogManager* catalogManager() const; - /** * Prefer catalogClient() method over this as much as possible. */ @@ -113,8 +111,6 @@ protected: * single request + response or find tests. */ void onCommand(executor::NetworkTestEnv::OnCommandFunction func); - // Same as onCommand but run against _addShardNetworkTestEnv - void onCommandForAddShard(executor::NetworkTestEnv::OnCommandFunction func); void onCommandWithMetadata(executor::NetworkTestEnv::OnCommandWithMetadataFunction func); void onFindCommand(executor::NetworkTestEnv::OnFindCommandFunction func); void onFindWithMetadataCommand( @@ -221,12 +217,9 @@ private: executor::NetworkInterfaceMock* _mockNetwork; executor::TaskExecutor* _executor; - executor::TaskExecutor* _executorForAddShard; std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv; - std::unique_ptr<executor::NetworkTestEnv> _addShardNetworkTestEnv; DistLockManagerMock* _distLockManager = nullptr; ShardingCatalogClientImpl* _catalogClient = nullptr; - ShardingCatalogManagerImpl* _catalogManager = nullptr; }; } // namespace mongo |