summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-03-12 17:27:43 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-03-12 17:28:36 -0400
commit39e06c9ef8c797ad626956b564ac9ebe295cbaf3 (patch)
treebfa2742fe1a814980def015b29dc8d5bfaf4bad3 /src/mongo/s
parent8125c55a251805899552d0af4776930216223703 (diff)
downloadmongo-39e06c9ef8c797ad626956b564ac9ebe295cbaf3.tar.gz
SERVER-22611 Sharding catalog cache refactor
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/SConscript6
-rw-r--r--src/mongo/s/catalog/type_chunk.cpp9
-rw-r--r--src/mongo/s/catalog/type_chunk.h4
-rw-r--r--src/mongo/s/catalog_cache.cpp381
-rw-r--r--src/mongo/s/catalog_cache.h185
-rw-r--r--src/mongo/s/chunk_diff.cpp8
-rw-r--r--src/mongo/s/chunk_diff.h9
-rw-r--r--src/mongo/s/chunk_manager.cpp391
-rw-r--r--src/mongo/s/chunk_manager.h103
-rw-r--r--src/mongo/s/chunk_manager_query_test.cpp (renamed from src/mongo/s/chunk_manager_test.cpp)132
-rw-r--r--src/mongo/s/chunk_manager_refresh_test.cpp194
-rw-r--r--src/mongo/s/chunk_manager_test_fixture.cpp122
-rw-r--r--src/mongo/s/chunk_manager_test_fixture.h62
-rw-r--r--src/mongo/s/client/parallel.cpp84
-rw-r--r--src/mongo/s/client/parallel.h6
-rw-r--r--src/mongo/s/client/version_manager.cpp29
-rw-r--r--src/mongo/s/commands/chunk_manager_targeter.cpp275
-rw-r--r--src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f737
-rw-r--r--src/mongo/s/commands/chunk_manager_targeter.h21
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp48
-rw-r--r--src/mongo/s/commands/cluster_aggregate.h4
-rw-r--r--src/mongo/s/commands/cluster_commands_common.cpp47
-rw-r--r--src/mongo/s/commands/cluster_commands_common.h15
-rw-r--r--src/mongo/s/commands/cluster_count_cmd.cpp124
-rw-r--r--src/mongo/s/commands/cluster_drop_cmd.cpp17
-rw-r--r--src/mongo/s/commands/cluster_drop_database_cmd.cpp19
-rw-r--r--src/mongo/s/commands/cluster_enable_sharding_cmd.cpp3
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp114
-rw-r--r--src/mongo/s/commands/cluster_flush_router_config_cmd.cpp2
-rw-r--r--src/mongo/s/commands/cluster_get_shard_version_cmd.cpp12
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp328
-rw-r--r--src/mongo/s/commands/cluster_merge_chunks_cmd.cpp33
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp42
-rw-r--r--src/mongo/s/commands/cluster_move_primary_cmd.cpp10
-rw-r--r--src/mongo/s/commands/cluster_plan_cache_cmd.cpp5
-rw-r--r--src/mongo/s/commands/cluster_shard_collection_cmd.cpp51
-rw-r--r--src/mongo/s/commands/cluster_split_cmd.cpp44
-rw-r--r--src/mongo/s/commands/cluster_write.cpp43
-rw-r--r--src/mongo/s/commands/commands_public.cpp576
-rw-r--r--src/mongo/s/commands/run_on_all_shards_cmd.cpp4
-rw-r--r--src/mongo/s/commands/strategy.cpp11
-rw-r--r--src/mongo/s/config.cpp366
-rw-r--r--src/mongo/s/config.h146
-rw-r--r--src/mongo/s/query/cluster_find.cpp26
-rw-r--r--src/mongo/s/sharding_raii.cpp159
-rw-r--r--src/mongo/s/sharding_raii.h152
46 files changed, 2732 insertions, 2427 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index f6ef30c691d..eede0298c6d 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -243,12 +243,10 @@ env.Library(
'chunk.cpp',
'chunk_manager.cpp',
'cluster_identity_loader.cpp',
- 'config.cpp',
'config_server_client.cpp',
'grid.cpp',
'shard_util.cpp',
'sharding_egress_metadata_hook.cpp',
- 'sharding_raii.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/audit',
@@ -265,8 +263,10 @@ env.Library(
env.CppUnitTest(
target='catalog_cache_test',
source=[
- 'chunk_manager_test.cpp',
'chunk_manager_index_bounds_test.cpp',
+ 'chunk_manager_query_test.cpp',
+ 'chunk_manager_refresh_test.cpp',
+ 'chunk_manager_test_fixture.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_test_fixture',
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index ff6df77d6f5..e5091f16954 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -132,6 +132,15 @@ bool ChunkRange::operator!=(const ChunkRange& other) const {
return !(*this == other);
}
+ChunkType::ChunkType() = default;
+
+ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId)
+ : _ns(nss.ns()),
+ _min(range.getMin()),
+ _max(range.getMax()),
+ _version(version),
+ _shard(std::move(shardId)) {}
+
StatusWith<ChunkType> ChunkType::fromConfigBSON(const BSONObj& source) {
ChunkType chunk;
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 06a26db34be..6484f97b03b 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -32,6 +32,7 @@
#include <string>
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_id.h"
@@ -143,6 +144,9 @@ public:
static const BSONField<Date_t> DEPRECATED_lastmod;
static const BSONField<OID> DEPRECATED_epoch;
+ ChunkType();
+ ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId);
+
/**
* Constructs a new ChunkType object from BSON that has the config server's config.chunks
* collection format.
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index d2c8eaf5504..c3607d6053f 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -26,64 +26,391 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/repl/optime_with.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/config.h"
+#include "mongo/s/chunk_diff.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
namespace mongo {
+namespace {
-using std::shared_ptr;
-using std::string;
+// How many times to try refreshing the routing info if the set of chunks loaded from the config
+// server is found to be inconsistent.
+const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
-CatalogCache::CatalogCache() = default;
+/**
+ * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
+ *
+ * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
+ */
+class CMConfigDiffTracker : public ConfigDiffTracker<std::shared_ptr<Chunk>> {
+public:
+ CMConfigDiffTracker(const NamespaceString& nss,
+ RangeMap* currMap,
+ ChunkVersion* maxVersion,
+ MaxChunkVersionMap* maxShardVersions)
+ : ConfigDiffTracker<std::shared_ptr<Chunk>>(
+ nss.ns(), currMap, maxVersion, maxShardVersions) {}
-CatalogCache::~CatalogCache() = default;
+ bool isTracked(const ChunkType& chunk) const final {
+ // Mongos tracks all shards
+ return true;
+ }
-StatusWith<std::shared_ptr<DBConfig>> CatalogCache::getDatabase(OperationContext* opCtx,
- StringData dbName) {
- stdx::lock_guard<stdx::mutex> guard(_mutex);
+ bool isMinKeyIndexed() const final {
+ return false;
+ }
- auto it = _databases.find(dbName);
- if (it != _databases.end()) {
- return it->second;
+ std::pair<BSONObj, std::shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
+ const ChunkType& chunk) const final {
+ return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
}
- // Need to load from the store
- auto status = Grid::get(opCtx)->catalogClient(opCtx)->getDatabase(opCtx, dbName.toString());
- if (!status.isOK()) {
- return status.getStatus();
+ ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+ return shard->getId();
}
+};
+
+} // namespace
+
+CatalogCache::CatalogCache() = default;
+
+CatalogCache::~CatalogCache() = default;
+
+StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
+ StringData dbName) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
- const auto& dbOpTimePair = status.getValue();
- auto db = std::make_shared<DBConfig>(dbOpTimePair.value, dbOpTimePair.opTime);
try {
- db->load(opCtx);
- auto emplaceResult = _databases.try_emplace(dbName, std::move(db));
- return emplaceResult.first->second;
+ return {CachedDatabaseInfo(_getDatabase_inlock(opCtx, dbName))};
} catch (const DBException& ex) {
return ex.toStatus();
}
}
-void CatalogCache::invalidate(StringData dbName) {
- stdx::lock_guard<stdx::mutex> guard(_mutex);
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ int numRefreshAttempts = 0;
- ShardedDatabasesMap::iterator it = _databases.find(dbName);
- if (it != _databases.end()) {
- _databases.erase(it);
+ while (true) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ std::shared_ptr<DatabaseInfoEntry> dbEntry;
+ try {
+ dbEntry = _getDatabase_inlock(opCtx, nss.db());
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ auto& collections = dbEntry->collections;
+
+ auto it = collections.find(nss.ns());
+ if (it == collections.end()) {
+ auto shardStatus =
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId);
+ if (!shardStatus.isOK()) {
+ return {ErrorCodes::fromInt(40371),
+ str::stream() << "The primary shard for collection " << nss.ns()
+ << " could not be loaded due to error "
+ << shardStatus.getStatus().toString()};
+ }
+
+ return {CachedCollectionRoutingInfo(
+ dbEntry->primaryShardId, nss, std::move(shardStatus.getValue()))};
+ }
+
+ auto& collEntry = it->second;
+
+ if (collEntry.needsRefresh) {
+ numRefreshAttempts++;
+
+ try {
+ auto newRoutingInfo =
+ refreshCollectionRoutingInfo(opCtx, nss, std::move(collEntry.routingInfo));
+ if (newRoutingInfo == nullptr) {
+ collections.erase(it);
+
+ // Loop around so we can return an "unsharded" routing info
+ continue;
+ }
+
+ collEntry.routingInfo = std::move(newRoutingInfo);
+ collEntry.needsRefresh = false;
+ } catch (const DBException& ex) {
+ // It is possible that the metadata is being changed concurrently, so retry the
+ // refresh with a wait
+ if (ex.getCode() == ErrorCodes::ConflictingOperationInProgress &&
+ numRefreshAttempts < kMaxInconsistentRoutingInfoRefreshAttempts) {
+ ul.unlock();
+
+ log() << "Metadata refresh for " << nss.ns() << " failed and will be retried"
+ << causedBy(redact(ex));
+
+ // Do the sleep outside of the mutex
+ sleepFor(Milliseconds(10) * numRefreshAttempts);
+ continue;
+ }
+
+ return ex.toStatus();
+ }
+ }
+
+ return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)};
+ }
+}
+
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+ OperationContext* opCtx, StringData ns) {
+ return getCollectionRoutingInfo(opCtx, NamespaceString(ns));
+}
+
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ invalidateShardedCollection(nss);
+
+ auto routingInfoStatus = getCollectionRoutingInfo(opCtx, nss);
+ if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().cm()) {
+ return {ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << nss.ns() << " is not sharded."};
}
+
+ return routingInfoStatus;
}
-void CatalogCache::invalidateAll() {
- stdx::lock_guard<stdx::mutex> guard(_mutex);
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, StringData ns) {
+ return getShardedCollectionRoutingInfoWithRefresh(opCtx, NamespaceString(ns));
+}
+
+void CatalogCache::onStaleConfigError(CachedCollectionRoutingInfo&& ccrt) {
+ if (!ccrt._cm) {
+ // Here we received a stale config error for a collection which we previously thought was
+ // unsharded.
+ invalidateShardedCollection(ccrt._nss);
+ return;
+ }
+ // Here we received a stale config error for a collection which we previously though was sharded
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ auto it = _databases.find(NamespaceString(ccrt._cm->getns()).db());
+ if (it == _databases.end()) {
+ // If the database does not exist, the collection must have been dropped so there is
+ // nothing to invalidate. The getCollectionRoutingInfo will handle the reload of the
+ // entire database and its collections.
+ return;
+ }
+
+ auto& collections = it->second->collections;
+
+ auto itColl = collections.find(ccrt._cm->getns());
+ if (itColl == collections.end()) {
+ // If the collection does not exist, this means it must have been dropped since the last
+ // time we retrieved a cache entry for it. Doing nothing in this case will cause the
+ // next call to getCollectionRoutingInfo to return an unsharded collection.
+ return;
+ } else if (itColl->second.routingInfo->getVersion() == ccrt._cm->getVersion()) {
+ // If the versions match, the last version of the routing information that we used is no
+ // longer valid, so trigger a refresh.
+ itColl->second.needsRefresh = true;
+ }
+}
+
+void CatalogCache::invalidateShardedCollection(const NamespaceString& nss) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ auto it = _databases.find(nss.db());
+ if (it == _databases.end()) {
+ return;
+ }
+
+ it->second->collections[nss.ns()].needsRefresh = true;
+}
+
+void CatalogCache::invalidateShardedCollection(StringData ns) {
+ invalidateShardedCollection(NamespaceString(ns));
+}
+
+void CatalogCache::purgeDatabase(StringData dbName) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ auto it = _databases.find(dbName);
+ if (it == _databases.end()) {
+ return;
+ }
+
+ _databases.erase(it);
+}
+
+void CatalogCache::purgeAllDatabases() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
_databases.clear();
}
+std::shared_ptr<ChunkManager> CatalogCache::refreshCollectionRoutingInfo(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::shared_ptr<ChunkManager> existingRoutingInfo) {
+ Timer t;
+
+ const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+ // Decide whether to do a full or partial load based on the state of the collection
+ auto collStatus = catalogClient->getCollection(opCtx, nss.ns());
+ if (collStatus == ErrorCodes::NamespaceNotFound) {
+ return nullptr;
+ }
+
+ const auto coll = uassertStatusOK(std::move(collStatus)).value;
+ if (coll.getDropped()) {
+ return nullptr;
+ }
+
+ ChunkVersion startingCollectionVersion;
+ ChunkMap chunkMap =
+ SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
+
+ if (!existingRoutingInfo) {
+ // If we don't have a basis chunk manager, do a full refresh
+ startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
+ } else if (existingRoutingInfo->getVersion().epoch() != coll.getEpoch()) {
+ // If the collection's epoch has changed, do a full refresh
+ startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
+ } else {
+ // Otherwise do a partial refresh
+ startingCollectionVersion = existingRoutingInfo->getVersion();
+ chunkMap = existingRoutingInfo->chunkMap();
+ }
+
+ log() << "Refreshing chunks based on version " << startingCollectionVersion;
+
+ // Diff tracker should *always* find at least one chunk if collection exists
+ const auto diffQuery =
+ CMConfigDiffTracker::createConfigDiffQuery(nss, startingCollectionVersion);
+
+ // Query the chunks which have changed
+ std::vector<ChunkType> newChunks;
+ repl::OpTime opTime;
+ uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
+ opCtx,
+ diffQuery.query,
+ diffQuery.sort,
+ boost::none,
+ &newChunks,
+ &opTime,
+ repl::ReadConcernLevel::kMajorityReadConcern));
+
+ ChunkVersion collectionVersion = startingCollectionVersion;
+
+ ShardVersionMap unusedShardVersions;
+ CMConfigDiffTracker differ(nss, &chunkMap, &collectionVersion, &unusedShardVersions);
+
+ const int diffsApplied = differ.calculateConfigDiff(opCtx, newChunks);
+
+ if (diffsApplied < 1) {
+ log() << "Refresh took " << t.millis() << " ms and failed because the collection's "
+ "sharding metadata either changed in between or "
+ "became corrupted";
+
+ uasserted(ErrorCodes::ConflictingOperationInProgress,
+ "Collection sharding status changed during refresh or became corrupted");
+ }
+
+ // If at least one diff was applied, the metadata is correct, but it might not have changed so
+ // in this case there is no need to recreate the chunk manager.
+ //
+ // NOTE: In addition to the above statement, it is also important that we return the same chunk
+ // manager object, because the write commands' code relies on changes of the chunk manager's
+ // sequence number to detect batch writes not making progress because of chunks moving across
+ // shards too frequently.
+ if (collectionVersion == startingCollectionVersion) {
+ log() << "Refresh took " << t.millis() << " ms and didn't find any metadata changes";
+
+ return existingRoutingInfo;
+ }
+
+ std::unique_ptr<CollatorInterface> defaultCollator;
+ if (!coll.getDefaultCollation().isEmpty()) {
+ // The collation should have been validated upon collection creation
+ defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(coll.getDefaultCollation()));
+ }
+
+ log() << "Refresh took " << t.millis() << " ms and found version " << collectionVersion;
+
+ return stdx::make_unique<ChunkManager>(nss,
+ coll.getKeyPattern(),
+ std::move(defaultCollator),
+ coll.getUnique(),
+ std::move(chunkMap),
+ collectionVersion);
+}
+
+std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlock(
+ OperationContext* opCtx, StringData dbName) {
+ auto it = _databases.find(dbName);
+ if (it != _databases.end()) {
+ return it->second;
+ }
+
+ const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+ const auto dbNameCopy = dbName.toString();
+
+ // Load the database entry
+ const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase(opCtx, dbNameCopy));
+ const auto& dbDesc = opTimeWithDb.value;
+
+ // Load the sharded collections entries
+ std::vector<CollectionType> collections;
+ repl::OpTime collLoadConfigOptime;
+ uassertStatusOK(
+ catalogClient->getCollections(opCtx, &dbNameCopy, &collections, &collLoadConfigOptime));
+
+ StringMap<CollectionRoutingInfoEntry> collectionEntries;
+ for (const auto& coll : collections) {
+ collectionEntries[coll.getNs().ns()].needsRefresh = true;
+ }
+
+ return _databases[dbName] = std::shared_ptr<DatabaseInfoEntry>(new DatabaseInfoEntry{
+ dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
+}
+
+CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db)
+ : _db(std::move(db)) {}
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+ return _db->primaryShardId;
+}
+
+bool CachedDatabaseInfo::shardingEnabled() const {
+ return _db->shardingEnabled;
+}
+
+CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
+ std::shared_ptr<ChunkManager> cm)
+ : _primaryId(std::move(primaryId)), _cm(std::move(cm)) {}
+
+CachedCollectionRoutingInfo::CachedCollectionRoutingInfo(ShardId primaryId,
+ NamespaceString nss,
+ std::shared_ptr<Shard> primary)
+ : _primaryId(std::move(primaryId)), _nss(std::move(nss)), _primary(std::move(primary)) {}
+
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 0e63f94b52a..528b2df4673 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -28,19 +28,20 @@
#pragma once
-#include <memory>
-
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/string_map.h"
namespace mongo {
-class DBConfig;
+class CachedDatabaseInfo;
+class CachedCollectionRoutingInfo;
class OperationContext;
-template <typename T>
-class StatusWith;
/**
* This is the root of the "read-only" hierarchy of cached catalog metadata. It is read only
@@ -62,26 +63,184 @@ public:
*
* Returns the database cache entry if the database exists or a failed status otherwise.
*/
- StatusWith<std::shared_ptr<DBConfig>> getDatabase(OperationContext* opCtx, StringData dbName);
+ StatusWith<CachedDatabaseInfo> getDatabase(OperationContext* opCtx, StringData dbName);
+
+ /**
+ * Blocking shortcut method to get a specific sharded collection from a given database using the
+ * complete namespace. If the collection is sharded returns a ScopedChunkManager initialized
+ * with ChunkManager. If the collection is not sharded, returns a ScopedChunkManager initialized
+ * with the primary shard for the specified database. If an error occurs loading the metadata
+ * returns a failed status.
+ */
+ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss);
+ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(OperationContext* opCtx,
+ StringData ns);
+
+ /**
+ * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed
+ * and returns a NamespaceNotSharded error if the collection is not sharded.
+ */
+ StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, const NamespaceString& nss);
+ StatusWith<CachedCollectionRoutingInfo> getShardedCollectionRoutingInfoWithRefresh(
+ OperationContext* opCtx, StringData ns);
+
+ /**
+ * Non-blocking method to be called whenever using the specified routing table has encountered a
+ * stale config exception. Returns immediately and causes the routing table to be refreshed the
+ * next time getCollectionRoutingInfo is called. Does nothing if the routing table has been
+ * refreshed already.
+ */
+ void onStaleConfigError(CachedCollectionRoutingInfo&&);
+
+ /**
+ * Non-blocking method, which indiscriminately causes the routing table for the specified
+ * namespace to be refreshed the next time getCollectionRoutingInfo is called.
+ */
+ void invalidateShardedCollection(const NamespaceString& nss);
+ void invalidateShardedCollection(StringData ns);
+
+ /**
+ * Blocking method, which removes the entire specified database (including its collections) from
+ * the cache.
+ */
+ void purgeDatabase(StringData dbName);
/**
- * Removes the database information for the specified name from the cache, so that the
- * next time getDatabase is called, it will be reloaded.
+ * Blocking method, which removes all databases (including their collections) from the cache.
*/
- void invalidate(StringData dbName);
+ void purgeAllDatabases();
/**
- * Purges all cached database information, which will cause the data to be reloaded again.
+ * Blocking method, which refreshes the routing information for the specified collection. If
+ * 'existingRoutingInfo' has been specified uses this as a basis to perform an 'incremental'
+ * refresh, which only fetches the chunks which changed. Otherwise does a full refresh, fetching
+ * all the chunks for the collection.
+ *
+ * Returns the refreshed routing information if the collection is still sharded or nullptr if it
+ * is not. If refresh fails for any reason, throws a DBException.
+ *
+ * With the exception of ConflictingOperationInProgress, error codes thrown from this method are
+ * final in that there is nothing that can be done to remedy them other than pass the error to
+ * the user.
+ *
+ * ConflictingOperationInProgress indicates that the chunk metadata was found to be
+ * inconsistent. Since this may be transient, due to the collection being dropped or recreated,
+ * the caller must retry the reload up to some configurable number of attempts.
+ *
+ * NOTE: Should never be called directly and is exposed as public for testing purposes only.
*/
- void invalidateAll();
+ static std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::shared_ptr<ChunkManager> existingRoutingInfo);
private:
- using ShardedDatabasesMap = StringMap<std::shared_ptr<DBConfig>>;
+ // Make the cache entries friends so they can access the private classes below
+ friend class CachedDatabaseInfo;
+ friend class CachedCollectionRoutingInfo;
+
+ /**
+ * Cache entry describing a collection.
+ */
+ struct CollectionRoutingInfoEntry {
+ std::shared_ptr<ChunkManager> routingInfo;
+
+ bool needsRefresh{true};
+ };
+
+ /**
+ * Cache entry describing a database.
+ */
+ struct DatabaseInfoEntry {
+ ShardId primaryShardId;
+
+ bool shardingEnabled;
+
+ StringMap<CollectionRoutingInfoEntry> collections;
+ };
+
+ using DatabaseInfoMap = StringMap<std::shared_ptr<DatabaseInfoEntry>>;
+
+ /**
+ * Ensures that the specified database is in the cache, loading it if necessary. If the database
+ * was not in cache, all the sharded collections will be in the 'needsRefresh' state.
+ */
+ std::shared_ptr<DatabaseInfoEntry> _getDatabase_inlock(OperationContext* opCtx,
+ StringData dbName);
// Mutex to serialize access to the structures below
stdx::mutex _mutex;
- ShardedDatabasesMap _databases;
+ // Map from DB name to the info for that database
+ DatabaseInfoMap _databases;
+};
+
+/**
+ * Constructed exclusively by the CatalogCache, contains a reference to the cached information for
+ * the specified database.
+ */
+class CachedDatabaseInfo {
+public:
+ const ShardId& primaryId() const;
+
+ bool shardingEnabled() const;
+
+private:
+ friend class CatalogCache;
+
+ CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db);
+
+ std::shared_ptr<CatalogCache::DatabaseInfoEntry> _db;
+};
+
+/**
+ * Constructed exclusively by the CatalogCache contains a reference to the routing information for
+ * the specified collection.
+ */
+class CachedCollectionRoutingInfo {
+public:
+ /**
+ * Returns the ID of the primary shard for the database owining this collection, regardless of
+ * whether it is sharded or not.
+ */
+ const ShardId& primaryId() const {
+ return _primaryId;
+ }
+
+ /**
+ * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr.
+ */
+ std::shared_ptr<ChunkManager> cm() const {
+ return _cm;
+ }
+
+ /**
+ * If the collection is not sharded, returns its primary shard. Otherwise, nullptr.
+ */
+ std::shared_ptr<Shard> primary() const {
+ return _primary;
+ }
+
+private:
+ friend class CatalogCache;
+
+ CachedCollectionRoutingInfo(ShardId primaryId, std::shared_ptr<ChunkManager> cm);
+
+ CachedCollectionRoutingInfo(ShardId primaryId,
+ NamespaceString nss,
+ std::shared_ptr<Shard> primary);
+
+ // The id of the primary shard containing the database
+ ShardId _primaryId;
+
+ // Reference to the corresponding chunk manager (if sharded) or null
+ std::shared_ptr<ChunkManager> _cm;
+
+ // Reference to the primary of the database (if not sharded) or null
+ NamespaceString _nss;
+ std::shared_ptr<Shard> _primary;
};
} // namespace mongo
diff --git a/src/mongo/s/chunk_diff.cpp b/src/mongo/s/chunk_diff.cpp
index f21555043ad..07d569a503d 100644
--- a/src/mongo/s/chunk_diff.cpp
+++ b/src/mongo/s/chunk_diff.cpp
@@ -105,7 +105,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
// Store epoch now so it doesn't change when we change max
OID currEpoch = _maxVersion->epoch();
- _validDiffs = 0;
+ int validDiffs = 0;
for (const ChunkType& chunk : chunks) {
const ChunkVersion& chunkVersion = chunk.getVersion();
@@ -121,7 +121,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
return -1;
}
- _validDiffs++;
+ validDiffs++;
// Get max changed version and chunk version
if (chunkVersion > *_maxVersion) {
@@ -151,7 +151,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
}
}
- LOG(3) << "found " << _validDiffs << " new chunks for collection " << _ns << " (tracking "
+ LOG(3) << "found " << validDiffs << " new chunks for collection " << _ns << " (tracking "
<< newTracked.size() << "), new version is " << *_maxVersion;
for (const ChunkType& chunk : newTracked) {
@@ -167,7 +167,7 @@ int ConfigDiffTracker<ValType>::calculateConfigDiff(OperationContext* opCtx,
_currMap->insert(rangeFor(opCtx, chunk));
}
- return _validDiffs;
+ return validDiffs;
}
ConfigDiffTrackerBase::QueryAndSort ConfigDiffTrackerBase::createConfigDiffQuery(
diff --git a/src/mongo/s/chunk_diff.h b/src/mongo/s/chunk_diff.h
index 0cea9fa678a..5910cc93eed 100644
--- a/src/mongo/s/chunk_diff.h
+++ b/src/mongo/s/chunk_diff.h
@@ -93,12 +93,8 @@ public:
RangeMap* currMap,
ChunkVersion* maxVersion,
MaxChunkVersionMap* maxShardVersions);
- virtual ~ConfigDiffTracker();
- // Call after load for more information
- int numValidDiffs() const {
- return _validDiffs;
- }
+ virtual ~ConfigDiffTracker();
// Applies changes to the config data from a vector of chunks passed in. Also includes minor
// version changes for particular major-version chunks if explicitly specified.
@@ -135,9 +131,6 @@ private:
RangeMap* const _currMap;
ChunkVersion* const _maxVersion;
MaxChunkVersionMap* const _maxShardVersions;
-
- // Store for later use
- int _validDiffs{0};
};
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 20cfd7e098f..047bfa1d696 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -32,296 +32,50 @@
#include "mongo/s/chunk_manager.h"
-#include <boost/next_prior.hpp>
#include <vector>
#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/client/read_preference.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/query/index_bounds_builder.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/query_planner_common.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/chunk_diff.h"
-#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
-#include "mongo/util/timer.h"
namespace mongo {
-
-using std::map;
-using std::pair;
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-
namespace {
// Used to generate sequence numbers to assign to each newly created ChunkManager
AtomicUInt32 nextCMSequenceNumber(0);
-/**
- * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
- *
- * The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
- */
-class CMConfigDiffTracker : public ConfigDiffTracker<shared_ptr<Chunk>> {
-public:
- CMConfigDiffTracker(const std::string& ns,
- RangeMap* currMap,
- ChunkVersion* maxVersion,
- MaxChunkVersionMap* maxShardVersions,
- ChunkManager* manager)
- : ConfigDiffTracker<shared_ptr<Chunk>>(ns, currMap, maxVersion, maxShardVersions),
- _manager(manager) {}
-
- bool isTracked(const ChunkType& chunk) const final {
- // Mongos tracks all shards
- return true;
- }
-
- bool isMinKeyIndexed() const final {
- return false;
- }
-
- pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* opCtx,
- const ChunkType& chunk) const final {
- return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(chunk));
- }
-
- ShardId shardFor(OperationContext* opCtx, const ShardId& shardId) const final {
- const auto shard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- return shard->getId();
- }
-
-private:
- ChunkManager* const _manager;
-};
-
-bool allOfType(BSONType type, const BSONObj& o) {
- BSONObjIterator it(o);
- while (it.more()) {
- if (it.next().type() != type) {
- return false;
- }
- }
- return true;
-}
-
-bool isChunkMapValid(const ChunkMap& chunkMap) {
-#define ENSURE(x) \
- do { \
- if (!(x)) { \
- log() << "ChunkManager::_isValid failed: " #x; \
- return false; \
- } \
- } while (0)
-
- if (chunkMap.empty()) {
- return true;
- }
-
- // Check endpoints
- ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
- ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax()));
-
- // Make sure there are no gaps or overlaps
- for (ChunkMap::const_iterator it = boost::next(chunkMap.begin()), end = chunkMap.end();
- it != end;
- ++it) {
- ChunkMap::const_iterator last = boost::prior(it);
-
- if (SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() !=
- last->second->getMax())) {
- log() << last->second->toString();
- log() << it->second->toString();
- log() << it->second->getMin();
- log() << last->second->getMax();
- }
-
- ENSURE(SimpleBSONObjComparator::kInstance.evaluate(it->second->getMin() ==
- last->second->getMax()));
+void checkAllElementsAreOfType(BSONType type, const BSONObj& o) {
+ for (const auto&& element : o) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Not all elements of " << o << " are of type " << typeName(type),
+ element.type() == type);
}
-
- return true;
-
-#undef ENSURE
}
} // namespace
ChunkManager::ChunkManager(NamespaceString nss,
- const OID& epoch,
- const ShardKeyPattern& shardKeyPattern,
+ KeyPattern shardKeyPattern,
std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique)
+ bool unique,
+ ChunkMap chunkMap,
+ ChunkVersion collectionVersion)
: _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
_nss(std::move(nss)),
- _keyPattern(shardKeyPattern.getKeyPattern()),
+ _shardKeyPattern(shardKeyPattern),
_defaultCollator(std::move(defaultCollator)),
_unique(unique),
- _chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()),
- _chunkRangeMap(
- SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()),
- _version(0, 0, epoch) {}
+ _chunkMap(std::move(chunkMap)),
+ _chunkMapViews(_constructChunkMapViews(collectionVersion.epoch(), _chunkMap)),
+ _collectionVersion(collectionVersion) {}
ChunkManager::~ChunkManager() = default;
-void ChunkManager::loadExistingRanges(OperationContext* opCtx, const ChunkManager* oldManager) {
- invariant(!_version.isSet());
-
- int tries = 3;
-
- while (tries--) {
- ChunkMap chunkMap =
- SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
- set<ShardId> shardIds;
- ShardVersionMap shardVersions;
-
- Timer t;
-
- log() << "ChunkManager loading chunks for " << _nss
- << " sequenceNumber: " << _sequenceNumber
- << " based on: " << (oldManager ? oldManager->getVersion().toString() : "(empty)");
-
- if (_load(opCtx, chunkMap, shardIds, &shardVersions, oldManager)) {
- // TODO: Merge into diff code above, so we validate in one place
- if (isChunkMapValid(chunkMap)) {
- _chunkMap = std::move(chunkMap);
- _shardVersions = std::move(shardVersions);
- _chunkRangeMap = _constructRanges(_chunkMap);
-
- log() << "ChunkManager load took " << t.millis() << " ms and found version "
- << _version;
-
- return;
- }
- }
-
- warning() << "ChunkManager load failed after " << t.millis()
- << " ms and will be retried up to " << tries << " more times";
-
- sleepmillis(10 * (3 - tries));
- }
-
- // This will abort construction so we should never have a reference to an invalid config
- msgasserted(13282,
- str::stream() << "Couldn't load a valid config for " << _nss.ns()
- << " after 3 attempts. Please try again.");
-}
-
-bool ChunkManager::_load(OperationContext* opCtx,
- ChunkMap& chunkMap,
- set<ShardId>& shardIds,
- ShardVersionMap* shardVersions,
- const ChunkManager* oldManager) {
- // Reset the max version, but not the epoch, when we aren't loading from the oldManager
- _version = ChunkVersion(0, 0, _version.epoch());
-
- // If we have a previous version of the ChunkManager to work from, use that info to reduce
- // our config query
- if (oldManager && oldManager->getVersion().isSet()) {
- // Get the old max version
- _version = oldManager->getVersion();
-
- // Load a copy of the old versions
- *shardVersions = oldManager->_shardVersions;
-
- // Load a copy of the chunk map, replacing the chunk manager with our own
- const ChunkMap& oldChunkMap = oldManager->getChunkMap();
-
- for (const auto& oldChunkMapEntry : oldChunkMap) {
- const auto& oldC = oldChunkMapEntry.second;
- chunkMap.emplace(oldC->getMax(), std::make_shared<Chunk>(*oldC));
- }
-
- LOG(2) << "loading chunk manager for collection " << _nss
- << " using old chunk manager w/ version " << _version.toString() << " and "
- << oldChunkMap.size() << " chunks";
- }
-
- // Get the diff query required
- const auto diffQuery = CMConfigDiffTracker::createConfigDiffQuery(_nss, _version);
-
- // Attach a diff tracker for the versioned chunk data
- CMConfigDiffTracker differ(_nss.ns(), &chunkMap, &_version, shardVersions, this);
-
- // Diff tracker should *always* find at least one chunk if collection exists
- repl::OpTime opTime;
- std::vector<ChunkType> chunks;
- uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
- opCtx,
- diffQuery.query,
- diffQuery.sort,
- boost::none,
- &chunks,
- &opTime,
- repl::ReadConcernLevel::kMajorityReadConcern));
-
- invariant(opTime >= _configOpTime);
- _configOpTime = opTime;
-
- int diffsApplied = differ.calculateConfigDiff(opCtx, chunks);
- if (diffsApplied > 0) {
- LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _nss
- << " with version " << _version;
-
- // Add all existing shards we find to the shards set
- for (ShardVersionMap::iterator it = shardVersions->begin(); it != shardVersions->end();) {
- auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, it->first);
- if (shardStatus.isOK()) {
- shardIds.insert(it->first);
- ++it;
- } else {
- invariant(shardStatus == ErrorCodes::ShardNotFound);
- shardVersions->erase(it++);
- }
- }
-
- _configOpTime = opTime;
-
- return true;
- } else if (diffsApplied == 0) {
- // No chunks were found for the ns
- warning() << "no chunks found when reloading " << _nss << ", previous version was "
- << _version;
-
- // Set all our data to empty
- chunkMap.clear();
- shardVersions->clear();
-
- _version = ChunkVersion(0, 0, OID());
- _configOpTime = opTime;
-
- return true;
- } else { // diffsApplied < 0
-
- bool allInconsistent = (differ.numValidDiffs() == 0);
- if (allInconsistent) {
- // All versions are different, this can be normal
- warning() << "major change in chunk information found when reloading " << _nss
- << ", previous version was " << _version;
- } else {
- // Inconsistent load halfway through (due to yielding cursor during load)
- // should be rare
- warning() << "inconsistent chunks found when reloading " << _nss
- << ", previous version was " << _version << ", this should be rare";
- }
-
- // Set all our data to empty to be extra safe
- chunkMap.clear();
- shardVersions->clear();
-
- _version = ChunkVersion(0, 0, OID());
-
- return allInconsistent;
- }
-}
-
std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardKey,
const BSONObj& collation) const {
const bool hasSimpleCollation = (collation.isEmpty() && !_defaultCollator) ||
@@ -351,7 +105,7 @@ std::shared_ptr<Chunk> ChunkManager::findIntersectingChunkWithSimpleCollation(
void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& collation,
- set<ShardId>* shardIds) const {
+ std::set<ShardId>* shardIds) const {
auto qr = stdx::make_unique<QueryRequest>(_nss);
qr->setFilter(query);
@@ -370,7 +124,7 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
}
// Fast path for targeting equalities on the shard key.
- auto shardKeyToFind = _keyPattern.extractShardKeyFromQuery(*cq);
+ auto shardKeyToFind = _shardKeyPattern.extractShardKeyFromQuery(*cq);
if (!shardKeyToFind.isEmpty()) {
try {
auto chunk = findIntersectingChunk(shardKeyToFind, collation);
@@ -387,20 +141,20 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
// Query { a : { $gte : 1, $lt : 2 },
// b : { $gte : 3, $lt : 4 } }
// => Bounds { a : [1, 2), b : [3, 4) }
- IndexBounds bounds = getIndexBoundsForQuery(_keyPattern.toBSON(), *cq);
+ IndexBounds bounds = getIndexBoundsForQuery(_shardKeyPattern.toBSON(), *cq);
// Transforms bounds for each shard key field into full shard key ranges
// for example :
// Key { a : 1, b : 1 }
// Bounds { a : [1, 2), b : [3, 4) }
// => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
- BoundList ranges = _keyPattern.flattenBounds(bounds);
+ BoundList ranges = _shardKeyPattern.flattenBounds(bounds);
for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);
// once we know we need to visit all shards no need to keep looping
- if (shardIds->size() == _shardVersions.size()) {
+ if (shardIds->size() == _chunkMapViews.shardVersions.size()) {
break;
}
}
@@ -409,38 +163,38 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
// For now, we satisfy that assumption by adding a shard with no matches rather than returning
// an empty set of shards.
if (shardIds->empty()) {
- shardIds->insert(_chunkRangeMap.begin()->second.getShardId());
+ shardIds->insert(_chunkMapViews.chunkRangeMap.begin()->second.shardId);
}
}
void ChunkManager::getShardIdsForRange(const BSONObj& min,
const BSONObj& max,
std::set<ShardId>* shardIds) const {
- auto it = _chunkRangeMap.upper_bound(min);
- auto end = _chunkRangeMap.upper_bound(max);
+ auto it = _chunkMapViews.chunkRangeMap.upper_bound(min);
+ auto end = _chunkMapViews.chunkRangeMap.upper_bound(max);
// The chunk range map must always cover the entire key space
- invariant(it != _chunkRangeMap.end());
+ invariant(it != _chunkMapViews.chunkRangeMap.end());
// We need to include the last chunk
- if (end != _chunkRangeMap.cend()) {
+ if (end != _chunkMapViews.chunkRangeMap.cend()) {
++end;
}
for (; it != end; ++it) {
- shardIds->insert(it->second.getShardId());
+ shardIds->insert(it->second.shardId);
// No need to iterate through the rest of the ranges, because we already know we need to use
// all shards.
- if (shardIds->size() == _shardVersions.size()) {
+ if (shardIds->size() == _chunkMapViews.shardVersions.size()) {
break;
}
}
}
-void ChunkManager::getAllShardIds(set<ShardId>* all) const {
- std::transform(_shardVersions.begin(),
- _shardVersions.end(),
+void ChunkManager::getAllShardIds(std::set<ShardId>* all) const {
+ std::transform(_chunkMapViews.shardVersions.begin(),
+ _chunkMapViews.shardVersions.end(),
std::inserter(*all, all->begin()),
[](const ShardVersionMap::value_type& pair) { return pair.first; });
}
@@ -457,7 +211,7 @@ IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key,
}
// Consider shard key as an index
- string accessMethod = IndexNames::findPluginName(key);
+ std::string accessMethod = IndexNames::findPluginName(key);
dassert(accessMethod == IndexNames::BTREE || accessMethod == IndexNames::HASHED);
// Use query framework to generate index bounds
@@ -564,19 +318,19 @@ bool ChunkManager::compatibleWith(const ChunkManager& other, const ShardId& shar
}
ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
- auto it = _shardVersions.find(shardName);
- if (it == _shardVersions.end()) {
+ auto it = _chunkMapViews.shardVersions.find(shardName);
+ if (it == _chunkMapViews.shardVersions.end()) {
// Shards without explicitly tracked shard versions (meaning they have no chunks) always
// have a version of (0, 0, epoch)
- return ChunkVersion(0, 0, _version.epoch());
+ return ChunkVersion(0, 0, _collectionVersion.epoch());
}
return it->second;
}
-string ChunkManager::toString() const {
+std::string ChunkManager::toString() const {
StringBuilder sb;
- sb << "ChunkManager: " << _nss.ns() << " key:" << _keyPattern.toString() << '\n';
+ sb << "ChunkManager: " << _nss.ns() << " key:" << _shardKeyPattern.toString() << '\n';
for (const auto& entry : _chunkMap) {
sb << "\t" << entry.second->toString() << '\n';
@@ -585,47 +339,82 @@ string ChunkManager::toString() const {
return sb.str();
}
-ChunkManager::ChunkRangeMap ChunkManager::_constructRanges(const ChunkMap& chunkMap) {
+ChunkManager::ChunkMapViews ChunkManager::_constructChunkMapViews(const OID& epoch,
+ const ChunkMap& chunkMap) {
+ invariant(!chunkMap.empty());
+
ChunkRangeMap chunkRangeMap =
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>();
- if (chunkMap.empty()) {
- return chunkRangeMap;
- }
+ ShardVersionMap shardVersions;
ChunkMap::const_iterator current = chunkMap.cbegin();
while (current != chunkMap.cend()) {
- const auto rangeFirst = current;
+ const auto& firstChunkInRange = current->second;
+
+ // Tracks the max shard version for the shard on which the current range will reside
+ auto shardVersionIt = shardVersions.find(firstChunkInRange->getShardId());
+ if (shardVersionIt == shardVersions.end()) {
+ shardVersionIt =
+ shardVersions.emplace(firstChunkInRange->getShardId(), ChunkVersion(0, 0, epoch))
+ .first;
+ }
+
+ auto& maxShardVersion = shardVersionIt->second;
+
current = std::find_if(
- current, chunkMap.cend(), [&rangeFirst](const ChunkMap::value_type& chunkMapEntry) {
- return chunkMapEntry.second->getShardId() != rangeFirst->second->getShardId();
+ current,
+ chunkMap.cend(),
+ [&firstChunkInRange, &maxShardVersion](const ChunkMap::value_type& chunkMapEntry) {
+ const auto& currentChunk = chunkMapEntry.second;
+
+ if (currentChunk->getShardId() != firstChunkInRange->getShardId())
+ return true;
+
+ if (currentChunk->getLastmod() > maxShardVersion)
+ maxShardVersion = currentChunk->getLastmod();
+
+ return false;
});
+
const auto rangeLast = std::prev(current);
- const BSONObj rangeMin = rangeFirst->second->getMin();
+ const BSONObj rangeMin = firstChunkInRange->getMin();
const BSONObj rangeMax = rangeLast->second->getMax();
- auto insertResult = chunkRangeMap.insert(std::make_pair(
- rangeMax, ShardAndChunkRange(rangeMin, rangeMax, rangeFirst->second->getShardId())));
- invariant(insertResult.second);
- if (insertResult.first != chunkRangeMap.begin()) {
+ const auto insertResult = chunkRangeMap.insert(std::make_pair(
+ rangeMax, ShardAndChunkRange{{rangeMin, rangeMax}, firstChunkInRange->getShardId()}));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Metadata contains two chunks with the same max value "
+ << rangeMax,
+ insertResult.second);
+
+ const auto& insertIterator = insertResult.first;
+
+ if (insertIterator != chunkRangeMap.begin()) {
// Make sure there are no gaps in the ranges
- insertResult.first--;
- invariant(
- SimpleBSONObjComparator::kInstance.evaluate(insertResult.first->first == rangeMin));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Gap or an overlap between ranges "
+ << insertIterator->second.range.toString()
+ << " and "
+ << std::prev(insertIterator)->second.range.toString(),
+ SimpleBSONObjComparator::kInstance.evaluate(std::prev(insertIterator)->first ==
+ rangeMin));
}
+
+ // If a shard has chunks it must have a shard version, otherwise we have an invalid chunk
+ // somewhere, which should have been caught at chunk load time
+ invariant(maxShardVersion.isSet());
}
invariant(!chunkRangeMap.empty());
- invariant(allOfType(MinKey, chunkRangeMap.begin()->second.getMin()));
- invariant(allOfType(MaxKey, chunkRangeMap.rbegin()->first));
+ invariant(!shardVersions.empty());
- return chunkRangeMap;
-}
+ checkAllElementsAreOfType(MinKey, chunkRangeMap.begin()->second.min());
+ checkAllElementsAreOfType(MaxKey, chunkRangeMap.rbegin()->first);
-repl::OpTime ChunkManager::getConfigOpTime() const {
- return _configOpTime;
+ return {std::move(chunkRangeMap), std::move(shardVersions)};
}
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 365d4d5df62..f1edeefc668 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -35,8 +35,6 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/collation/collator_interface.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard.h"
@@ -60,10 +58,11 @@ class ChunkManager {
public:
ChunkManager(NamespaceString nss,
- const OID& epoch,
- const ShardKeyPattern& shardKeyPattern,
+ KeyPattern shardKeyPattern,
std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique);
+ bool unique,
+ ChunkMap chunkMap,
+ ChunkVersion collectionVersion);
~ChunkManager();
@@ -79,7 +78,7 @@ public:
}
const ShardKeyPattern& getShardKeyPattern() const {
- return _keyPattern;
+ return _shardKeyPattern;
}
const CollatorInterface* getDefaultCollator() const {
@@ -91,10 +90,12 @@ public:
}
ChunkVersion getVersion() const {
- return _version;
+ return _collectionVersion;
}
- const ChunkMap& getChunkMap() const {
+ ChunkVersion getVersion(const ShardId& shardId) const;
+
+ const ChunkMap& chunkMap() const {
return _chunkMap;
}
@@ -102,12 +103,9 @@ public:
return _chunkMap.size();
}
- // Loads existing ranges based on info in chunk manager
- void loadExistingRanges(OperationContext* opCtx, const ChunkManager* oldManager);
-
- //
- // Methods to use once loaded / created
- //
+ const ShardVersionMap& shardVersions() const {
+ return _chunkMapViews.shardVersions;
+ }
/**
* Given a shard key (or a prefix) that has been extracted from a document, returns the chunk
@@ -177,57 +175,46 @@ public:
std::string toString() const;
- ChunkVersion getVersion(const ShardId& shardName) const;
-
- /**
- * Returns the opTime of config server the last time chunks were loaded.
- */
- repl::OpTime getConfigOpTime() const;
-
private:
+ friend class CollectionRoutingDataLoader;
+
/**
* Represents a range of chunk keys [getMin(), getMax()) and the id of the shard on which they
* reside according to the metadata.
*/
- class ShardAndChunkRange {
- public:
- ShardAndChunkRange(const BSONObj& min, const BSONObj& max, ShardId inShardId)
- : _range(min, max), _shardId(std::move(inShardId)) {}
-
- const BSONObj& getMin() const {
- return _range.getMin();
- }
-
- const BSONObj& getMax() const {
- return _range.getMax();
+ struct ShardAndChunkRange {
+ const BSONObj& min() const {
+ return range.getMin();
}
- const ShardId& getShardId() const {
- return _shardId;
+ const BSONObj& max() const {
+ return range.getMax();
}
- private:
- ChunkRange _range;
- ShardId _shardId;
+ ChunkRange range;
+ ShardId shardId;
};
using ChunkRangeMap = BSONObjIndexedMap<ShardAndChunkRange>;
/**
- * If load was successful, returns true and it is guaranteed that the _chunkMap and
- * _chunkRangeMap are consistent with each other. If false is returned, it is not safe to use
- * the chunk manager anymore.
+ * Contains different transformations of the chunk map for efficient querying
*/
- bool _load(OperationContext* opCtx,
- ChunkMap& chunks,
- std::set<ShardId>& shardIds,
- ShardVersionMap* shardVersions,
- const ChunkManager* oldManager);
+ struct ChunkMapViews {
+ // Transformation of the chunk map containing what range of keys reside on which shard. The
+ // index is the max key of the respective range and the union of all ranges in a such
+ // constructed map must cover the complete space from [MinKey, MaxKey).
+ const ChunkRangeMap chunkRangeMap;
+
+ // Map from shard id to the maximum chunk version for that shard. If a shard contains no
+ // chunks, it won't be present in this map.
+ const ShardVersionMap shardVersions;
+ };
/**
- * Merges consecutive chunks, which reside on the same shard into a single range.
+ * Does a single pass over the chunkMap and constructs the ChunkMapViews object.
*/
- static ChunkRangeMap _constructRanges(const ChunkMap& chunkMap);
+ static ChunkMapViews _constructChunkMapViews(const OID& epoch, const ChunkMap& chunkMap);
// The shard versioning mechanism hinges on keeping track of the number of times we reload
// ChunkManagers.
@@ -237,7 +224,7 @@ private:
const NamespaceString _nss;
// The key pattern used to shard the collection
- const ShardKeyPattern _keyPattern;
+ const ShardKeyPattern _shardKeyPattern;
// Default collation to use for routing data queries for this collection
const std::unique_ptr<CollatorInterface> _defaultCollator;
@@ -247,23 +234,15 @@ private:
// Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
// ranges must cover the complete space from [MinKey, MaxKey).
- ChunkMap _chunkMap;
-
- // Transformation of the chunk map containing what range of keys reside on which shard. The
- // index is the max key of the respective range and the union of all ranges in a such
- // constructed map must cover the complete space from [MinKey, MaxKey).
- ChunkRangeMap _chunkRangeMap;
+ const ChunkMap _chunkMap;
- // Max known version per shard
- ShardVersionMap _shardVersions;
+ // Different transformations of the chunk map for efficient querying
+ const ChunkMapViews _chunkMapViews;
// Max version across all chunks
- ChunkVersion _version;
+ const ChunkVersion _collectionVersion;
- // OpTime of config server the last time chunks were loaded.
- repl::OpTime _configOpTime;
-
- // Auto-split throttling state
+ // Auto-split throttling state (state mutable by write commands)
struct AutoSplitThrottle {
public:
AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
@@ -280,8 +259,6 @@ private:
ChunkManager*,
Chunk*,
long);
-
- friend class TestableChunkManager;
};
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp
index 08f8357b776..013c4618d6c 100644
--- a/src/mongo/s/chunk_manager_test.cpp
+++ b/src/mongo/s/chunk_manager_query_test.cpp
@@ -32,143 +32,13 @@
#include <set>
-#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/db/client.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
-#include "mongo/s/catalog/sharding_catalog_test_fixture.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/chunk_manager.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/scopeguard.h"
+#include "mongo/s/chunk_manager_test_fixture.h"
namespace mongo {
namespace {
-using executor::RemoteCommandResponse;
-using executor::RemoteCommandRequest;
-
-const NamespaceString kNss("TestDB", "TestColl");
-
-class ChunkManagerTestFixture : public ShardingCatalogTestFixture {
-protected:
- void setUp() override {
- ShardingCatalogTestFixture::setUp();
- setRemote(HostAndPort("FakeRemoteClient:34567"));
- configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT});
- }
-
- /**
- * Returns a chunk manager with chunks at the specified split points. Each individual chunk is
- * placed on a separate shard with id ranging from "0" to the number of chunks.
- */
- std::unique_ptr<ChunkManager> makeChunkManager(
- const ShardKeyPattern& shardKeyPattern,
- std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique,
- const std::vector<BSONObj>& splitPoints) {
- ChunkVersion version(1, 0, OID::gen());
-
- std::vector<BSONObj> shards;
- std::vector<BSONObj> initialChunks;
-
- auto splitPointsIncludingEnds(splitPoints);
- splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(),
- shardKeyPattern.getKeyPattern().globalMin());
- splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax());
-
- for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) {
- ShardType shard;
- shard.setName(str::stream() << (i - 1));
- shard.setHost(str::stream() << "Host" << (i - 1) << ":12345");
-
- shards.push_back(shard.toBSON());
-
- ChunkType chunk;
- chunk.setNS(kNss.ns());
- chunk.setMin(shardKeyPattern.getKeyPattern().extendRangeBound(
- splitPointsIncludingEnds[i - 1], false));
- chunk.setMax(shardKeyPattern.getKeyPattern().extendRangeBound(
- splitPointsIncludingEnds[i], false));
- chunk.setShard(shard.getName());
- chunk.setVersion(version);
-
- initialChunks.push_back(chunk.toConfigBSON());
-
- version.incMajor();
- }
-
- // Load the initial manager
- auto manager = stdx::make_unique<ChunkManager>(
- kNss, version.epoch(), shardKeyPattern, std::move(defaultCollator), unique);
-
- auto future = launchAsync([&manager] {
- ON_BLOCK_EXIT([&] { Client::destroy(); });
- Client::initThread("Test");
- auto opCtx = cc().makeOperationContext();
- manager->loadExistingRanges(opCtx.get(), nullptr);
- });
-
- expectFindOnConfigSendBSONObjVector(initialChunks);
- expectFindOnConfigSendBSONObjVector(shards);
-
- future.timed_get(kFutureTimeout);
-
- return manager;
- }
-};
-
-using ChunkManagerLoadTest = ChunkManagerTestFixture;
-
-TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) {
- const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
-
- auto initialManager(makeChunkManager(shardKeyPattern, nullptr, true, {}));
-
- ChunkVersion version = initialManager->getVersion();
-
- CollectionType collType;
- collType.setNs(kNss);
- collType.setEpoch(version.epoch());
- collType.setUpdatedAt(jsTime());
- collType.setKeyPattern(shardKeyPattern.toBSON());
- collType.setUnique(false);
-
- ChunkManager manager(kNss, version.epoch(), shardKeyPattern, nullptr, true);
-
- auto future =
- launchAsync([&] { manager.loadExistingRanges(operationContext(), initialManager.get()); });
-
- // Return set of chunks, which represent a split
- expectFindOnConfigSendBSONObjVector([&]() {
- version.incMajor();
-
- ChunkType chunk1;
- chunk1.setNS(kNss.ns());
- chunk1.setMin(shardKeyPattern.getKeyPattern().globalMin());
- chunk1.setMax(BSON("_id" << 0));
- chunk1.setShard({"0"});
- chunk1.setVersion(version);
-
- version.incMinor();
-
- ChunkType chunk2;
- chunk2.setNS(kNss.ns());
- chunk2.setMin(BSON("_id" << 0));
- chunk2.setMax(shardKeyPattern.getKeyPattern().globalMax());
- chunk2.setShard({"0"});
- chunk2.setVersion(version);
-
- return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
- }());
-
- future.timed_get(kFutureTimeout);
-}
-
-/**
- * Fixture to be used as a shortcut for tests which exercise the getShardIdsForQuery routing logic
- */
class ChunkManagerQueryTest : public ChunkManagerTestFixture {
protected:
void runQueryTest(const BSONObj& shardKey,
diff --git a/src/mongo/s/chunk_manager_refresh_test.cpp b/src/mongo/s/chunk_manager_refresh_test.cpp
new file mode 100644
index 00000000000..504893acf3c
--- /dev/null
+++ b/src/mongo/s/chunk_manager_refresh_test.cpp
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include <set>
+
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk_manager_test_fixture.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+using ChunkManagerLoadTest = ChunkManagerTestFixture;
+
+TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterSplit) {
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+
+ auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {}));
+ ASSERT_EQ(1, initialRoutingInfo->numChunks());
+
+ auto future = launchAsync([&] {
+ auto client = serviceContext()->makeClient("Test");
+ auto opCtx = client->makeOperationContext();
+ return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo);
+ });
+
+ ChunkVersion version = initialRoutingInfo->getVersion();
+
+ expectFindOnConfigSendBSONObjVector([&]() {
+ CollectionType collType;
+ collType.setNs(kNss);
+ collType.setEpoch(version.epoch());
+ collType.setKeyPattern(shardKeyPattern.toBSON());
+ collType.setUnique(false);
+
+ return std::vector<BSONObj>{collType.toBSON()};
+ }());
+
+ // Return set of chunks, which represent a split
+ expectFindOnConfigSendBSONObjVector([&]() {
+ version.incMajor();
+ ChunkType chunk1(
+ kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"0"});
+
+ version.incMinor();
+ ChunkType chunk2(
+ kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ }());
+
+ auto newRoutingInfo(future.timed_get(kFutureTimeout));
+ ASSERT_EQ(2, newRoutingInfo->numChunks());
+ ASSERT_EQ(version, newRoutingInfo->getVersion());
+ ASSERT_EQ(version, newRoutingInfo->getVersion({"0"}));
+ ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), newRoutingInfo->getVersion({"1"}));
+}
+
+TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMove) {
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+
+ auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {BSON("_id" << 0)}));
+ ASSERT_EQ(2, initialRoutingInfo->numChunks());
+
+ auto future = launchAsync([&] {
+ auto client = serviceContext()->makeClient("Test");
+ auto opCtx = client->makeOperationContext();
+ return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo);
+ });
+
+ ChunkVersion version = initialRoutingInfo->getVersion();
+
+ expectFindOnConfigSendBSONObjVector([&]() {
+ CollectionType collType;
+ collType.setNs(kNss);
+ collType.setEpoch(version.epoch());
+ collType.setKeyPattern(shardKeyPattern.toBSON());
+ collType.setUnique(false);
+
+ return std::vector<BSONObj>{collType.toBSON()};
+ }());
+
+ ChunkVersion expectedDestShardVersion;
+
+ // Return set of chunks, which represent a move
+ expectFindOnConfigSendBSONObjVector([&]() {
+ version.incMajor();
+ expectedDestShardVersion = version;
+ ChunkType chunk1(
+ kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"});
+
+ version.incMinor();
+ ChunkType chunk2(
+ kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ }());
+
+ auto newRoutingInfo(future.timed_get(kFutureTimeout));
+ ASSERT_EQ(2, newRoutingInfo->numChunks());
+ ASSERT_EQ(version, newRoutingInfo->getVersion());
+ ASSERT_EQ(version, newRoutingInfo->getVersion({"0"}));
+ ASSERT_EQ(expectedDestShardVersion, newRoutingInfo->getVersion({"1"}));
+}
+
+TEST_F(ChunkManagerLoadTest, IncrementalLoadAfterMoveLastChunk) {
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+
+ auto initialRoutingInfo(makeChunkManager(shardKeyPattern, nullptr, true, {}));
+ ASSERT_EQ(1, initialRoutingInfo->numChunks());
+
+ auto future = launchAsync([&] {
+ auto client = serviceContext()->makeClient("Test");
+ auto opCtx = client->makeOperationContext();
+ return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, initialRoutingInfo);
+ });
+
+ ChunkVersion version = initialRoutingInfo->getVersion();
+
+ expectFindOnConfigSendBSONObjVector([&]() {
+ CollectionType collType;
+ collType.setNs(kNss);
+ collType.setEpoch(version.epoch());
+ collType.setKeyPattern(shardKeyPattern.toBSON());
+ collType.setUnique(false);
+
+ return std::vector<BSONObj>{collType.toBSON()};
+ }());
+
+ // Return set of chunks, which represent a move
+ expectFindOnConfigSendBSONObjVector([&]() {
+ version.incMajor();
+ ChunkType chunk1(kNss,
+ {shardKeyPattern.getKeyPattern().globalMin(),
+ shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"1"});
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON()};
+ }());
+
+ expectFindOnConfigSendBSONObjVector([&]() {
+ ShardType shard1;
+ shard1.setName("0");
+ shard1.setHost(str::stream() << "Host0:12345");
+
+ ShardType shard2;
+ shard2.setName("1");
+ shard2.setHost(str::stream() << "Host1:12345");
+
+ return std::vector<BSONObj>{shard1.toBSON(), shard2.toBSON()};
+ }());
+
+ auto newRoutingInfo(future.timed_get(kFutureTimeout));
+ ASSERT_EQ(1, newRoutingInfo->numChunks());
+ ASSERT_EQ(version, newRoutingInfo->getVersion());
+ ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), newRoutingInfo->getVersion({"0"}));
+ ASSERT_EQ(version, newRoutingInfo->getVersion({"1"}));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/chunk_manager_test_fixture.cpp b/src/mongo/s/chunk_manager_test_fixture.cpp
new file mode 100644
index 00000000000..ada08673d70
--- /dev/null
+++ b/src/mongo/s/chunk_manager_test_fixture.cpp
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include <set>
+#include <vector>
+
+#include "mongo/s/chunk_manager_test_fixture.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/client.h"
+#include "mongo/db/query/collation/collator_factory_mock.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+const NamespaceString ChunkManagerTestFixture::kNss("TestDB", "TestColl");
+
+void ChunkManagerTestFixture::setUp() {
+ ShardingCatalogTestFixture::setUp();
+ setRemote(HostAndPort("FakeRemoteClient:34567"));
+ configTargeter()->setFindHostReturnValue(HostAndPort{CONFIG_HOST_PORT});
+
+ CollatorFactoryInterface::set(serviceContext(), stdx::make_unique<CollatorFactoryMock>());
+}
+
+std::shared_ptr<ChunkManager> ChunkManagerTestFixture::makeChunkManager(
+ const ShardKeyPattern& shardKeyPattern,
+ std::unique_ptr<CollatorInterface> defaultCollator,
+ bool unique,
+ const std::vector<BSONObj>& splitPoints) {
+ ChunkVersion version(1, 0, OID::gen());
+
+ const BSONObj collectionBSON = [&]() {
+ CollectionType coll;
+ coll.setNs(kNss);
+ coll.setEpoch(version.epoch());
+ coll.setKeyPattern(shardKeyPattern.getKeyPattern());
+ coll.setUnique(unique);
+
+ if (defaultCollator) {
+ coll.setDefaultCollation(defaultCollator->getSpec().toBSON());
+ }
+
+ return coll.toBSON();
+ }();
+
+ std::vector<BSONObj> shards;
+ std::vector<BSONObj> initialChunks;
+
+ auto splitPointsIncludingEnds(splitPoints);
+ splitPointsIncludingEnds.insert(splitPointsIncludingEnds.begin(),
+ shardKeyPattern.getKeyPattern().globalMin());
+ splitPointsIncludingEnds.push_back(shardKeyPattern.getKeyPattern().globalMax());
+
+ for (size_t i = 1; i < splitPointsIncludingEnds.size(); ++i) {
+ ShardType shard;
+ shard.setName(str::stream() << (i - 1));
+ shard.setHost(str::stream() << "Host" << (i - 1) << ":12345");
+
+ shards.push_back(shard.toBSON());
+
+ ChunkType chunk(
+ kNss,
+ {shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i - 1],
+ false),
+ shardKeyPattern.getKeyPattern().extendRangeBound(splitPointsIncludingEnds[i], false)},
+ version,
+ shard.getName());
+
+ initialChunks.push_back(chunk.toConfigBSON());
+
+ version.incMajor();
+ }
+
+ auto future = launchAsync([&] {
+ auto client = serviceContext()->makeClient("Test");
+ auto opCtx = client->makeOperationContext();
+ return CatalogCache::refreshCollectionRoutingInfo(opCtx.get(), kNss, nullptr);
+ });
+
+ expectFindOnConfigSendBSONObjVector({collectionBSON});
+ expectFindOnConfigSendBSONObjVector(initialChunks);
+ expectFindOnConfigSendBSONObjVector(shards);
+
+ return future.timed_get(kFutureTimeout);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/chunk_manager_test_fixture.h b/src/mongo/s/chunk_manager_test_fixture.h
new file mode 100644
index 00000000000..aaa059dd49d
--- /dev/null
+++ b/src/mongo/s/chunk_manager_test_fixture.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/sharding_catalog_test_fixture.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+class BSONObj;
+class ChunkManager;
+class CollatorInterface;
+class ShardKeyPattern;
+
+class ChunkManagerTestFixture : public ShardingCatalogTestFixture {
+protected:
+ void setUp() override;
+
+ /**
+ * Returns a chunk manager with chunks at the specified split points. Each individual chunk is
+ * placed on a separate shard with shard id being a single number ranging from "0" to the number
+ * of chunks.
+ */
+ std::shared_ptr<ChunkManager> makeChunkManager(
+ const ShardKeyPattern& shardKeyPattern,
+ std::unique_ptr<CollatorInterface> defaultCollator,
+ bool unique,
+ const std::vector<BSONObj>& splitPoints);
+
+ static const NamespaceString kNss;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
index 89d2836dc92..9e2aaaaf4e5 100644
--- a/src/mongo/s/client/parallel.cpp
+++ b/src/mongo/s/client/parallel.cpp
@@ -39,11 +39,9 @@
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/query/query_request.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
@@ -321,42 +319,20 @@ void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) {
finishInit(opCtx);
}
-void ParallelSortClusteredCursor::_markStaleNS(OperationContext* opCtx,
- const NamespaceString& staleNS,
- const StaleConfigException& e,
- bool& forceReload) {
- if (e.requiresFullReload()) {
- Grid::get(opCtx)->catalogCache()->invalidate(staleNS.db());
- }
-
- if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end())
+void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
+ const StaleConfigException& e) {
+ if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) {
_staleNSMap[staleNS.ns()] = 1;
+ }
- int tries = ++_staleNSMap[staleNS.ns()];
+ const int tries = ++_staleNSMap[staleNS.ns()];
if (tries >= 5) {
throw SendStaleConfigException(staleNS.ns(),
- str::stream() << "too many retries of stale version info",
+ "too many retries of stale version info",
e.getVersionReceived(),
e.getVersionWanted());
}
-
- forceReload = tries > 2;
-}
-
-void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* opCtx,
- const NamespaceString& staleNS,
- bool forceReload) {
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, staleNS);
- if (!scopedCMStatus.isOK()) {
- log() << "cannot reload database info for stale namespace " << staleNS.ns();
- return;
- }
-
- const auto& scopedCM = scopedCMStatus.getValue();
-
- // Reload chunk manager, potentially forcing the namespace
- scopedCM.db()->getChunkManagerIfExists(opCtx, staleNS.ns(), true, forceReload);
}
void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
@@ -459,12 +435,12 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
shared_ptr<Shard> primary;
{
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
- if (scopedCMStatus != ErrorCodes::NamespaceNotFound) {
- uassertStatusOK(scopedCMStatus.getStatus());
- const auto& scopedCM = scopedCMStatus.getValue();
- manager = scopedCM.cm();
- primary = scopedCM.primary();
+ auto routingInfoStatus =
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (routingInfoStatus != ErrorCodes::NamespaceNotFound) {
+ auto routingInfo = uassertStatusOK(std::move(routingInfoStatus));
+ manager = routingInfo.cm();
+ primary = routingInfo.primary();
}
}
@@ -642,20 +618,17 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
if (staleNS.size() == 0)
staleNS = nss; // ns is the *versioned* namespace, be careful of this
- // Probably need to retry fully
- bool forceReload;
- _markStaleNS(opCtx, staleNS, e, forceReload);
+ _markStaleNS(staleNS, e);
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS);
- LOG(1) << "stale config of ns " << staleNS
- << " during initialization, will retry with forced : " << forceReload
+ LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry"
<< causedBy(redact(e));
// This is somewhat strange
- if (staleNS != nss)
+ if (staleNS != nss) {
warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace "
<< staleNS;
-
- _handleStaleNS(opCtx, staleNS, forceReload);
+ }
// Restart with new chunk manager
startInit(opCtx);
@@ -860,26 +833,21 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
if (retry) {
// Refresh stale namespaces
if (staleNSExceptions.size()) {
- for (map<string, StaleConfigException>::iterator i = staleNSExceptions.begin(),
- end = staleNSExceptions.end();
- i != end;
- ++i) {
- NamespaceString staleNS(i->first);
- const StaleConfigException& exception = i->second;
+ for (const auto& exEntry : staleNSExceptions) {
+ const NamespaceString staleNS(exEntry.first);
+ const StaleConfigException& ex = exEntry.second;
- bool forceReload;
- _markStaleNS(opCtx, staleNS, exception, forceReload);
+ _markStaleNS(staleNS, ex);
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS);
- LOG(1) << "stale config of ns " << staleNS
- << " on finishing query, will retry with forced : " << forceReload
- << causedBy(redact(exception));
+ LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry"
+ << causedBy(redact(ex));
// This is somewhat strange
- if (staleNS != ns)
+ if (staleNS != ns) {
warning() << "versioned ns " << ns << " doesn't match stale config namespace "
<< staleNS;
-
- _handleStaleNS(opCtx, staleNS, forceReload);
+ }
}
}
diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h
index d375858bae0..d1680f1e74f 100644
--- a/src/mongo/s/client/parallel.h
+++ b/src/mongo/s/client/parallel.h
@@ -117,11 +117,7 @@ private:
void _finishCons();
- void _markStaleNS(OperationContext* opCtx,
- const NamespaceString& staleNS,
- const StaleConfigException& e,
- bool& forceReload);
- void _handleStaleNS(OperationContext* opCtx, const NamespaceString& staleNS, bool forceReload);
+ void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e);
bool _didInit;
bool _done;
diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp
index 299a89f5941..7f97716f620 100644
--- a/src/mongo/s/client/version_manager.cpp
+++ b/src/mongo/s/client/version_manager.cpp
@@ -36,13 +36,13 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
#include "mongo/s/set_shard_version_request.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -257,21 +257,21 @@ bool checkShardVersion(OperationContext* opCtx,
const NamespaceString nss(ns);
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+
if (authoritative) {
- ScopedChunkManager::refreshAndGet(opCtx, nss);
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
}
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
-
- if (!scopedCMStatus.isOK()) {
+ auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ if (!routingInfoStatus.isOK()) {
return false;
}
- const auto& scopedCM = scopedCMStatus.getValue();
+ auto& routingInfo = routingInfoStatus.getValue();
- auto conf = scopedCM.db();
- const auto manager = scopedCM.cm();
- const auto primary = scopedCM.primary();
+ const auto manager = routingInfo.cm();
+ const auto primary = routingInfo.primary();
unsigned long long officialSequenceNumber = 0;
@@ -379,16 +379,7 @@ bool checkShardVersion(OperationContext* opCtx,
return true;
}
- if (result["reloadConfig"].trueValue()) {
- if (result["version"].timestampTime() == Date_t()) {
- warning() << "reloading full configuration for " << conf->name()
- << ", connection state indicates significant version changes";
-
- Grid::get(opCtx)->catalogCache()->invalidate(nss.db());
- }
-
- conf->getChunkManager(opCtx, nss.ns(), true);
- }
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
const int maxNumTries = 7;
if (tryNumber < maxNumTries) {
diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp b/src/mongo/s/commands/chunk_manager_targeter.cpp
index 609dee87d9e..42de7ba2903 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.cpp
+++ b/src/mongo/s/commands/chunk_manager_targeter.cpp
@@ -32,31 +32,18 @@
#include "mongo/s/commands/chunk_manager_targeter.h"
-#include <boost/thread/tss.hpp>
-
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/collation/collation_index_key.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/sharding_raii.h"
-#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
-
-using std::shared_ptr;
-using str::stream;
-using std::map;
-using std::set;
-using std::string;
-using std::vector;
-
namespace {
enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };
@@ -65,11 +52,6 @@ enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT
const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
-// To match legacy reload behavior, we have to backoff on config reload per-thread
-// TODO: Centralize this behavior better by refactoring config reload in mongos
-boost::thread_specific_ptr<Backoff> perThreadBackoff;
-const int maxWaitMillis = 500;
-
/**
* There are two styles of update expressions:
*
@@ -138,15 +120,6 @@ bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkM
return true;
}
-void refreshBackoff() {
- if (!perThreadBackoff.get()) {
- perThreadBackoff.reset(new Backoff(maxWaitMillis, maxWaitMillis * 2));
- }
-
- perThreadBackoff.get()->nextSleepMillis();
-}
-
-
//
// Utilities to compare shard versions
//
@@ -173,25 +146,19 @@ CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
return CompareResult_Unknown;
}
- if (shardVersionA < shardVersionB) {
+ if (shardVersionA < shardVersionB)
return CompareResult_LT;
- }
-
else
return CompareResult_GTE;
}
-ChunkVersion getShardVersion(StringData shardName,
- const ChunkManager* manager,
- const Shard* primary) {
- dassert(!(manager && primary));
- dassert(manager || primary);
-
- if (primary) {
- return ChunkVersion::UNSHARDED();
+ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo,
+ const ShardId& shardId) {
+ if (routingInfo.cm()) {
+ return routingInfo.cm()->getVersion(shardId);
}
- return manager->getVersion(shardName.toString());
+ return ChunkVersion::UNSHARDED();
}
/**
@@ -205,26 +172,21 @@ ChunkVersion getShardVersion(StringData shardName,
* Note that the signature here is weird since our cached map of chunk versions is stored in a
* ChunkManager or is implicit in the primary shard of the collection.
*/
-CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
- const Shard* cachedPrimary,
- const map<ShardId, ChunkVersion>& remoteShardVersions) {
+CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo,
+ const ShardVersionMap& remoteShardVersions) {
CompareResult finalResult = CompareResult_GTE;
- for (map<ShardId, ChunkVersion>::const_iterator it = remoteShardVersions.begin();
- it != remoteShardVersions.end();
- ++it) {
- // Get the remote and cached version for the next shard
- const ShardId& shardName = it->first;
- const ChunkVersion& remoteShardVersion = it->second;
+ for (const auto& shardVersionEntry : remoteShardVersions) {
+ const ShardId& shardId = shardVersionEntry.first;
+ const ChunkVersion& remoteShardVersion = shardVersionEntry.second;
ChunkVersion cachedShardVersion;
try {
// Throws b/c shard constructor throws
- cachedShardVersion =
- getShardVersion(shardName.toString(), cachedChunkManager, cachedPrimary);
+ cachedShardVersion = getShardVersion(routingInfo, shardId);
} catch (const DBException& ex) {
- warning() << "could not lookup shard " << shardName
+ warning() << "could not lookup shard " << shardId
<< " in local cache, shard metadata may have changed"
<< " or be unavailable" << causedBy(ex);
@@ -236,6 +198,7 @@ CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
if (result == CompareResult_Unknown)
return result;
+
if (result == CompareResult_LT)
finalResult = CompareResult_LT;
@@ -248,10 +211,10 @@ CompareResult compareAllShardVersions(const ChunkManager* cachedChunkManager,
/**
* Whether or not the manager/primary pair is different from the other manager/primary pair.
*/
-bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA,
- const shared_ptr<Shard>& primaryA,
- const shared_ptr<ChunkManager>& managerB,
- const shared_ptr<Shard>& primaryB) {
+bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
+ const std::shared_ptr<Shard>& primaryA,
+ const std::shared_ptr<ChunkManager>& managerB,
+ const std::shared_ptr<Shard>& primaryB) {
if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
(!primaryA && primaryB))
return true;
@@ -268,10 +231,10 @@ bool isMetadataDifferent(const shared_ptr<ChunkManager>& managerA,
* Whether or not the manager/primary pair was changed or refreshed from a previous version
* of the metadata.
*/
-bool wasMetadataRefreshed(const shared_ptr<ChunkManager>& managerA,
- const shared_ptr<Shard>& primaryA,
- const shared_ptr<ChunkManager>& managerB,
- const shared_ptr<Shard>& primaryB) {
+bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
+ const std::shared_ptr<Shard>& primaryA,
+ const std::shared_ptr<ChunkManager>& managerB,
+ const std::shared_ptr<Shard>& primaryB) {
if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
return true;
@@ -290,14 +253,18 @@ ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterS
Status ChunkManagerTargeter::init(OperationContext* opCtx) {
- auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss);
- if (!scopedCMStatus.isOK()) {
- return scopedCMStatus.getStatus();
+ auto shardDbStatus = createShardDatabase(opCtx, _nss.db());
+ if (!shardDbStatus.isOK()) {
+ return shardDbStatus.getStatus();
}
- const auto& scopedCM = scopedCMStatus.getValue();
- _manager = scopedCM.cm();
- _primary = scopedCM.primary();
+ const auto routingInfoStatus =
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss);
+ if (!routingInfoStatus.isOK()) {
+ return routingInfoStatus.getStatus();
+ }
+
+ _routingInfo = std::move(routingInfoStatus.getValue());
return Status::OK();
}
@@ -311,21 +278,21 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
ShardEndpoint** endpoint) const {
BSONObj shardKey;
- if (_manager) {
+ if (_routingInfo->cm()) {
//
// Sharded collections have the following requirements for targeting:
//
// Inserts must contain the exact shard key.
//
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(doc);
+ shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc);
// Check shard key exists
if (shardKey.isEmpty()) {
- return Status(ErrorCodes::ShardKeyNotFound,
- stream() << "document " << doc
- << " does not contain shard key for pattern "
- << _manager->getShardKeyPattern().toString());
+ return {ErrorCodes::ShardKeyNotFound,
+ str::stream() << "document " << doc
+ << " does not contain shard key for pattern "
+ << _routingInfo->cm()->getShardKeyPattern().toString()};
}
// Check shard key size on insert
@@ -338,13 +305,13 @@ Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
if (!shardKey.isEmpty()) {
*endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release();
} else {
- if (!_primary) {
+ if (!_routingInfo->primary()) {
return Status(ErrorCodes::NamespaceNotFound,
str::stream() << "could not target insert in collection " << getNS().ns()
<< "; no metadata found");
}
- *endpoint = new ShardEndpoint(_primary->getId(), ChunkVersion::UNSHARDED());
+ *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED());
}
return Status::OK();
@@ -376,14 +343,14 @@ Status ChunkManagerTargeter::targetUpdate(
UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr());
if (updateType == UpdateType_Unknown) {
- return Status(ErrorCodes::UnsupportedFormat,
- stream() << "update document " << updateExpr
- << " has mixed $operator and non-$operator style fields");
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "update document " << updateExpr
+ << " has mixed $operator and non-$operator style fields"};
}
BSONObj shardKey;
- if (_manager) {
+ if (_routingInfo->cm()) {
//
// Sharded collections have the following futher requirements for targeting:
//
@@ -395,7 +362,7 @@ Status ChunkManagerTargeter::targetUpdate(
if (updateType == UpdateType_OpStyle) {
// Target using the query
StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
+ _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
// Bad query
if (!status.isOK())
@@ -404,7 +371,7 @@ Status ChunkManagerTargeter::targetUpdate(
shardKey = status.getValue();
} else {
// Target using the replacement document
- shardKey = _manager->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
+ shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
}
// Check shard key size on upsert.
@@ -431,13 +398,13 @@ Status ChunkManagerTargeter::targetUpdate(
// We failed to target a single shard.
// Upserts are required to target a single shard.
- if (_manager && updateDoc.getUpsert()) {
+ if (_routingInfo->cm() && updateDoc.getUpsert()) {
return Status(ErrorCodes::ShardKeyNotFound,
str::stream() << "An upsert on a sharded collection must contain the shard "
"key and have the simple collation. Update request: "
<< updateDoc.toBSON()
<< ", shard key pattern: "
- << _manager->getShardKeyPattern().toString());
+ << _routingInfo->cm()->getShardKeyPattern().toString());
}
// Parse update query.
@@ -454,8 +421,8 @@ Status ChunkManagerTargeter::targetUpdate(
}
// Single (non-multi) updates must target a single shard or be exact-ID.
- if (_manager && !updateDoc.getMulti() &&
- !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) {
+ if (_routingInfo->cm() && !updateDoc.getMulti() &&
+ !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
return Status(ErrorCodes::ShardKeyNotFound,
str::stream()
<< "A single update on a sharded collection must contain an exact "
@@ -464,7 +431,7 @@ Status ChunkManagerTargeter::targetUpdate(
"request: "
<< updateDoc.toBSON()
<< ", shard key pattern: "
- << _manager->getShardKeyPattern().toString());
+ << _routingInfo->cm()->getShardKeyPattern().toString());
}
if (updateType == UpdateType_OpStyle) {
@@ -480,7 +447,7 @@ Status ChunkManagerTargeter::targetDelete(
std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
BSONObj shardKey;
- if (_manager) {
+ if (_routingInfo->cm()) {
//
// Sharded collections have the following further requirements for targeting:
//
@@ -489,7 +456,8 @@ Status ChunkManagerTargeter::targetDelete(
// Get the shard key
StatusWith<BSONObj> status =
- _manager->getShardKeyPattern().extractShardKeyFromQuery(opCtx, deleteDoc.getQuery());
+ _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx,
+ deleteDoc.getQuery());
// Bad query
if (!status.isOK())
@@ -527,8 +495,8 @@ Status ChunkManagerTargeter::targetDelete(
}
// Single deletes must target a single shard or be exact-ID.
- if (_manager && deleteDoc.getLimit() == 1 &&
- !isExactIdQuery(opCtx, *cq.getValue(), _manager.get())) {
+ if (_routingInfo->cm() && deleteDoc.getLimit() == 1 &&
+ !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
return Status(ErrorCodes::ShardKeyNotFound,
str::stream()
<< "A single delete on a sharded collection must contain an exact "
@@ -537,7 +505,7 @@ Status ChunkManagerTargeter::targetDelete(
"request: "
<< deleteDoc.toBSON()
<< ", shard key pattern: "
- << _manager->getShardKeyPattern().toString());
+ << _routingInfo->cm()->getShardKeyPattern().toString());
}
return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints);
@@ -558,26 +526,28 @@ Status ChunkManagerTargeter::targetQuery(
const BSONObj& query,
const BSONObj& collation,
std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- if (!_primary && !_manager) {
- return Status(ErrorCodes::NamespaceNotFound,
- stream() << "could not target query in " << getNS().ns()
- << "; no metadata found");
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target query in " << getNS().ns()
+ << "; no metadata found"};
}
- set<ShardId> shardIds;
- if (_manager) {
+ std::set<ShardId> shardIds;
+ if (_routingInfo->cm()) {
try {
- _manager->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
} catch (const DBException& ex) {
return ex.toStatus();
}
} else {
- shardIds.insert(_primary->getId());
+ shardIds.insert(_routingInfo->primary()->getId());
}
for (const ShardId& shardId : shardIds) {
endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
}
return Status::OK();
@@ -586,7 +556,7 @@ Status ChunkManagerTargeter::targetQuery(
std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
const BSONObj& collation,
long long estDataSize) const {
- auto chunk = _manager->findIntersectingChunk(shardKey, collation);
+ auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation);
// Track autosplit stats for sharded collections
// Note: this is only best effort accounting and is not accurate.
@@ -595,27 +565,29 @@ std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONOb
}
return stdx::make_unique<ShardEndpoint>(chunk->getShardId(),
- _manager->getVersion(chunk->getShardId()));
+ _routingInfo->cm()->getVersion(chunk->getShardId()));
}
Status ChunkManagerTargeter::targetCollection(
std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- if (!_primary && !_manager) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target full range of " << getNS().ns()
- << "; metadata not found");
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target full range of " << getNS().ns()
+ << "; metadata not found"};
}
- set<ShardId> shardIds;
- if (_manager) {
- _manager->getAllShardIds(&shardIds);
+ std::set<ShardId> shardIds;
+ if (_routingInfo->cm()) {
+ _routingInfo->cm()->getAllShardIds(&shardIds);
} else {
- shardIds.insert(_primary->getId());
+ shardIds.insert(_routingInfo->primary()->getId());
}
for (const ShardId& shardId : shardIds) {
endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
}
return Status::OK();
@@ -623,19 +595,20 @@ Status ChunkManagerTargeter::targetCollection(
Status ChunkManagerTargeter::targetAllShards(
std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
- if (!_primary && !_manager) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target every shard with versions for "
- << getNS().ns()
- << "; metadata not found");
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target every shard with versions for " << getNS().ns()
+ << "; metadata not found"};
}
- vector<ShardId> shardIds;
+ std::vector<ShardId> shardIds;
grid.shardRegistry()->getAllShardIds(&shardIds);
for (const ShardId& shardId : shardIds) {
endpoints->push_back(stdx::make_unique<ShardEndpoint>(
- shardId, _manager ? _manager->getVersion(shardId) : ChunkVersion::UNSHARDED()));
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
}
return Status::OK();
@@ -649,8 +622,7 @@ void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
if (staleInfo["vWanted"].eoo()) {
// If we don't have a vWanted sent, assume the version is higher than our current
// version.
- remoteShardVersion =
- getShardVersion(endpoint.shardName.toString(), _manager.get(), _primary.get());
+ remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName);
remoteShardVersion.incMajor();
} else {
remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted");
@@ -699,18 +671,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
// Get the latest metadata information from the cache if there were issues
//
- shared_ptr<ChunkManager> lastManager = _manager;
- shared_ptr<Shard> lastPrimary = _primary;
+ auto lastManager = _routingInfo->cm();
+ auto lastPrimary = _routingInfo->primary();
- auto scopedCMStatus = ScopedChunkManager::getOrCreate(opCtx, _nss);
- if (!scopedCMStatus.isOK()) {
- return scopedCMStatus.getStatus();
+ auto initStatus = init(opCtx);
+ if (!initStatus.isOK()) {
+ return initStatus;
}
- const auto& scopedCM = scopedCMStatus.getValue();
- _manager = scopedCM.cm();
- _primary = scopedCM.primary();
-
// We now have the latest metadata from the cache.
//
@@ -718,8 +686,6 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
// Either we couldn't target at all, or we have stale versions, but not both.
//
- dassert(!(_needsTargetingRefresh && !_remoteShardVersions.empty()));
-
if (_needsTargetingRefresh) {
// Reset the field
_needsTargetingRefresh = false;
@@ -728,63 +694,44 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
// the
// metadata since we last got it from the cache.
- bool alreadyRefreshed = wasMetadataRefreshed(lastManager, lastPrimary, _manager, _primary);
+ bool alreadyRefreshed = wasMetadataRefreshed(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
// If didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
// To match previous behavior, we just need an incremental refresh here
- return refreshNow(opCtx, RefreshType_RefreshChunkManager);
+ return refreshNow(opCtx);
}
- *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
+ *wasChanged = isMetadataDifferent(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
return Status::OK();
} else if (!_remoteShardVersions.empty()) {
// If we got stale shard versions from remote shards, we may need to refresh
// NOTE: Not sure yet if this can happen simultaneously with targeting issues
- CompareResult result =
- compareAllShardVersions(_manager.get(), _primary.get(), _remoteShardVersions);
+ CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions);
+
// Reset the versions
_remoteShardVersions.clear();
- if (result == CompareResult_Unknown) {
+ if (result == CompareResult_Unknown || result == CompareResult_LT) {
// Our current shard versions aren't all comparable to the old versions, maybe drop
- return refreshNow(opCtx, RefreshType_ReloadDatabase);
- } else if (result == CompareResult_LT) {
- // Our current shard versions are less than the remote versions, but no drop
- return refreshNow(opCtx, RefreshType_RefreshChunkManager);
+ return refreshNow(opCtx);
}
- *wasChanged = isMetadataDifferent(lastManager, lastPrimary, _manager, _primary);
+ *wasChanged = isMetadataDifferent(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
return Status::OK();
}
- // unreachable
- dassert(false);
- return Status::OK();
+ MONGO_UNREACHABLE;
}
-Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx, RefreshType refreshType) {
- if (refreshType == RefreshType_ReloadDatabase) {
- Grid::get(opCtx)->catalogCache()->invalidate(_nss.db().toString());
- }
-
- // Try not to spam the configs
- refreshBackoff();
+Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo));
- ScopedChunkManager::refreshAndGet(opCtx, _nss);
-
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, _nss);
- if (!scopedCMStatus.isOK()) {
- return scopedCMStatus.getStatus();
- }
-
- const auto& scopedCM = scopedCMStatus.getValue();
-
- _manager = scopedCM.cm();
- _primary = scopedCM.primary();
-
- return Status::OK();
+ return init(opCtx);
}
} // namespace mongo
diff --git a/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f b/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f
new file mode 100644
index 00000000000..42de7ba2903
--- /dev/null
+++ b/src/mongo/s/commands/chunk_manager_targeter.cpp-8454ea5f
@@ -0,0 +1,737 @@
+/**
+ * Copyright (C) 2013 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/chunk_manager_targeter.h"
+
+#include "mongo/db/matcher/extensions_callback_noop.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/query/canonical_query.h"
+#include "mongo/db/query/collation/collation_index_key.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_commands_common.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_key_pattern.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace {
+
+enum UpdateType { UpdateType_Replacement, UpdateType_OpStyle, UpdateType_Unknown };
+
+enum CompareResult { CompareResult_Unknown, CompareResult_GTE, CompareResult_LT };
+
+const ShardKeyPattern virtualIdShardKey(BSON("_id" << 1));
+
+/**
+ * There are two styles of update expressions:
+ *
+ * Replacement style: coll.update({ x : 1 }, { y : 2 })
+ * OpStyle: coll.update({ x : 1 }, { $set : { y : 2 } })
+ */
+UpdateType getUpdateExprType(const BSONObj& updateExpr) {
+ // Empty update is replacement-style, by default
+ if (updateExpr.isEmpty()) {
+ return UpdateType_Replacement;
+ }
+
+ UpdateType updateType = UpdateType_Unknown;
+
+ BSONObjIterator it(updateExpr);
+ while (it.more()) {
+ BSONElement next = it.next();
+
+ if (next.fieldName()[0] == '$') {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_OpStyle;
+ } else if (updateType == UpdateType_Replacement) {
+ return UpdateType_Unknown;
+ }
+ } else {
+ if (updateType == UpdateType_Unknown) {
+ updateType = UpdateType_Replacement;
+ } else if (updateType == UpdateType_OpStyle) {
+ return UpdateType_Unknown;
+ }
+ }
+ }
+
+ return updateType;
+}
+
+/**
+ * This returns "does the query have an _id field" and "is the _id field querying for a direct
+ * value like _id : 3 and not _id : { $gt : 3 }"
+ *
+ * If the query does not use the collection default collation, the _id field cannot contain strings,
+ * objects, or arrays.
+ *
+ * Ex: { _id : 1 } => true
+ * { foo : <anything>, _id : 1 } => true
+ * { _id : { $lt : 30 } } => false
+ * { foo : <anything> } => false
+ */
+bool isExactIdQuery(OperationContext* opCtx, const CanonicalQuery& query, ChunkManager* manager) {
+ auto shardKey = virtualIdShardKey.extractShardKeyFromQuery(query);
+ BSONElement idElt = shardKey["_id"];
+
+ if (!idElt) {
+ return false;
+ }
+
+ if (CollationIndexKey::isCollatableType(idElt.type()) && manager &&
+ !query.getQueryRequest().getCollation().isEmpty() &&
+ !CollatorInterface::collatorsMatch(query.getCollator(), manager->getDefaultCollator())) {
+
+ // The collation applies to the _id field, but the user specified a collation which doesn't
+ // match the collection default.
+ return false;
+ }
+
+ return true;
+}
+
+//
+// Utilities to compare shard versions
+//
+
+/**
+ * Returns the relationship of two shard versions. Shard versions of a collection that has not
+ * been dropped and recreated and where there is at least one chunk on a shard are comparable,
+ * otherwise the result is ambiguous.
+ */
+CompareResult compareShardVersions(const ChunkVersion& shardVersionA,
+ const ChunkVersion& shardVersionB) {
+ // Collection may have been dropped
+ if (!shardVersionA.hasEqualEpoch(shardVersionB)) {
+ return CompareResult_Unknown;
+ }
+
+ // Zero shard versions are only comparable to themselves
+ if (!shardVersionA.isSet() || !shardVersionB.isSet()) {
+ // If both are zero...
+ if (!shardVersionA.isSet() && !shardVersionB.isSet()) {
+ return CompareResult_GTE;
+ }
+
+ return CompareResult_Unknown;
+ }
+
+ if (shardVersionA < shardVersionB)
+ return CompareResult_LT;
+ else
+ return CompareResult_GTE;
+}
+
+ChunkVersion getShardVersion(const CachedCollectionRoutingInfo& routingInfo,
+ const ShardId& shardId) {
+ if (routingInfo.cm()) {
+ return routingInfo.cm()->getVersion(shardId);
+ }
+
+ return ChunkVersion::UNSHARDED();
+}
+
+/**
+ * Returns the relationship between two maps of shard versions. As above, these maps are often
+ * comparable when the collection has not been dropped and there is at least one chunk on the
+ * shards. If any versions in the maps are not comparable, the result is _Unknown.
+ *
+ * If any versions in the first map (cached) are _LT the versions in the second map (remote),
+ * the first (cached) versions are _LT the second (remote) versions.
+ *
+ * Note that the signature here is weird since our cached map of chunk versions is stored in a
+ * ChunkManager or is implicit in the primary shard of the collection.
+ */
+CompareResult compareAllShardVersions(const CachedCollectionRoutingInfo& routingInfo,
+ const ShardVersionMap& remoteShardVersions) {
+ CompareResult finalResult = CompareResult_GTE;
+
+ for (const auto& shardVersionEntry : remoteShardVersions) {
+ const ShardId& shardId = shardVersionEntry.first;
+ const ChunkVersion& remoteShardVersion = shardVersionEntry.second;
+
+ ChunkVersion cachedShardVersion;
+
+ try {
+ // Throws b/c shard constructor throws
+ cachedShardVersion = getShardVersion(routingInfo, shardId);
+ } catch (const DBException& ex) {
+ warning() << "could not lookup shard " << shardId
+ << " in local cache, shard metadata may have changed"
+ << " or be unavailable" << causedBy(ex);
+
+ return CompareResult_Unknown;
+ }
+
+ // Compare the remote and cached versions
+ CompareResult result = compareShardVersions(cachedShardVersion, remoteShardVersion);
+
+ if (result == CompareResult_Unknown)
+ return result;
+
+ if (result == CompareResult_LT)
+ finalResult = CompareResult_LT;
+
+ // Note that we keep going after _LT b/c there could be more _Unknowns.
+ }
+
+ return finalResult;
+}
+
+/**
+ * Whether or not the manager/primary pair is different from the other manager/primary pair.
+ */
+bool isMetadataDifferent(const std::shared_ptr<ChunkManager>& managerA,
+ const std::shared_ptr<Shard>& primaryA,
+ const std::shared_ptr<ChunkManager>& managerB,
+ const std::shared_ptr<Shard>& primaryB) {
+ if ((managerA && !managerB) || (!managerA && managerB) || (primaryA && !primaryB) ||
+ (!primaryA && primaryB))
+ return true;
+
+ if (managerA) {
+ return !managerA->getVersion().isStrictlyEqualTo(managerB->getVersion());
+ }
+
+ dassert(NULL != primaryA.get());
+ return primaryA->getId() != primaryB->getId();
+}
+
+/**
+* Whether or not the manager/primary pair was changed or refreshed from a previous version
+* of the metadata.
+*/
+bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
+ const std::shared_ptr<Shard>& primaryA,
+ const std::shared_ptr<ChunkManager>& managerB,
+ const std::shared_ptr<Shard>& primaryB) {
+ if (isMetadataDifferent(managerA, primaryA, managerB, primaryB))
+ return true;
+
+ if (managerA) {
+ dassert(managerB.get()); // otherwise metadata would be different
+ return managerA->getSequenceNumber() != managerB->getSequenceNumber();
+ }
+
+ return false;
+}
+
+} // namespace
+
+ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, TargeterStats* stats)
+ : _nss(nss), _needsTargetingRefresh(false), _stats(stats) {}
+
+
+Status ChunkManagerTargeter::init(OperationContext* opCtx) {
+ auto shardDbStatus = createShardDatabase(opCtx, _nss.db());
+ if (!shardDbStatus.isOK()) {
+ return shardDbStatus.getStatus();
+ }
+
+ const auto routingInfoStatus =
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss);
+ if (!routingInfoStatus.isOK()) {
+ return routingInfoStatus.getStatus();
+ }
+
+ _routingInfo = std::move(routingInfoStatus.getValue());
+
+ return Status::OK();
+}
+
+const NamespaceString& ChunkManagerTargeter::getNS() const {
+ return _nss;
+}
+
+Status ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
+ const BSONObj& doc,
+ ShardEndpoint** endpoint) const {
+ BSONObj shardKey;
+
+ if (_routingInfo->cm()) {
+ //
+ // Sharded collections have the following requirements for targeting:
+ //
+ // Inserts must contain the exact shard key.
+ //
+
+ shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc);
+
+ // Check shard key exists
+ if (shardKey.isEmpty()) {
+ return {ErrorCodes::ShardKeyNotFound,
+ str::stream() << "document " << doc
+ << " does not contain shard key for pattern "
+ << _routingInfo->cm()->getShardKeyPattern().toString()};
+ }
+
+ // Check shard key size on insert
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
+ }
+
+ // Target the shard key or database primary
+ if (!shardKey.isEmpty()) {
+ *endpoint = targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()).release();
+ } else {
+ if (!_routingInfo->primary()) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target insert in collection " << getNS().ns()
+ << "; no metadata found");
+ }
+
+ *endpoint = new ShardEndpoint(_routingInfo->primary()->getId(), ChunkVersion::UNSHARDED());
+ }
+
+ return Status::OK();
+}
+
+Status ChunkManagerTargeter::targetUpdate(
+ OperationContext* opCtx,
+ const BatchedUpdateDocument& updateDoc,
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ //
+ // Update targeting may use either the query or the update. This is to support save-style
+ // updates, of the form:
+ //
+ // coll.update({ _id : xxx }, { _id : xxx, shardKey : 1, foo : bar }, { upsert : true })
+ //
+ // Because drivers do not know the shard key, they can't pull the shard key automatically
+ // into the query doc, and to correctly support upsert we must target a single shard.
+ //
+ // The rule is simple - If the update is replacement style (no '$set'), we target using the
+ // update. If the update is replacement style, we target using the query.
+ //
+ // If we have the exact shard key in either the query or replacement doc, we target using
+ // that extracted key.
+ //
+
+ BSONObj query = updateDoc.getQuery();
+ BSONObj updateExpr = updateDoc.getUpdateExpr();
+
+ UpdateType updateType = getUpdateExprType(updateDoc.getUpdateExpr());
+
+ if (updateType == UpdateType_Unknown) {
+ return {ErrorCodes::UnsupportedFormat,
+ str::stream() << "update document " << updateExpr
+ << " has mixed $operator and non-$operator style fields"};
+ }
+
+ BSONObj shardKey;
+
+ if (_routingInfo->cm()) {
+ //
+ // Sharded collections have the following futher requirements for targeting:
+ //
+ // Upserts must be targeted exactly by shard key.
+ // Non-multi updates must be targeted exactly by shard key *or* exact _id.
+ //
+
+ // Get the shard key
+ if (updateType == UpdateType_OpStyle) {
+ // Target using the query
+ StatusWith<BSONObj> status =
+ _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
+
+ // Bad query
+ if (!status.isOK())
+ return status.getStatus();
+
+ shardKey = status.getValue();
+ } else {
+ // Target using the replacement document
+ shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(updateExpr);
+ }
+
+ // Check shard key size on upsert.
+ if (updateDoc.getUpsert()) {
+ Status status = ShardKeyPattern::checkShardKeySize(shardKey);
+ if (!status.isOK())
+ return status;
+ }
+ }
+
+ const BSONObj collation = updateDoc.isCollationSet() ? updateDoc.getCollation() : BSONObj();
+
+ // Target the shard key, query, or replacement doc
+ if (!shardKey.isEmpty()) {
+ try {
+ endpoints->push_back(
+ targetShardKey(shardKey, collation, (query.objsize() + updateExpr.objsize())));
+ return Status::OK();
+ } catch (const DBException&) {
+ // This update is potentially not constrained to a single shard
+ }
+ }
+
+ // We failed to target a single shard.
+
+ // Upserts are required to target a single shard.
+ if (_routingInfo->cm() && updateDoc.getUpsert()) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ str::stream() << "An upsert on a sharded collection must contain the shard "
+ "key and have the simple collation. Update request: "
+ << updateDoc.toBSON()
+ << ", shard key pattern: "
+ << _routingInfo->cm()->getShardKeyPattern().toString());
+ }
+
+ // Parse update query.
+ auto qr = stdx::make_unique<QueryRequest>(getNS());
+ qr->setFilter(updateDoc.getQuery());
+ if (!collation.isEmpty()) {
+ qr->setCollation(collation);
+ }
+ auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop());
+ if (!cq.isOK()) {
+ return Status(cq.getStatus().code(),
+ str::stream() << "Could not parse update query " << updateDoc.getQuery()
+ << causedBy(cq.getStatus()));
+ }
+
+ // Single (non-multi) updates must target a single shard or be exact-ID.
+ if (_routingInfo->cm() && !updateDoc.getMulti() &&
+ !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ str::stream()
+ << "A single update on a sharded collection must contain an exact "
+ "match on _id (and have the collection default collation) or "
+ "contain the shard key (and have the simple collation). Update "
+ "request: "
+ << updateDoc.toBSON()
+ << ", shard key pattern: "
+ << _routingInfo->cm()->getShardKeyPattern().toString());
+ }
+
+ if (updateType == UpdateType_OpStyle) {
+ return targetQuery(opCtx, query, collation, endpoints);
+ } else {
+ return targetDoc(opCtx, updateExpr, collation, endpoints);
+ }
+}
+
+Status ChunkManagerTargeter::targetDelete(
+ OperationContext* opCtx,
+ const BatchedDeleteDocument& deleteDoc,
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ BSONObj shardKey;
+
+ if (_routingInfo->cm()) {
+ //
+ // Sharded collections have the following further requirements for targeting:
+ //
+ // Limit-1 deletes must be targeted exactly by shard key *or* exact _id
+ //
+
+ // Get the shard key
+ StatusWith<BSONObj> status =
+ _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(opCtx,
+ deleteDoc.getQuery());
+
+ // Bad query
+ if (!status.isOK())
+ return status.getStatus();
+
+ shardKey = status.getValue();
+ }
+
+ const BSONObj collation = deleteDoc.isCollationSet() ? deleteDoc.getCollation() : BSONObj();
+
+
+ // Target the shard key or delete query
+ if (!shardKey.isEmpty()) {
+ try {
+ endpoints->push_back(targetShardKey(shardKey, collation, 0));
+ return Status::OK();
+ } catch (const DBException&) {
+ // This delete is potentially not constrained to a single shard
+ }
+ }
+
+ // We failed to target a single shard.
+
+ // Parse delete query.
+ auto qr = stdx::make_unique<QueryRequest>(getNS());
+ qr->setFilter(deleteDoc.getQuery());
+ if (!collation.isEmpty()) {
+ qr->setCollation(collation);
+ }
+ auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), ExtensionsCallbackNoop());
+ if (!cq.isOK()) {
+ return Status(cq.getStatus().code(),
+ str::stream() << "Could not parse delete query " << deleteDoc.getQuery()
+ << causedBy(cq.getStatus()));
+ }
+
+ // Single deletes must target a single shard or be exact-ID.
+ if (_routingInfo->cm() && deleteDoc.getLimit() == 1 &&
+ !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ str::stream()
+ << "A single delete on a sharded collection must contain an exact "
+ "match on _id (and have the collection default collation) or "
+ "contain the shard key (and have the simple collation). Delete "
+ "request: "
+ << deleteDoc.toBSON()
+ << ", shard key pattern: "
+ << _routingInfo->cm()->getShardKeyPattern().toString());
+ }
+
+ return targetQuery(opCtx, deleteDoc.getQuery(), collation, endpoints);
+}
+
+Status ChunkManagerTargeter::targetDoc(
+ OperationContext* opCtx,
+ const BSONObj& doc,
+ const BSONObj& collation,
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ // NOTE: This is weird and fragile, but it's the way our language works right now -
+ // documents are either A) invalid or B) valid equality queries over themselves.
+ return targetQuery(opCtx, doc, collation, endpoints);
+}
+
+Status ChunkManagerTargeter::targetQuery(
+ OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& collation,
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target query in " << getNS().ns()
+ << "; no metadata found"};
+ }
+
+ std::set<ShardId> shardIds;
+ if (_routingInfo->cm()) {
+ try {
+ _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ } else {
+ shardIds.insert(_routingInfo->primary()->getId());
+ }
+
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(stdx::make_unique<ShardEndpoint>(
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
+ }
+
+ return Status::OK();
+}
+
+std::unique_ptr<ShardEndpoint> ChunkManagerTargeter::targetShardKey(const BSONObj& shardKey,
+ const BSONObj& collation,
+ long long estDataSize) const {
+ auto chunk = _routingInfo->cm()->findIntersectingChunk(shardKey, collation);
+
+ // Track autosplit stats for sharded collections
+ // Note: this is only best effort accounting and is not accurate.
+ if (estDataSize > 0) {
+ _stats->chunkSizeDelta[chunk->getMin()] += estDataSize;
+ }
+
+ return stdx::make_unique<ShardEndpoint>(chunk->getShardId(),
+ _routingInfo->cm()->getVersion(chunk->getShardId()));
+}
+
+Status ChunkManagerTargeter::targetCollection(
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target full range of " << getNS().ns()
+ << "; metadata not found"};
+ }
+
+ std::set<ShardId> shardIds;
+ if (_routingInfo->cm()) {
+ _routingInfo->cm()->getAllShardIds(&shardIds);
+ } else {
+ shardIds.insert(_routingInfo->primary()->getId());
+ }
+
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(stdx::make_unique<ShardEndpoint>(
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
+ }
+
+ return Status::OK();
+}
+
+Status ChunkManagerTargeter::targetAllShards(
+ std::vector<std::unique_ptr<ShardEndpoint>>* endpoints) const {
+ if (!_routingInfo->primary() && !_routingInfo->cm()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target every shard with versions for " << getNS().ns()
+ << "; metadata not found"};
+ }
+
+ std::vector<ShardId> shardIds;
+ grid.shardRegistry()->getAllShardIds(&shardIds);
+
+ for (const ShardId& shardId : shardIds) {
+ endpoints->push_back(stdx::make_unique<ShardEndpoint>(
+ shardId,
+ _routingInfo->cm() ? _routingInfo->cm()->getVersion(shardId)
+ : ChunkVersion::UNSHARDED()));
+ }
+
+ return Status::OK();
+}
+
+void ChunkManagerTargeter::noteStaleResponse(const ShardEndpoint& endpoint,
+ const BSONObj& staleInfo) {
+ dassert(!_needsTargetingRefresh);
+
+ ChunkVersion remoteShardVersion;
+ if (staleInfo["vWanted"].eoo()) {
+ // If we don't have a vWanted sent, assume the version is higher than our current
+ // version.
+ remoteShardVersion = getShardVersion(*_routingInfo, endpoint.shardName);
+ remoteShardVersion.incMajor();
+ } else {
+ remoteShardVersion = ChunkVersion::fromBSON(staleInfo, "vWanted");
+ }
+
+ ShardVersionMap::iterator it = _remoteShardVersions.find(endpoint.shardName);
+ if (it == _remoteShardVersions.end()) {
+ _remoteShardVersions.insert(std::make_pair(endpoint.shardName, remoteShardVersion));
+ } else {
+ ChunkVersion& previouslyNotedVersion = it->second;
+ if (previouslyNotedVersion.hasEqualEpoch(remoteShardVersion)) {
+ if (previouslyNotedVersion.isOlderThan(remoteShardVersion)) {
+ previouslyNotedVersion = remoteShardVersion;
+ }
+ } else {
+ // Epoch changed midway while applying the batch so set the version to something
+ // unique
+ // and non-existent to force a reload when refreshIsNeeded is called.
+ previouslyNotedVersion = ChunkVersion::IGNORED();
+ }
+ }
+}
+
+void ChunkManagerTargeter::noteCouldNotTarget() {
+ dassert(_remoteShardVersions.empty());
+ _needsTargetingRefresh = true;
+}
+
+Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
+ bool dummy;
+ if (!wasChanged) {
+ wasChanged = &dummy;
+ }
+
+ *wasChanged = false;
+
+ //
+ // Did we have any stale config or targeting errors at all?
+ //
+
+ if (!_needsTargetingRefresh && _remoteShardVersions.empty()) {
+ return Status::OK();
+ }
+
+ //
+ // Get the latest metadata information from the cache if there were issues
+ //
+
+ auto lastManager = _routingInfo->cm();
+ auto lastPrimary = _routingInfo->primary();
+
+ auto initStatus = init(opCtx);
+ if (!initStatus.isOK()) {
+ return initStatus;
+ }
+
+ // We now have the latest metadata from the cache.
+
+ //
+ // See if and how we need to do a remote refresh.
+ // Either we couldn't target at all, or we have stale versions, but not both.
+ //
+
+ if (_needsTargetingRefresh) {
+ // Reset the field
+ _needsTargetingRefresh = false;
+
+ // If we couldn't target, we might need to refresh if we haven't remotely refreshed
+ // the
+ // metadata since we last got it from the cache.
+
+ bool alreadyRefreshed = wasMetadataRefreshed(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
+
+ // If didn't already refresh the targeting information, refresh it
+ if (!alreadyRefreshed) {
+ // To match previous behavior, we just need an incremental refresh here
+ return refreshNow(opCtx);
+ }
+
+ *wasChanged = isMetadataDifferent(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
+ return Status::OK();
+ } else if (!_remoteShardVersions.empty()) {
+ // If we got stale shard versions from remote shards, we may need to refresh
+ // NOTE: Not sure yet if this can happen simultaneously with targeting issues
+
+ CompareResult result = compareAllShardVersions(*_routingInfo, _remoteShardVersions);
+
+ // Reset the versions
+ _remoteShardVersions.clear();
+
+ if (result == CompareResult_Unknown || result == CompareResult_LT) {
+ // Our current shard versions aren't all comparable to the old versions, maybe drop
+ return refreshNow(opCtx);
+ }
+
+ *wasChanged = isMetadataDifferent(
+ lastManager, lastPrimary, _routingInfo->cm(), _routingInfo->primary());
+ return Status::OK();
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+Status ChunkManagerTargeter::refreshNow(OperationContext* opCtx) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*_routingInfo));
+
+ return init(opCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/chunk_manager_targeter.h b/src/mongo/s/commands/chunk_manager_targeter.h
index 36fe46a3fe5..97c0f4c1455 100644
--- a/src/mongo/s/commands/chunk_manager_targeter.h
+++ b/src/mongo/s/commands/chunk_manager_targeter.h
@@ -35,12 +35,12 @@
#include "mongo/bson/bsonobj_comparator_interface.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/ns_targeter.h"
namespace mongo {
class ChunkManager;
-class CollatorInterface;
class OperationContext;
class Shard;
struct ChunkVersion;
@@ -109,21 +109,12 @@ public:
Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged);
private:
- // Different ways we can refresh metadata
- enum RefreshType {
- // The version has gone up, but the collection hasn't been dropped
- RefreshType_RefreshChunkManager,
- // The collection may have been dropped, so we need to reload the db
- RefreshType_ReloadDatabase
- };
-
- typedef std::map<ShardId, ChunkVersion> ShardVersionMap;
-
+ using ShardVersionMap = std::map<ShardId, ChunkVersion>;
/**
* Performs an actual refresh from the config server.
*/
- Status refreshNow(OperationContext* opCtx, RefreshType refreshType);
+ Status refreshNow(OperationContext* opCtx);
/**
* Returns a vector of ShardEndpoints where a document might need to be placed.
@@ -170,10 +161,8 @@ private:
// Represents only the view and not really part of the targeter state. This is not owned here.
TargeterStats* _stats;
- // Zero or one of these are filled at all times
- // If sharded, _manager, if unsharded, _primary, on error, neither
- std::shared_ptr<ChunkManager> _manager;
- std::shared_ptr<Shard> _primary;
+ // The latest loaded routing cache entry
+ boost::optional<CachedCollectionRoutingInfo> _routingInfo;
// Map of shard->remote shard version reported from stale errors
ShardVersionMap _remoteShardVersions;
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index a6887ea0498..d9a182b2938 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -47,7 +47,7 @@
#include "mongo/db/views/view.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/chunk_manager.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
@@ -55,7 +55,6 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/store_possible_cursor.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -66,20 +65,22 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
BSONObj cmdObj,
int options,
BSONObjBuilder* result) {
- auto scopedShardDbStatus =
- ScopedShardDatabase::getExisting(opCtx, namespaces.executionNss.db());
- if (!scopedShardDbStatus.isOK()) {
- appendEmptyResultSet(
- *result, scopedShardDbStatus.getStatus(), namespaces.requestedNss.ns());
- return Status::OK();
- }
-
auto request = AggregationRequest::parseFromBSON(namespaces.executionNss, cmdObj);
if (!request.isOK()) {
return request.getStatus();
}
- const auto conf = scopedShardDbStatus.getValue().db();
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+
+ auto executionNsRoutingInfoStatus =
+ catalogCache->getCollectionRoutingInfo(opCtx, namespaces.executionNss);
+ if (!executionNsRoutingInfoStatus.isOK()) {
+ appendEmptyResultSet(
+ *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns());
+ return Status::OK();
+ }
+
+ const auto& executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue();
// Determine the appropriate collation and 'resolve' involved namespaces to make the
// ExpressionContext.
@@ -91,16 +92,20 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// command on an unsharded collection.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
LiteParsedPipeline liteParsedPipeline(request.getValue());
- for (auto&& ns : liteParsedPipeline.getInvolvedNamespaces()) {
- uassert(28769, str::stream() << ns.ns() << " cannot be sharded", !conf->isSharded(ns.ns()));
- resolvedNamespaces[ns.coll()] = {ns, std::vector<BSONObj>{}};
+ for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
+ const auto resolvedNsRoutingInfo =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(
+ 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
+ resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
- if (!conf->isSharded(namespaces.executionNss.ns())) {
- return aggPassthrough(opCtx, namespaces, conf, cmdObj, result, options);
+ if (!executionNsRoutingInfo.cm()) {
+ return aggPassthrough(
+ opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), cmdObj, result, options);
}
- auto chunkMgr = conf->getChunkManager(opCtx, namespaces.executionNss.ns());
+ const auto chunkMgr = executionNsRoutingInfo.cm();
std::unique_ptr<CollatorInterface> collation;
if (!request.getValue().getCollation().isEmpty()) {
@@ -260,9 +265,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Run merging command on random shard, unless a stage needs the primary shard. Need to use
// ShardConnection so that the merging mongod is sent the config servers on connection init.
auto& prng = opCtx->getClient()->getPrng();
- const auto& mergingShardId =
+ const auto mergingShardId =
(needPrimaryShardMerger || internalQueryAlwaysMergeOnPrimaryShard.load())
- ? conf->getPrimaryId()
+ ? uassertStatusOK(catalogCache->getDatabase(opCtx, namespaces.executionNss.db()))
+ .primaryId()
: shardResults[prng.nextInt32(shardResults.size())].shardTargetId;
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
@@ -426,12 +432,12 @@ BSONObj ClusterAggregate::aggRunCommand(OperationContext* opCtx,
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
- DBConfig* conf,
+ const ShardId& shardId,
BSONObj cmdObj,
BSONObjBuilder* out,
int queryOptions) {
// Temporary hack. See comment on declaration for details.
- auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
+ auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index b0fdd5d7375..d8e29744766 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -37,11 +37,11 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/s/commands/strategy.h"
-#include "mongo/s/config.h"
namespace mongo {
class OperationContext;
+class ShardId;
/**
* Methods for running aggregation across a sharded cluster.
@@ -90,7 +90,7 @@ private:
static Status aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
- DBConfig* conf,
+ const ShardId& shardId,
BSONObj cmd,
BSONObjBuilder* result,
int queryOptions);
diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp
index 3ad8c373b9a..225cef2b6ef 100644
--- a/src/mongo/s/commands/cluster_commands_common.cpp
+++ b/src/mongo/s/commands/cluster_commands_common.cpp
@@ -40,7 +40,6 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/version_manager.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -54,17 +53,21 @@ namespace {
bool forceRemoteCheckShardVersionCB(OperationContext* opCtx, const string& ns) {
const NamespaceString nss(ns);
+ if (!nss.isValid()) {
+ return false;
+ }
+
// This will force the database catalog entry to be reloaded
- Grid::get(opCtx)->catalogCache()->invalidate(nss.db());
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
- if (!scopedCMStatus.isOK()) {
+ auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (!routingInfoStatus.isOK()) {
return false;
}
- const auto& scopedCM = scopedCMStatus.getValue();
+ auto& routingInfo = routingInfoStatus.getValue();
- return scopedCM.cm() != nullptr;
+ return routingInfo.cm() != nullptr;
}
} // namespace
@@ -261,4 +264,36 @@ std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opC
return collectionsToReturn;
}
+CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << nss.ns() << " is not sharded.",
+ routingInfo.cm());
+
+ return routingInfo;
+}
+
+StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName) {
+ auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
+ if (dbStatus == ErrorCodes::NamespaceNotFound) {
+ auto createDbStatus =
+ Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString());
+ if (createDbStatus.isOK() || createDbStatus == ErrorCodes::NamespaceExists) {
+ dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
+ } else {
+ dbStatus = createDbStatus;
+ }
+ }
+
+ if (dbStatus.isOK()) {
+ return dbStatus;
+ }
+
+ return {dbStatus.getStatus().code(),
+ str::stream() << "Database " << dbName << " not found due to "
+ << dbStatus.getStatus().reason()};
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h
index 7d6465bc400..960eb03e73b 100644
--- a/src/mongo/s/commands/cluster_commands_common.h
+++ b/src/mongo/s/commands/cluster_commands_common.h
@@ -39,6 +39,8 @@
namespace mongo {
class AScopedConnection;
+class CachedCollectionRoutingInfo;
+class CachedDatabaseInfo;
class DBClientBase;
class DBClientCursor;
class OperationContext;
@@ -140,4 +142,17 @@ bool appendEmptyResultSet(BSONObjBuilder& result, Status status, const std::stri
std::vector<NamespaceString> getAllShardedCollectionsForDb(OperationContext* opCtx,
StringData dbName);
+/**
+ * Abstracts the common pattern of refreshing a collection and checking if it is sharded used across
+ * multiple commands.
+ */
+CachedCollectionRoutingInfo getShardedCollection(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+/**
+ * If the specified database exists already, loads it in the cache (if not already there) and
+ * returns it. Otherwise, if it does not exist, this call will implicitly create it as non-sharded.
+ */
+StatusWith<CachedDatabaseInfo> createShardDatabase(OperationContext* opCtx, StringData dbName);
+
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp
index 2fcf11086b9..45c46c26185 100644
--- a/src/mongo/s/commands/cluster_count_cmd.cpp
+++ b/src/mongo/s/commands/cluster_count_cmd.cpp
@@ -42,73 +42,42 @@
#include "mongo/util/timer.h"
namespace mongo {
-
-using std::string;
-using std::vector;
-
namespace {
-long long applySkipLimit(long long num, const BSONObj& cmd) {
- BSONElement s = cmd["skip"];
- BSONElement l = cmd["limit"];
-
- if (s.isNumber()) {
- num = num - s.numberLong();
- if (num < 0) {
- num = 0;
- }
- }
-
- if (l.isNumber()) {
- long long limit = l.numberLong();
- if (limit < 0) {
- limit = -limit;
- }
-
- // 0 limit means no limit
- if (limit < num && limit != 0) {
- num = limit;
- }
- }
-
- return num;
-}
-
-
class ClusterCountCmd : public Command {
public:
ClusterCountCmd() : Command("count", false) {}
- virtual bool slaveOk() const {
+ bool slaveOk() const override {
return true;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return false;
}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool run(OperationContext* opCtx,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- uassert(
- ErrorCodes::InvalidNamespace, "count command requires valid namespace", nss.isValid());
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
+ nss.isValid());
long long skip = 0;
@@ -167,7 +136,7 @@ public:
}
}
- vector<Strategy::CommandResult> countResult;
+ std::vector<Strategy::CommandResult> countResult;
Strategy::commandOp(opCtx,
dbname,
countCmdBuilder.done(),
@@ -214,20 +183,19 @@ public:
long long total = 0;
BSONObjBuilder shardSubTotal(result.subobjStart("shards"));
- for (vector<Strategy::CommandResult>::const_iterator iter = countResult.begin();
- iter != countResult.end();
- ++iter) {
- const ShardId& shardName = iter->shardTargetId;
+ for (const auto& resultEntry : countResult) {
+ const ShardId& shardName = resultEntry.shardTargetId;
+ const auto resultBSON = resultEntry.result;
- if (iter->result["ok"].trueValue()) {
- long long shardCount = iter->result["n"].numberLong();
+ if (resultBSON["ok"].trueValue()) {
+ long long shardCount = resultBSON["n"].numberLong();
shardSubTotal.appendNumber(shardName.toString(), shardCount);
total += shardCount;
} else {
shardSubTotal.doneFast();
errmsg = "failed on : " + shardName.toString();
- result.append("cause", iter->result);
+ result.append("cause", resultBSON);
// Add "code" to the top-level response, if the failure of the sharded command
// can be accounted to a single error
@@ -247,17 +215,16 @@ public:
return true;
}
- virtual Status explain(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- const rpc::ServerSelectionMetadata& serverSelectionMetadata,
- BSONObjBuilder* out) const {
+ Status explain(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ const rpc::ServerSelectionMetadata& serverSelectionMetadata,
+ BSONObjBuilder* out) const override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- if (!nss.isValid()) {
- return Status{ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid collection name: " << nss.ns()};
- }
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid namespace specified '" << nss.ns() << "'",
+ nss.isValid());
// Extract the targeting query.
BSONObj targetingQuery;
@@ -284,7 +251,7 @@ public:
// We will time how long it takes to run the commands on the shards
Timer timer;
- vector<Strategy::CommandResult> shardResults;
+ std::vector<Strategy::CommandResult> shardResults;
Strategy::commandOp(opCtx,
dbname,
explainCmdBob.obj(),
@@ -329,6 +296,33 @@ public:
opCtx, shardResults, mongosStageName, millisElapsed, out);
}
+private:
+ static long long applySkipLimit(long long num, const BSONObj& cmd) {
+ BSONElement s = cmd["skip"];
+ BSONElement l = cmd["limit"];
+
+ if (s.isNumber()) {
+ num = num - s.numberLong();
+ if (num < 0) {
+ num = 0;
+ }
+ }
+
+ if (l.isNumber()) {
+ long long limit = l.numberLong();
+ if (limit < 0) {
+ limit = -limit;
+ }
+
+ // 0 limit means no limit
+ if (limit < num && limit != 0) {
+ num = limit;
+ }
+ }
+
+ return num;
+ }
+
} clusterCountCmd;
} // namespace
diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp
index 2c44a5a1dbc..583c8902cbf 100644
--- a/src/mongo/s/commands/cluster_drop_cmd.cpp
+++ b/src/mongo/s/commands/cluster_drop_cmd.cpp
@@ -41,7 +41,6 @@
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -80,20 +79,20 @@ public:
BSONObjBuilder& result) override {
const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
- auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
- if (scopedDbStatus == ErrorCodes::NamespaceNotFound) {
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+
+ auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
return true;
}
- uassertStatusOK(scopedDbStatus.getStatus());
-
- auto const db = scopedDbStatus.getValue().db();
+ auto routingInfo = uassertStatusOK(std::move(routingInfoStatus));
- if (!db->isSharded(nss.ns())) {
- _dropUnshardedCollectionFromShard(opCtx, db->getPrimaryId(), nss, &result);
+ if (!routingInfo.cm()) {
+ _dropUnshardedCollectionFromShard(opCtx, routingInfo.primaryId(), nss, &result);
} else {
uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->dropCollection(opCtx, nss));
- db->markNSNotSharded(nss.ns());
+ catalogCache->invalidateShardedCollection(nss);
}
return true;
diff --git a/src/mongo/s/commands/cluster_drop_database_cmd.cpp b/src/mongo/s/commands/cluster_drop_database_cmd.cpp
index f86cf073273..178fc5f36bc 100644
--- a/src/mongo/s/commands/cluster_drop_database_cmd.cpp
+++ b/src/mongo/s/commands/cluster_drop_database_cmd.cpp
@@ -40,9 +40,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/sharded_command_processing.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -93,17 +91,19 @@ public:
auto scopedDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock(
opCtx, dbname, "dropDatabase", DistLockManager::kDefaultLockTimeout));
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+
// Refresh the database metadata so it kicks off a full reload
- Grid::get(opCtx)->catalogCache()->invalidate(dbname);
+ catalogCache->purgeDatabase(dbname);
- auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, dbname);
+ auto dbInfoStatus = catalogCache->getDatabase(opCtx, dbname);
- if (scopedDbStatus == ErrorCodes::NamespaceNotFound) {
+ if (dbInfoStatus == ErrorCodes::NamespaceNotFound) {
result.append("info", "database does not exist");
return true;
}
- uassertStatusOK(scopedDbStatus.getStatus());
+ uassertStatusOK(dbInfoStatus.getStatus());
catalogClient->logChange(opCtx,
"dropDatabase.start",
@@ -111,16 +111,15 @@ public:
BSONObj(),
ShardingCatalogClient::kMajorityWriteConcern);
- auto const db = scopedDbStatus.getValue().db();
+ auto& dbInfo = dbInfoStatus.getValue();
// Drop the database's collections from metadata
for (const auto& nss : getAllShardedCollectionsForDb(opCtx, dbname)) {
uassertStatusOK(catalogClient->dropCollection(opCtx, nss));
- db->markNSNotSharded(nss.ns());
}
// Drop the database from the primary shard first
- _dropDatabaseFromShard(opCtx, db->getPrimaryId(), dbname);
+ _dropDatabaseFromShard(opCtx, dbInfo.primaryId(), dbname);
// Drop the database from each of the remaining shards
{
@@ -146,7 +145,7 @@ public:
}
// Invalidate the database so the next access will do a full reload
- Grid::get(opCtx)->catalogCache()->invalidate(dbname);
+ catalogCache->purgeDatabase(dbname);
catalogClient->logChange(
opCtx, "dropDatabase", dbname, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern);
diff --git a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
index 1db7ea7ef03..64537920cb7 100644
--- a/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
+++ b/src/mongo/s/commands/cluster_enable_sharding_cmd.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/commands.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -109,7 +108,7 @@ public:
audit::logEnableSharding(Client::getCurrent(), dbname);
// Make sure to force update of any stale metadata
- Grid::get(opCtx)->catalogCache()->invalidate(dbname);
+ Grid::get(opCtx)->catalogCache()->purgeDatabase(dbname);
return true;
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 578968205af..feae1fab5e2 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -40,7 +40,6 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_explain.h"
@@ -48,7 +47,6 @@
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/timer.h"
@@ -63,48 +61,42 @@ class FindAndModifyCmd : public Command {
public:
FindAndModifyCmd() : Command("findAndModify", false, "findandmodify") {}
- virtual bool slaveOk() const {
+ bool slaveOk() const override {
return true;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return false;
}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
find_and_modify::addPrivilegesRequiredForFindAndModify(this, dbname, cmdObj, out);
}
- virtual Status explain(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- const rpc::ServerSelectionMetadata& serverSelectionMetadata,
- BSONObjBuilder* out) const {
- const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj);
+ Status explain(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ const rpc::ServerSelectionMetadata& serverSelectionMetadata,
+ BSONObjBuilder* out) const override {
+ const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto scopedDB = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, dbName));
- const auto conf = scopedDB.db();
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
shared_ptr<ChunkManager> chunkMgr;
shared_ptr<Shard> shard;
- if (!conf->isSharded(nss.ns())) {
- auto shardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
- shard = shardStatus.getValue();
+ if (!routingInfo.cm()) {
+ shard = routingInfo.primary();
} else {
- chunkMgr = _getChunkManager(opCtx, conf, nss);
+ chunkMgr = routingInfo.cm();
const BSONObj query = cmdObj.getObjectField("query");
@@ -118,7 +110,7 @@ public:
return collationElementStatus;
}
- StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query);
+ StatusWith<BSONObj> status = _getShardKey(opCtx, *chunkMgr, query);
if (!status.isOK()) {
return status.getStatus();
}
@@ -131,6 +123,7 @@ public:
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
+
shard = shardStatus.getValue();
}
@@ -143,7 +136,7 @@ public:
Timer timer;
BSONObjBuilder result;
- bool ok = _runCommand(opCtx, conf, chunkMgr, shard->getId(), nss, explainCmd.obj(), result);
+ bool ok = _runCommand(opCtx, chunkMgr, shard->getId(), nss, explainCmd.obj(), result);
long long millisElapsed = timer.millis();
if (!ok) {
@@ -164,24 +157,23 @@ public:
opCtx, shardResults, ClusterExplain::kSingleShard, millisElapsed, out);
}
- virtual bool run(OperationContext* opCtx,
- const std::string& dbName,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
const NamespaceString nss = parseNsCollectionRequired(dbName, cmdObj);
// findAndModify should only be creating database if upsert is true, but this would require
// that the parsing be pulled into this function.
- auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName));
- const auto conf = scopedDb.db();
-
- if (!conf->isSharded(nss.ns())) {
- return _runCommand(opCtx, conf, nullptr, conf->getPrimaryId(), nss, cmdObj, result);
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ return _runCommand(opCtx, nullptr, routingInfo.primaryId(), nss, cmdObj, result);
}
- shared_ptr<ChunkManager> chunkMgr = _getChunkManager(opCtx, conf, nss);
+ const auto chunkMgr = routingInfo.cm();
const BSONObj query = cmdObj.getObjectField("query");
@@ -195,17 +187,11 @@ public:
return appendCommandStatus(result, collationElementStatus);
}
- StatusWith<BSONObj> status = _getShardKey(opCtx, chunkMgr, query);
- if (!status.isOK()) {
- // Bad query
- return appendCommandStatus(result, status.getStatus());
- }
+ BSONObj shardKey = uassertStatusOK(_getShardKey(opCtx, *chunkMgr, query));
- BSONObj shardKey = status.getValue();
auto chunk = chunkMgr->findIntersectingChunk(shardKey, collation);
- const bool ok =
- _runCommand(opCtx, conf, chunkMgr, chunk->getShardId(), nss, cmdObj, result);
+ const bool ok = _runCommand(opCtx, chunkMgr, chunk->getShardId(), nss, cmdObj, result);
if (ok) {
updateChunkWriteStatsAndSplitIfNeeded(
opCtx, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize());
@@ -215,21 +201,12 @@ public:
}
private:
- shared_ptr<ChunkManager> _getChunkManager(OperationContext* opCtx,
- DBConfig* conf,
- const NamespaceString& nss) const {
- shared_ptr<ChunkManager> chunkMgr = conf->getChunkManager(opCtx, nss.ns());
- massert(13002, "shard internal error chunk manager should never be null", chunkMgr);
-
- return chunkMgr;
- }
-
- StatusWith<BSONObj> _getShardKey(OperationContext* opCtx,
- shared_ptr<ChunkManager> chunkMgr,
- const BSONObj& query) const {
+ static StatusWith<BSONObj> _getShardKey(OperationContext* opCtx,
+ const ChunkManager& chunkMgr,
+ const BSONObj& query) {
// Verify that the query has an equality predicate using the shard key
StatusWith<BSONObj> status =
- chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
+ chunkMgr.getShardKeyPattern().extractShardKeyFromQuery(opCtx, query);
if (!status.isOK()) {
return status;
@@ -245,20 +222,19 @@ private:
return shardKey;
}
- bool _runCommand(OperationContext* opCtx,
- DBConfig* conf,
- shared_ptr<ChunkManager> chunkManager,
- const ShardId& shardId,
- const NamespaceString& nss,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) const {
+ static bool _runCommand(OperationContext* opCtx,
+ shared_ptr<ChunkManager> chunkManager,
+ const ShardId& shardId,
+ const NamespaceString& nss,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
BSONObj res;
const auto shard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
ShardConnection conn(shard->getConnString(), nss.ns(), chunkManager);
- bool ok = conn->runCommand(conf->name(), cmdObj, res);
+ bool ok = conn->runCommand(nss.db().toString(), cmdObj, res);
conn.done();
// ErrorCodes::RecvStaleConfig is the code for RecvStaleConfigException.
diff --git a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp
index 35150ac3aca..8931166863c 100644
--- a/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp
+++ b/src/mongo/s/commands/cluster_flush_router_config_cmd.cpp
@@ -70,7 +70,7 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) {
- Grid::get(opCtx)->catalogCache()->invalidateAll();
+ Grid::get(opCtx)->catalogCache()->purgeAllDatabases();
result.appendBool("flushed", true);
return true;
diff --git a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
index 00e104c1e45..2bb23453bba 100644
--- a/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_shard_version_cmd.cpp
@@ -35,8 +35,9 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -86,13 +87,10 @@ public:
BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- auto scopedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db()));
- auto config = scopedDb.db();
+ auto routingInfo = getShardedCollection(opCtx, nss);
+ const auto cm = routingInfo.cm();
- auto cm = config->getChunkManagerIfExists(opCtx, nss.ns());
- uassert(ErrorCodes::NamespaceNotSharded, "ns [" + nss.ns() + " is not sharded.", cm);
-
- for (const auto& cmEntry : cm->getChunkMap()) {
+ for (const auto& cmEntry : cm->chunkMap()) {
log() << redact(cmEntry.second->toString());
}
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 088b8d6d4d1..b155d322b3e 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -45,27 +45,17 @@
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/commands/strategy.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/log.h"
namespace mongo {
-
-using std::shared_ptr;
-using std::map;
-using std::set;
-using std::string;
-using std::vector;
-
namespace {
AtomicUInt32 JOB_NUMBER;
@@ -75,7 +65,7 @@ const Milliseconds kNoDistLockTimeout(-1);
/**
* Generates a unique name for the temporary M/R output collection.
*/
-string getTmpName(StringData coll) {
+std::string getTmpName(StringData coll) {
return str::stream() << "tmp.mrs." << coll << "_" << time(0) << "_"
<< JOB_NUMBER.fetchAndAdd(1);
}
@@ -85,14 +75,14 @@ string getTmpName(StringData coll) {
* be sent to the shards as part of the first phase of map/reduce.
*/
BSONObj fixForShards(const BSONObj& orig,
- const string& output,
- string& badShardedField,
+ const std::string& output,
+ std::string& badShardedField,
int maxChunkSizeBytes) {
BSONObjBuilder b;
BSONObjIterator i(orig);
while (i.more()) {
BSONElement e = i.next();
- const string fn = e.fieldName();
+ const std::string fn = e.fieldName();
if (fn == bypassDocumentValidationCommandOption() || fn == "map" || fn == "mapreduce" ||
fn == "mapReduce" || fn == "mapparams" || fn == "reduce" || fn == "query" ||
@@ -160,47 +150,49 @@ class MRCmd : public Command {
public:
MRCmd() : Command("mapReduce", false, "mapreduce") {}
- virtual bool slaveOk() const {
+ bool slaveOk() const override {
return true;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return false;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+ return parseNsCollectionRequired(dbname, cmdObj).ns();
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return mr::mrSupportsWriteConcern(cmd);
}
- virtual void help(std::stringstream& help) const {
+ void help(std::stringstream& help) const override {
help << "Runs the sharded map/reduce command";
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
}
- virtual bool run(OperationContext* opCtx,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
Timer t;
const NamespaceString nss(parseNs(dbname, cmdObj));
- uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid());
-
- const string shardResultCollection = getTmpName(nss.coll());
+ const std::string shardResultCollection = getTmpName(nss.coll());
bool shardedOutput = false;
- NamespaceString outputCollNss;
bool customOutDB = false;
+ NamespaceString outputCollNss;
bool inlineOutput = false;
- string outDB = dbname;
+ std::string outDB = dbname;
BSONElement outElmt = cmdObj.getField("out");
if (outElmt.type() == Object) {
@@ -218,7 +210,8 @@ public:
!customOut.hasField("db"));
} else {
// Mode must be 1st element
- const string finalColShort = customOut.firstElement().str();
+ const std::string finalColShort = customOut.firstElement().str();
+
if (customOut.hasField("db")) {
customOutDB = true;
outDB = customOut.getField("db").str();
@@ -231,44 +224,27 @@ public:
}
}
- // Ensure the input database exists
- auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbname);
- if (!status.isOK()) {
- return appendCommandStatus(result, status.getStatus());
- }
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
- shared_ptr<DBConfig> confIn = status.getValue();
+ // Ensure the input database exists and set up the input collection
+ auto inputRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
- shared_ptr<DBConfig> confOut;
- if (customOutDB) {
- // Create the output database implicitly, since we have a custom output requested
- auto scopedDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, outDB));
- confOut = scopedDb.getSharedDbReference();
- } else {
- confOut = confIn;
- }
+ const bool shardedInput = inputRoutingInfo.cm() != nullptr;
- if (confOut->getPrimaryId() == "config" && !inlineOutput) {
- return appendCommandStatus(
- result,
- Status(ErrorCodes::CommandNotSupported,
- str::stream() << "Can not execute mapReduce with output database " << outDB
- << " which lives on config servers"));
+ // Create the output database implicitly if we have a custom output requested
+ if (customOutDB) {
+ uassertStatusOK(createShardDatabase(opCtx, outDB));
}
- const bool shardedInput = confIn && confIn->isSharded(nss.ns());
-
- if (!shardedOutput) {
- uassert(15920,
- "Cannot output to a non-sharded collection because "
- "sharded collection exists already",
- !confOut->isSharded(outputCollNss.ns()));
-
- // TODO: Should we also prevent going from non-sharded to sharded? During the
- // transition client may see partial data.
- }
+ // Ensure that the output database doesn't reside on the config server
+ auto outputDbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, outDB));
+ uassert(ErrorCodes::CommandNotSupported,
+ str::stream() << "Can not execute mapReduce with output database " << outDB
+ << " which lives on config servers",
+ inlineOutput || outputDbInfo.primaryId() != "config");
int64_t maxChunkSizeBytes = 0;
+
if (shardedOutput) {
// Will need to figure out chunks, ask shards for points
maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong();
@@ -279,29 +255,40 @@ public:
// maxChunkSizeBytes is sent as int BSON field
invariant(maxChunkSizeBytes < std::numeric_limits<int>::max());
+ } else if (outputCollNss.isValid()) {
+ auto outputRoutingInfo =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss));
+
+ uassert(15920,
+ "Cannot output to a non-sharded collection because "
+ "sharded collection exists already",
+ !outputRoutingInfo.cm());
+
+ // TODO: Should we also prevent going from non-sharded to sharded? During the
+ // transition client may see partial data.
}
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
// modify command to run on shards with output to tmp collection
- string badShardedField;
+ std::string badShardedField;
BSONObj shardedCommand =
fixForShards(cmdObj, shardResultCollection, badShardedField, maxChunkSizeBytes);
if (!shardedInput && !shardedOutput && !customOutDB) {
LOG(1) << "simple MR, just passthrough";
- const auto shard =
- uassertStatusOK(shardRegistry->getShard(opCtx, confIn->getPrimaryId()));
+ invariant(inputRoutingInfo.primary());
- ShardConnection conn(shard->getConnString(), "");
+ ShardConnection conn(inputRoutingInfo.primary()->getConnString(), "");
BSONObj res;
bool ok = conn->runCommand(dbname, cmdObj, res);
conn.done();
if (auto wcErrorElem = res["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result);
+ appendWriteConcernErrorToCmdResponse(
+ inputRoutingInfo.primary()->getId(), wcErrorElem, result);
}
result.appendElementsUnique(res);
@@ -323,12 +310,13 @@ public:
collation = cmdObj["collation"].embeddedObjectUserCheck();
}
- set<string> servers;
- vector<Strategy::CommandResult> mrCommandResults;
+ std::set<std::string> servers;
+ std::vector<Strategy::CommandResult> mrCommandResults;
BSONObjBuilder shardResultsB;
BSONObjBuilder shardCountsB;
- map<string, int64_t> countsMap;
+ std::map<std::string, int64_t> countsMap;
+
auto splitPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
{
@@ -349,12 +337,12 @@ public:
for (const auto& mrResult : mrCommandResults) {
// Need to gather list of all servers even if an error happened
- string server;
- {
+ const auto server = [&]() {
const auto shard =
uassertStatusOK(shardRegistry->getShard(opCtx, mrResult.shardTargetId));
- server = shard->getConnString().toString();
- }
+ return shard->getConnString().toString();
+ }();
+
servers.insert(server);
if (!ok) {
@@ -386,15 +374,14 @@ public:
if (singleResult.hasField("splitKeys")) {
BSONElement splitKeys = singleResult.getField("splitKeys");
- vector<BSONElement> pts = splitKeys.Array();
- for (vector<BSONElement>::iterator it = pts.begin(); it != pts.end(); ++it) {
- splitPts.insert(it->Obj().getOwned());
+ for (const auto& splitPt : splitKeys.Array()) {
+ splitPts.insert(splitPt.Obj().getOwned());
}
}
}
if (!ok) {
- _cleanUp(servers, dbname, shardResultCollection);
+ cleanUp(servers, dbname, shardResultCollection);
// Add "code" to the top-level response, if the failure of the sharded command
// can be accounted to a single error.
@@ -442,16 +429,15 @@ public:
bool ok = true;
BSONObj singleResult;
- bool hasWCError = false;
if (!shardedOutput) {
- const auto shard =
- uassertStatusOK(shardRegistry->getShard(opCtx, confOut->getPrimaryId()));
+ LOG(1) << "MR with single shard output, NS=" << outputCollNss
+ << " primary=" << outputDbInfo.primaryId();
- LOG(1) << "MR with single shard output, NS=" << outputCollNss.ns()
- << " primary=" << shard->toString();
+ const auto outputShard =
+ uassertStatusOK(shardRegistry->getShard(opCtx, outputDbInfo.primaryId()));
- ShardConnection conn(shard->getConnString(), outputCollNss.ns());
+ ShardConnection conn(outputShard->getConnString(), outputCollNss.ns());
ok = conn->runCommand(outDB, finalCmd.obj(), singleResult);
BSONObj counts = singleResult.getObjectField("counts");
@@ -460,79 +446,19 @@ public:
outputCount = counts.getIntField("output");
conn.done();
- if (!hasWCError) {
- if (auto wcErrorElem = singleResult["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, result);
- hasWCError = true;
- }
+
+ if (auto wcErrorElem = singleResult["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(outputShard->getId(), wcErrorElem, result);
}
} else {
LOG(1) << "MR with sharded output, NS=" << outputCollNss.ns();
- // Create the sharded collection if needed
- if (!confOut->isSharded(outputCollNss.ns())) {
- // Enable sharding on the output db
- Status status = Grid::get(opCtx)->catalogClient(opCtx)->enableSharding(
- opCtx, outputCollNss.db().toString());
-
- // If the database has sharding already enabled, we can ignore the error
- if (status.isOK()) {
- // Invalidate the output database so it gets reloaded on the next fetch attempt
- Grid::get(opCtx)->catalogCache()->invalidate(outputCollNss.db());
- } else if (status != ErrorCodes::AlreadyInitialized) {
- uassertStatusOK(status);
- }
-
- confOut.reset();
- confOut = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(
- opCtx, outputCollNss.db().toString()));
+ auto outputRoutingInfo =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss));
- // Shard collection according to split points
- vector<BSONObj> sortedSplitPts;
-
- // Points will be properly sorted using the set
- for (const auto& splitPt : splitPts) {
- sortedSplitPts.push_back(splitPt);
- }
-
- // Pre-split the collection onto all the shards for this database. Note that
- // it's not completely safe to pre-split onto non-primary shards using the
- // shardcollection method (a conflict may result if multiple map-reduces are
- // writing to the same output collection, for instance).
- //
- // TODO: pre-split mapReduce output in a safer way.
-
- const std::set<ShardId> outShardIds = [&]() {
- std::vector<ShardId> shardIds;
- shardRegistry->getAllShardIds(&shardIds);
- uassert(ErrorCodes::ShardNotFound,
- str::stream()
- << "Unable to find shards on which to place output collection "
- << outputCollNss.ns(),
- !shardIds.empty());
-
- return std::set<ShardId>(shardIds.begin(), shardIds.end());
- }();
-
-
- BSONObj sortKey = BSON("_id" << 1);
- ShardKeyPattern sortKeyPattern(sortKey);
-
- // The collection default collation for the output collection. This is empty,
- // representing the simple binary comparison collation.
- BSONObj defaultCollation;
-
- uassertStatusOK(
- Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(opCtx,
- outputCollNss.ns(),
- sortKeyPattern,
- defaultCollation,
- true,
- sortedSplitPts,
- outShardIds));
-
- // Make sure the cached metadata for the collection knows that we are now sharded
- confOut->getChunkManager(opCtx, outputCollNss.ns(), true /* reload */);
+ // Create the sharded collection if needed
+ if (!outputRoutingInfo.cm()) {
+ outputRoutingInfo = createShardedOutputCollection(opCtx, outputCollNss, splitPts);
}
auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>();
@@ -567,14 +493,16 @@ public:
throw;
}
+ bool hasWCError = false;
+
for (const auto& mrResult : mrCommandResults) {
- string server;
- {
+ const auto server = [&]() {
const auto shard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(
opCtx, mrResult.shardTargetId));
- server = shard->getConnString().toString();
- }
+ return shard->getConnString().toString();
+ }();
+
singleResult = mrResult.result;
if (!hasWCError) {
if (auto wcErrorElem = singleResult["writeConcernError"]) {
@@ -597,7 +525,8 @@ public:
// get the size inserted for each chunk
// split cannot be called here since we already have the distributed lock
if (singleResult.hasField("chunkSizes")) {
- vector<BSONElement> sizes = singleResult.getField("chunkSizes").Array();
+ std::vector<BSONElement> sizes =
+ singleResult.getField("chunkSizes").Array();
for (unsigned int i = 0; i < sizes.size(); i += 2) {
BSONObj key = sizes[i].Obj().getOwned();
const long long size = sizes[i + 1].numberLong();
@@ -610,34 +539,37 @@ public:
}
// Do the splitting round
- shared_ptr<ChunkManager> cm =
- confOut->getChunkManagerIfExists(opCtx, outputCollNss.ns());
+ catalogCache->onStaleConfigError(std::move(outputRoutingInfo));
+ outputRoutingInfo =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, outputCollNss));
uassert(34359,
str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns()
<< "; expected that collection to be sharded, but it was not",
- cm);
+ outputRoutingInfo.cm());
+
+ const auto outputCM = outputRoutingInfo.cm();
for (const auto& chunkSize : chunkSizes) {
BSONObj key = chunkSize.first;
const int size = chunkSize.second;
invariant(size < std::numeric_limits<int>::max());
- // key reported should be the chunk's minimum
- shared_ptr<Chunk> c = cm->findIntersectingChunkWithSimpleCollation(key);
+ // Key reported should be the chunk's minimum
+ auto c = outputCM->findIntersectingChunkWithSimpleCollation(key);
if (!c) {
warning() << "Mongod reported " << size << " bytes inserted for key " << key
<< " but can't find chunk";
} else {
- updateChunkWriteStatsAndSplitIfNeeded(opCtx, cm.get(), c.get(), size);
+ updateChunkWriteStatsAndSplitIfNeeded(opCtx, outputCM.get(), c.get(), size);
}
}
}
- _cleanUp(servers, dbname, shardResultCollection);
+ cleanUp(servers, dbname, shardResultCollection);
if (!ok) {
errmsg = str::stream() << "MR post processing failed: " << singleResult.toString();
- return 0;
+ return false;
}
// copy some elements from a single result
@@ -672,9 +604,69 @@ public:
private:
/**
+ * Creates and shards the collection for the output results.
+ */
+ static CachedCollectionRoutingInfo createShardedOutputCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObjSet& splitPts) {
+ auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+
+ // Enable sharding on the output db
+ Status status = catalogClient->enableSharding(opCtx, nss.db().toString());
+
+ // If the database has sharding already enabled, we can ignore the error
+ if (status.isOK()) {
+ // Invalidate the output database so it gets reloaded on the next fetch attempt
+ catalogCache->purgeDatabase(nss.db());
+ } else if (status != ErrorCodes::AlreadyInitialized) {
+ uassertStatusOK(status);
+ }
+
+ // Points will be properly sorted using the set
+ const std::vector<BSONObj> sortedSplitPts(splitPts.begin(), splitPts.end());
+
+ // Pre-split the collection onto all the shards for this database. Note that
+ // it's not completely safe to pre-split onto non-primary shards using the
+ // shardcollection method (a conflict may result if multiple map-reduces are
+ // writing to the same output collection, for instance).
+ //
+ // TODO: pre-split mapReduce output in a safer way.
+
+ const std::set<ShardId> outShardIds = [&]() {
+ std::vector<ShardId> shardIds;
+ shardRegistry->getAllShardIds(&shardIds);
+ uassert(ErrorCodes::ShardNotFound,
+ str::stream() << "Unable to find shards on which to place output collection "
+ << nss.ns(),
+ !shardIds.empty());
+
+ return std::set<ShardId>(shardIds.begin(), shardIds.end());
+ }();
+
+
+ BSONObj sortKey = BSON("_id" << 1);
+ ShardKeyPattern sortKeyPattern(sortKey);
+
+ // The collection default collation for the output collection. This is empty,
+ // representing the simple binary comparison collation.
+ BSONObj defaultCollation;
+
+ uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->shardCollection(
+ opCtx, nss.ns(), sortKeyPattern, defaultCollation, true, sortedSplitPts, outShardIds));
+
+ // Make sure the cached metadata for the collection knows that we are now sharded
+ catalogCache->invalidateShardedCollection(nss);
+ return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ }
+
+ /**
* Drops the temporary results collections from each shard.
*/
- void _cleanUp(const set<string>& servers, string dbName, string shardResultCollection) {
+ static void cleanUp(const std::set<std::string>& servers,
+ const std::string& dbName,
+ const std::string& shardResultCollection) {
try {
// drop collections with tmp results on each shard
for (const auto& server : servers) {
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index 6b247823381..2a85b645df0 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -37,12 +37,10 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config.h"
+#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
namespace mongo {
@@ -60,14 +58,14 @@ class ClusterMergeChunksCommand : public Command {
public:
ClusterMergeChunksCommand() : Command("mergeChunks") {}
- virtual void help(stringstream& h) const {
+ void help(stringstream& h) const override {
h << "Merge Chunks command\n"
<< "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ] }";
}
- virtual Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
ActionType::splitChunk)) {
@@ -76,17 +74,19 @@ public:
return Status::OK();
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
- virtual bool slaveOk() const {
+
+ bool slaveOk() const override {
return false;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -104,10 +104,13 @@ public:
BSONObj& cmdObj,
int,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
+ auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss));
+ const auto cm = routingInfo.cm();
vector<BSONObj> bounds;
if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) {
@@ -137,8 +140,6 @@ public:
return false;
}
- auto const cm = scopedCM.cm();
-
if (!cm->getShardKeyPattern().isShardKey(minKey) ||
!cm->getShardKeyPattern().isShardKey(maxKey)) {
errmsg = stream() << "shard key bounds "
@@ -179,6 +180,8 @@ public:
bool ok = conn->runCommand("admin", remoteCmdObjB.obj(), remoteResult);
conn.done();
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
+
result.appendElements(remoteResult);
return ok;
}
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index c3cb18ceb15..e597e80eeaa 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -40,12 +40,11 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/migration_secondary_throttle_options.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -60,19 +59,19 @@ class MoveChunkCmd : public Command {
public:
MoveChunkCmd() : Command("moveChunk", false, "movechunk") {}
- virtual bool slaveOk() const {
+ bool slaveOk() const override {
return true;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
- virtual void help(std::stringstream& help) const {
+ void help(std::stringstream& help) const override {
help << "Example: move chunk that contains the doc {num : 7} to shard001\n"
<< " { movechunk : 'test.foo' , find : { num : 7 } , to : 'shard0001' }\n"
<< "Example: move chunk with lower bound 0 and upper bound 10 to shard001\n"
@@ -80,9 +79,9 @@ public:
<< " , to : 'shard001' }\n";
}
- virtual Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
ActionType::moveChunk)) {
@@ -92,21 +91,24 @@ public:
return Status::OK();
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
- virtual bool run(OperationContext* opCtx,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
Timer t;
const NamespaceString nss(parseNs(dbname, cmdObj));
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
+ auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss));
+ const auto cm = routingInfo.cm();
const auto toElt = cmdObj["to"];
uassert(ErrorCodes::TypeMismatch,
@@ -145,8 +147,6 @@ public:
return false;
}
- auto const cm = scopedCM.cm();
-
shared_ptr<Chunk> chunk;
if (!find.isEmpty()) {
@@ -199,9 +199,7 @@ public:
secondaryThrottle,
cmdObj["_waitForDelete"].trueValue()));
- // Proactively refresh the chunk manager. Not strictly necessary, but this way it's
- // immediately up-to-date the next time it's used.
- scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true);
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
result.append("millis", t.millis());
return true;
diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
index dc192f1a6a6..e1746bebd72 100644
--- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp
@@ -49,7 +49,6 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/sharded_command_processing.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -125,9 +124,9 @@ public:
auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
// Flush all cached information. This can't be perfect, but it's better than nothing.
- catalogCache->invalidate(dbname);
+ catalogCache->purgeDatabase(dbname);
- auto config = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname));
+ auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, dbname));
const auto toElt = cmdObj["to"];
uassert(ErrorCodes::TypeMismatch,
@@ -139,8 +138,7 @@ public:
return false;
}
- const auto fromShard =
- uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId()));
+ const auto fromShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbInfo.primaryId()));
const auto toShard = [&]() {
auto toShardStatus = shardRegistry->getShard(opCtx, to);
@@ -223,7 +221,7 @@ public:
// Ensure the next attempt to retrieve the database or any of its collections will do a full
// reload
- catalogCache->invalidate(dbname);
+ catalogCache->purgeDatabase(dbname);
const string oldPrimary = fromShard->getConnString().toString();
diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
index b29eff8b0b2..65f149927d0 100644
--- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
+++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp
@@ -33,11 +33,11 @@
#include "mongo/db/commands.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/s/commands/strategy.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
namespace mongo {
+namespace {
using std::string;
using std::stringstream;
@@ -153,8 +153,6 @@ bool ClusterPlanCacheCmd::run(OperationContext* opCtx,
// Register plan cache commands at startup
//
-namespace {
-
MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) {
// Leaked intentionally: a Command registers itself when constructed.
@@ -174,5 +172,4 @@ MONGO_INITIALIZER(RegisterPlanCacheCommands)(InitializerContext* context) {
}
} // namespace
-
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 7692e764e02..a1d99a608e6 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -53,15 +53,12 @@
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_write.h"
-#include "mongo/s/config.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/migration_secondary_throttle_options.h"
#include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -187,19 +184,21 @@ public:
auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
- auto scopedShardedDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db()));
- const auto config = scopedShardedDb.db();
+ auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db()));
// Ensure sharding is allowed on the database
uassert(ErrorCodes::IllegalOperation,
str::stream() << "sharding not enabled for db " << nss.db(),
- config->isShardingEnabled());
+ dbInfo.shardingEnabled());
+
+ auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
// Ensure that the collection is not sharded already
uassert(ErrorCodes::IllegalOperation,
str::stream() << "sharding already enabled for collection " << nss.ns(),
- !config->isSharded(nss.ns()));
+ !routingInfo.cm());
// NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj becomes
// corrupt as soon as the command ends.
@@ -279,13 +278,7 @@ public:
}
// The rest of the checks require a connection to the primary db
- const ConnectionString shardConnString = [&]() {
- const auto shard =
- uassertStatusOK(shardRegistry->getShard(opCtx, config->getPrimaryId()));
- return shard->getConnString();
- }();
-
- ScopedDbConnection conn(shardConnString);
+ ScopedDbConnection conn(routingInfo.primary()->getConnString());
// Retrieve the collection metadata in order to verify that it is legal to shard this
// collection.
@@ -590,17 +583,21 @@ public:
initSplits,
std::set<ShardId>{}));
- // Make sure the cached metadata for the collection knows that we are now sharded
- config->getChunkManager(opCtx, nss.ns(), true /* reload */);
-
result << "collectionsharded" << nss.ns();
+ // Make sure the cached metadata for the collection knows that we are now sharded
+ catalogCache->invalidateShardedCollection(nss);
+
// Only initially move chunks when using a hashed shard key
if (isHashedShardKey && isEmpty) {
- // Reload the new config info. If we created more than one initial chunk, then
- // we need to move them around to balance.
- auto chunkManager = config->getChunkManager(opCtx, nss.ns(), true);
- ChunkMap chunkMap = chunkManager->getChunkMap();
+ routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Collection was successfully written as sharded but got dropped before it "
+ "could be evenly distributed",
+ routingInfo.cm());
+ auto chunkManager = routingInfo.cm();
+
+ const auto chunkMap = chunkManager->chunkMap();
// 2. Move and commit each "big chunk" to a different shard.
int i = 0;
@@ -646,7 +643,13 @@ public:
}
// Reload the config info, after all the migrations
- chunkManager = config->getChunkManager(opCtx, nss.ns(), true);
+ catalogCache->invalidateShardedCollection(nss);
+ routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Collection was successfully written as sharded but got dropped before it "
+ "could be evenly distributed",
+ routingInfo.cm());
+ chunkManager = routingInfo.cm();
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
@@ -689,10 +692,6 @@ public:
subSplits.push_back(splitPoint);
}
}
-
- // Proactively refresh the chunk manager. Not really necessary, but this way it's
- // immediately up-to-date the next time it's used.
- config->getChunkManager(opCtx, nss.ns(), true);
}
return true;
diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp
index b63da3b2ee7..80ed1526663 100644
--- a/src/mongo/s/commands/cluster_split_cmd.cpp
+++ b/src/mongo/s/commands/cluster_split_cmd.cpp
@@ -37,15 +37,13 @@
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/field_parser.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -90,20 +88,19 @@ class SplitCollectionCmd : public Command {
public:
SplitCollectionCmd() : Command("split", false, "split") {}
- virtual bool slaveOk() const {
+ bool slaveOk() const override {
return true;
}
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual void help(std::stringstream& help) const {
+ void help(std::stringstream& help) const override {
help << " example: - split the shard that contains give key\n"
<< " { split : 'alleyinsider.blog.posts' , find : { ts : 1 } }\n"
<< " example: - split the shard that contains the key with this as the middle\n"
@@ -111,9 +108,9 @@ public:
<< " NOTE: this does not move the chunks, it just creates a logical separation.";
}
- virtual Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
ActionType::splitChunk)) {
@@ -122,19 +119,22 @@ public:
return Status::OK();
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
- virtual bool run(OperationContext* opCtx,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
+ auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss));
+ const auto cm = routingInfo.cm();
const BSONField<BSONObj> findField("find", BSONObj());
const BSONField<BSONArray> boundsField("bounds", BSONArray());
@@ -190,8 +190,6 @@ public:
return false;
}
- auto const cm = scopedCM.cm();
-
std::shared_ptr<Chunk> chunk;
if (!find.isEmpty()) {
@@ -275,9 +273,7 @@ public:
ChunkRange(chunk->getMin(), chunk->getMax()),
{splitPoint}));
- // Proactively refresh the chunk manager. Not strictly necessary, but this way it's
- // immediately up-to-date the next time it's used.
- scopedCM.db()->getChunkManagerIfExists(opCtx, nss.ns(), true);
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
return true;
}
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp
index 730d5e8a178..8b8a3f2e644 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/commands/cluster_write.cpp
@@ -39,14 +39,13 @@
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/chunk.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/chunk_manager_targeter.h"
#include "mongo/s/commands/dbclient_multi_command.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -66,11 +65,6 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
dassert(response->isValid(NULL));
}
-void reloadChunkManager(OperationContext* opCtx, const NamespaceString& nss) {
- auto config = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db()));
- config.db()->getChunkManagerIfExists(opCtx, nss.ns(), true);
-}
-
/**
* Given a maxChunkSize configuration and the number of chunks in a particular sharded collection,
* returns an optimal chunk size to use in order to achieve a good ratio between number of chunks
@@ -176,30 +170,31 @@ BSONObj findExtremeKeyForShard(OperationContext* opCtx,
void splitIfNeeded(OperationContext* opCtx,
const NamespaceString& nss,
const TargeterStats& stats) {
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
- if (!scopedCMStatus.isOK()) {
- warning() << "failed to get collection information for " << nss
- << " while checking for auto-split" << causedBy(scopedCMStatus.getStatus());
+ auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
+ if (!routingInfoStatus.isOK()) {
+ log() << "failed to get collection information for " << nss
+ << " while checking for auto-split" << causedBy(routingInfoStatus.getStatus());
return;
}
- const auto& scopedCM = scopedCMStatus.getValue();
+ auto& routingInfo = routingInfoStatus.getValue();
- if (!scopedCM.cm()) {
+ if (!routingInfo.cm()) {
return;
}
for (auto it = stats.chunkSizeDelta.cbegin(); it != stats.chunkSizeDelta.cend(); ++it) {
std::shared_ptr<Chunk> chunk;
try {
- chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(it->first);
+ chunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(it->first);
} catch (const AssertionException& ex) {
warning() << "could not find chunk while checking for auto-split: "
<< causedBy(redact(ex));
return;
}
- updateChunkWriteStatsAndSplitIfNeeded(opCtx, scopedCM.cm().get(), chunk.get(), it->second);
+ updateChunkWriteStatsAndSplitIfNeeded(
+ opCtx, routingInfo.cm().get(), chunk.get(), it->second);
}
}
@@ -472,21 +467,22 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
<< (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" +
(shouldBalance ? ")" : ", but no migrations allowed)"));
+ // Reload the chunk manager after the split
+ auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss));
+
if (!shouldBalance || !suggestedMigrateChunk) {
- reloadChunkManager(opCtx, nss);
return;
}
// Top chunk optimization - try to move the top chunk out of this shard to prevent the hot
- // spot
- // from staying on a single shard. This is based on the assumption that succeeding inserts
- // will
- // fall on the top chunk.
+ // spot from staying on a single shard. This is based on the assumption that succeeding
+ // inserts will fall on the top chunk.
// We need to use the latest chunk manager (after the split) in order to have the most
// up-to-date view of the chunk we are about to move
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
- auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(
+ auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(
suggestedMigrateChunk->getMin());
ChunkType chunkToMove;
@@ -498,7 +494,8 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
- reloadChunkManager(opCtx, nss);
+ // Ensure the collection gets reloaded because of the move
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
} catch (const DBException& ex) {
chunk->randomizeBytesWritten();
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 7d14499e9d8..e219d9396f9 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -54,17 +54,14 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/commands/run_on_all_shards_cmd.h"
#include "mongo/s/commands/sharded_command_processing.h"
-#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/store_possible_cursor.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/log.h"
@@ -86,20 +83,19 @@ using std::vector;
namespace {
bool cursorCommandPassthrough(OperationContext* opCtx,
- shared_ptr<DBConfig> conf,
+ StringData dbName,
+ const ShardId& shardId,
const BSONObj& cmdObj,
const NamespaceString& nss,
int options,
BSONObjBuilder* out) {
- const auto shardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
+ const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!shardStatus.isOK()) {
- invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound);
return Command::appendCommandStatus(*out, shardStatus.getStatus());
}
const auto shard = shardStatus.getValue();
ScopedDbConnection conn(shard->getConnString());
- auto cursor = conn->query(str::stream() << conf->name() << ".$cmd",
+ auto cursor = conn->query(str::stream() << dbName << ".$cmd",
cmdObj,
-1, // nToReturn
0, // nToSkip
@@ -155,11 +151,13 @@ StatusWith<BSONObj> getCollation(const BSONObj& cmdObj) {
}
class PublicGridCommand : public Command {
-public:
+protected:
PublicGridCommand(const char* n, const char* oldname = NULL) : Command(n, false, oldname) {}
+
virtual bool slaveOk() const {
return true;
}
+
virtual bool adminOnly() const {
return false;
}
@@ -170,41 +168,29 @@ public:
return false;
}
- // all grid commands are designed not to lock
-
-protected:
- bool passthrough(OperationContext* opCtx,
- DBConfig* conf,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- return _passthrough(opCtx, conf->name(), conf, cmdObj, 0, result);
- }
-
bool adminPassthrough(OperationContext* opCtx,
- DBConfig* conf,
+ const ShardId& shardId,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
- return _passthrough(opCtx, "admin", conf, cmdObj, 0, result);
+ return passthrough(opCtx, "admin", shardId, cmdObj, result);
}
bool passthrough(OperationContext* opCtx,
- DBConfig* conf,
+ const std::string& db,
+ const ShardId& shardId,
const BSONObj& cmdObj,
- int options,
BSONObjBuilder& result) {
- return _passthrough(opCtx, conf->name(), conf, cmdObj, options, result);
+ return passthrough(opCtx, db, shardId, cmdObj, 0, result);
}
-private:
- bool _passthrough(OperationContext* opCtx,
- const string& db,
- DBConfig* conf,
- const BSONObj& cmdObj,
- int options,
- BSONObjBuilder& result) {
- const auto shardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
- const auto shard = uassertStatusOK(shardStatus);
+ bool passthrough(OperationContext* opCtx,
+ const std::string& db,
+ const ShardId& shardId,
+ const BSONObj& cmdObj,
+ int options,
+ BSONObjBuilder& result) {
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
ShardConnection conn(shard->getConnString(), "");
@@ -223,53 +209,50 @@ private:
};
class AllShardsCollectionCommand : public RunOnAllShardsCommand {
-public:
+protected:
AllShardsCollectionCommand(const char* n,
const char* oldname = NULL,
bool useShardConn = false,
bool implicitCreateDb = false)
: RunOnAllShardsCommand(n, oldname, useShardConn, implicitCreateDb) {}
- virtual void getShardIds(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- vector<ShardId>& shardIds) {
+ void getShardIds(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ vector<ShardId>& shardIds) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
- uassertStatusOK(status.getStatus());
-
- shared_ptr<DBConfig> conf = status.getValue();
-
- if (!conf->isSharded(nss.ns())) {
- shardIds.push_back(conf->getPrimaryId());
- } else {
+ const auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (routingInfo.cm()) {
+ // If it's a sharded collection, send it to all shards
Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ } else {
+ // Otherwise just send it to the primary shard for the database
+ shardIds.push_back(routingInfo.primaryId());
}
}
};
class NotAllowedOnShardedCollectionCmd : public PublicGridCommand {
-public:
+protected:
NotAllowedOnShardedCollectionCmd(const char* n) : PublicGridCommand(n) {}
- virtual bool run(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- int options,
- string& errmsg,
- BSONObjBuilder& result) {
+ bool run(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
- if (!conf->isSharded(nss.ns())) {
- return passthrough(opCtx, conf.get(), cmdObj, options, result);
- }
+ const auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "can't do command: " << getName() << " on sharded collection",
+ !routingInfo.cm());
- return appendCommandStatus(
- result,
- Status(ErrorCodes::IllegalOperation,
- str::stream() << "can't do command: " << getName() << " on sharded collection"));
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result);
}
};
@@ -407,6 +390,7 @@ public:
class ReIndexCmd : public AllShardsCollectionCommand {
public:
ReIndexCmd() : AllShardsCollectionCommand("reIndex") {}
+
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
@@ -418,6 +402,7 @@ public:
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
+
} reIndexCmd;
class CollectionModCmd : public AllShardsCollectionCommand {
@@ -434,12 +419,13 @@ public:
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
-} collectionModCmd;
+} collectionModCmd;
class ValidateCmd : public PublicGridCommand {
public:
ValidateCmd() : PublicGridCommand("validate") {}
+
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
@@ -460,13 +446,13 @@ public:
BSONObjBuilder& output) {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
- if (!conf->isSharded(nss.ns())) {
- return passthrough(opCtx, conf.get(), cmdObj, output);
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, output);
}
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
- massert(40051, "chunk manager should not be null", cm);
+ const auto cm = routingInfo.cm();
vector<Strategy::CommandResult> results;
const BSONObj query;
@@ -512,33 +498,35 @@ public:
}
return true;
}
+
} validateCmd;
class CreateCmd : public PublicGridCommand {
public:
CreateCmd() : PublicGridCommand("create") {}
- virtual Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
const NamespaceString nss(parseNsCollectionRequired(dbname, cmdObj));
return AuthorizationSession::get(client)->checkAuthForCreate(nss, cmdObj);
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
+
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
int,
string& errmsg,
- BSONObjBuilder& result) {
- auto dbStatus = ScopedShardDatabase::getOrCreate(opCtx, dbName);
- if (!dbStatus.isOK()) {
- return appendCommandStatus(result, dbStatus.getStatus());
- }
+ BSONObjBuilder& result) override {
+ uassertStatusOK(createShardDatabase(opCtx, dbName));
- auto scopedDb = std::move(dbStatus.getValue());
- return passthrough(opCtx, scopedDb.db(), cmdObj, result);
+ const auto dbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
+ return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result);
}
} createCmd;
@@ -546,23 +534,27 @@ public:
class RenameCollectionCmd : public PublicGridCommand {
public:
RenameCollectionCmd() : PublicGridCommand("renameCollection") {}
+
virtual Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) {
return rename_collection::checkAuthForRenameCollectionCommand(client, dbname, cmdObj);
}
+
virtual bool adminOnly() const {
return true;
}
+
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
+
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
- int,
+ int options,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const auto fullNsFromElt = cmdObj.firstElement();
uassert(ErrorCodes::InvalidNamespace,
"'renameCollection' must be of type String",
@@ -571,10 +563,6 @@ public:
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Invalid source namespace: " << fullnsFrom.ns(),
fullnsFrom.isValid());
- const string dbNameFrom = fullnsFrom.db().toString();
-
- auto confFrom =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameFrom));
const auto fullnsToElt = cmdObj["to"];
uassert(ErrorCodes::InvalidNamespace,
@@ -584,24 +572,22 @@ public:
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "Invalid target namespace: " << fullnsTo.ns(),
fullnsTo.isValid());
- const string dbNameTo = fullnsTo.db().toString();
- auto confTo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbNameTo));
- uassert(
- 13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom.ns()));
- uassert(
- 13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo.ns()));
+ const auto fromRoutingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsFrom));
+ uassert(13138, "You can't rename a sharded collection", !fromRoutingInfo.cm());
- auto shardTo = confTo->getPrimaryId();
- auto shardFrom = confFrom->getPrimaryId();
+ const auto toRoutingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, fullnsTo));
+ uassert(13139, "You can't rename to a sharded collection", !toRoutingInfo.cm());
uassert(13137,
"Source and destination collections must be on same shard",
- shardFrom == shardTo);
+ fromRoutingInfo.primaryId() == toRoutingInfo.primaryId());
- return adminPassthrough(opCtx, confFrom.get(), cmdObj, result);
+ return adminPassthrough(opCtx, fromRoutingInfo.primaryId(), cmdObj, result);
}
+
} renameCollectionCmd;
class CopyDBCmd : public PublicGridCommand {
@@ -637,14 +623,14 @@ public:
"Invalid todb argument",
NamespaceString::validDBName(todb, NamespaceString::DollarInDbNameBehavior::Allow));
- auto scopedToDb = uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, todb));
+ auto toDbInfo = uassertStatusOK(createShardDatabase(opCtx, todb));
uassert(ErrorCodes::IllegalOperation,
"Cannot copy to a sharded database",
- !scopedToDb.db()->isShardingEnabled());
+ !toDbInfo.shardingEnabled());
- const string fromhost = cmdObj.getStringField("fromhost");
+ const std::string fromhost = cmdObj.getStringField("fromhost");
if (!fromhost.empty()) {
- return adminPassthrough(opCtx, scopedToDb.db(), cmdObj, result);
+ return adminPassthrough(opCtx, toDbInfo.primaryId(), cmdObj, result);
}
const auto fromDbElt = cmdObj["fromdb"];
@@ -657,10 +643,10 @@ public:
"invalid fromdb argument",
NamespaceString::validDBName(fromdb, NamespaceString::DollarInDbNameBehavior::Allow));
- auto scopedFromDb = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, fromdb));
+ auto fromDbInfo = uassertStatusOK(createShardDatabase(opCtx, fromdb));
uassert(ErrorCodes::IllegalOperation,
"Cannot copy from a sharded database",
- !scopedFromDb.db()->isShardingEnabled());
+ !fromDbInfo.shardingEnabled());
BSONObjBuilder b;
BSONForEach(e, cmdObj) {
@@ -670,12 +656,12 @@ public:
}
{
- const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(
- opCtx, scopedFromDb.db()->getPrimaryId()));
+ const auto shard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromDbInfo.primaryId()));
b.append("fromhost", shard->getConnString().toString());
}
- return adminPassthrough(opCtx, scopedToDb.db(), b.obj(), result);
+ return adminPassthrough(opCtx, toDbInfo.primaryId(), b.obj(), result);
}
} clusterCopyDBCmd;
@@ -683,38 +669,38 @@ public:
class CollectionStats : public PublicGridCommand {
public:
CollectionStats() : PublicGridCommand("collStats", "collstats") {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::collStats);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
- int,
+ int options,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
- if (!conf->isSharded(nss.ns())) {
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
result.appendBool("sharded", false);
- result.append("primary", conf->getPrimaryId().toString());
-
- return passthrough(opCtx, conf.get(), cmdObj, result);
+ result.append("primary", routingInfo.primaryId().toString());
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result);
}
- result.appendBool("sharded", true);
+ const auto cm = routingInfo.cm();
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
- massert(12594, "how could chunk manager be null!", cm);
+ result.appendBool("sharded", true);
BSONObjBuilder shardStats;
map<string, long long> counts;
@@ -860,35 +846,38 @@ public:
class DataSizeCmd : public PublicGridCommand {
public:
DataSizeCmd() : PublicGridCommand("dataSize", "datasize") {}
- virtual string parseNs(const string& dbname, const BSONObj& cmdObj) const override {
+
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
+
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
- int,
+ int options,
string& errmsg,
- BSONObjBuilder& result) {
- const string fullns = parseNs(dbName, cmdObj);
- const string nsDBName = nsToDatabase(fullns);
+ BSONObjBuilder& result) override {
+ const NamespaceString nss(parseNs(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nsDBName));
- if (!conf->isSharded(fullns)) {
- return passthrough(opCtx, conf.get(), cmdObj, result);
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result);
}
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, fullns);
- massert(13407, "how could chunk manager be null!", cm);
+ const auto cm = routingInfo.cm();
BSONObj min = cmdObj.getObjectField("min");
BSONObj max = cmdObj.getObjectField("max");
@@ -919,13 +908,12 @@ public:
for (const ShardId& shardId : shardIds) {
const auto shardStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!shardStatus.isOK()) {
- invariant(shardStatus.getStatus() == ErrorCodes::ShardNotFound);
continue;
}
ScopedDbConnection conn(shardStatus.getValue()->getConnString());
BSONObj res;
- bool ok = conn->runCommand(conf->name(), cmdObj, res);
+ bool ok = conn->runCommand(dbName, cmdObj, res);
conn.done();
if (!ok) {
@@ -949,19 +937,20 @@ public:
class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd {
public:
ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::convertToCapped);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsCollectionRequired(dbname, cmdObj).ns();
}
@@ -970,23 +959,24 @@ public:
class GroupCmd : public NotAllowedOnShardedCollectionCmd {
public:
GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual bool passOptions() const {
+ bool passOptions() const override {
return true;
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
const auto nsElt = cmdObj.firstElement().embeddedObjectUserCheck()["ns"];
uassert(ErrorCodes::InvalidNamespace,
"'ns' must be of type String",
@@ -1003,7 +993,7 @@ public:
const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
const rpc::ServerSelectionMetadata& serverSelectionMetadata,
- BSONObjBuilder* out) const {
+ BSONObjBuilder* out) const override {
// We will time how long it takes to run the commands on the shards.
Timer timer;
@@ -1019,36 +1009,17 @@ public:
const NamespaceString nss(parseNs(dbname, cmdObj));
- // Note that this implementation will not handle targeting retries and fails when the
- // sharding metadata is too stale
- auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db());
- if (!status.isOK()) {
- return Status(status.getStatus().code(),
- str::stream() << "Passthrough command failed: " << command.toString()
- << " on ns "
- << nss.ns()
- << ". Caused by "
- << causedBy(status.getStatus()));
- }
-
- shared_ptr<DBConfig> conf = status.getValue();
- if (conf->isSharded(nss.ns())) {
- return Status(ErrorCodes::IllegalOperation,
- str::stream() << "Passthrough command failed: " << command.toString()
- << " on ns "
- << nss.ns()
- << ". Cannot run on sharded namespace.");
- }
-
- const auto primaryShardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, conf->getPrimaryId());
- if (!primaryShardStatus.isOK()) {
- return primaryShardStatus.getStatus();
- }
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Passthrough command failed: " << command.toString() << " on ns "
+ << nss.ns()
+ << ". Cannot run on sharded namespace.",
+ !routingInfo.cm());
BSONObj shardResult;
try {
- ShardConnection conn(primaryShardStatus.getValue()->getConnString(), "");
+ ShardConnection conn(routingInfo.primary()->getConnString(), "");
// TODO: this can throw a stale config when mongos is not up-to-date -- fix.
if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) {
@@ -1060,6 +1031,7 @@ public:
<< "; result: "
<< shardResult);
}
+
conn.done();
} catch (const DBException& ex) {
return ex.toStatus();
@@ -1067,9 +1039,9 @@ public:
// Fill out the command result.
Strategy::CommandResult cmdResult;
- cmdResult.shardTargetId = conf->getPrimaryId();
+ cmdResult.shardTargetId = routingInfo.primaryId();
cmdResult.result = shardResult;
- cmdResult.target = primaryShardStatus.getValue()->getConnString();
+ cmdResult.target = routingInfo.primary()->getConnString();
return ClusterExplain::buildExplainResult(
opCtx, {cmdResult}, ClusterExplain::kSingleShard, timer.millis(), out);
@@ -1080,15 +1052,18 @@ public:
class SplitVectorCmd : public NotAllowedOnShardedCollectionCmd {
public:
SplitVectorCmd() : NotAllowedOnShardedCollectionCmd("splitVector") {}
- virtual bool passOptions() const {
+
+ bool passOptions() const override {
return true;
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
ActionType::splitVector)) {
@@ -1096,43 +1071,49 @@ public:
}
return Status::OK();
}
- virtual bool run(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- int options,
- string& errmsg,
- BSONObjBuilder& result) {
- string x = parseNs(dbName, cmdObj);
- if (!str::startsWith(x, dbName)) {
- errmsg = str::stream() << "doing a splitVector across dbs isn't supported via mongos";
- return false;
- }
+
+ std::string parseNs(const string& dbname, const BSONObj& cmdObj) const override {
+ return parseNsFullyQualified(dbname, cmdObj);
+ }
+
+ bool run(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) override {
+ const std::string ns = parseNs(dbName, cmdObj);
+ uassert(ErrorCodes::IllegalOperation,
+ "Performing splitVector across dbs isn't supported via mongos",
+ str::startsWith(ns, dbName));
+
return NotAllowedOnShardedCollectionCmd::run(
opCtx, dbName, cmdObj, options, errmsg, result);
}
- virtual std::string parseNs(const string& dbname, const BSONObj& cmdObj) const {
- return parseNsFullyQualified(dbname, cmdObj);
- }
} splitVectorCmd;
class DistinctCmd : public PublicGridCommand {
public:
DistinctCmd() : PublicGridCommand("distinct") {}
- virtual void help(stringstream& help) const {
+
+ void help(stringstream& help) const override {
help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }";
}
- virtual bool passOptions() const {
+
+ bool passOptions() const override {
return true;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -1141,18 +1122,13 @@ public:
BSONObj& cmdObj,
int options,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
- if (!status.isOK()) {
- return appendEmptyResultSet(result, status.getStatus(), nss.ns());
- }
-
- shared_ptr<DBConfig> conf = status.getValue();
- if (!conf->isSharded(nss.ns())) {
-
- if (passthrough(opCtx, conf.get(), cmdObj, options, result)) {
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ if (passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, options, result)) {
return true;
}
@@ -1192,10 +1168,9 @@ public:
return false;
}
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
- massert(10420, "how could chunk manager be null!", cm);
+ const auto cm = routingInfo.cm();
- BSONObj query = getQuery(cmdObj);
+ auto query = getQuery(cmdObj);
auto queryCollation = getCollation(cmdObj);
if (!queryCollation.isOK()) {
return appendEmptyResultSet(result, queryCollation.getStatus(), nss.ns());
@@ -1230,7 +1205,7 @@ public:
ShardConnection conn(shardStatus.getValue()->getConnString(), nss.ns());
BSONObj res;
- bool ok = conn->runCommand(conf->name(), cmdObj, res, options);
+ bool ok = conn->runCommand(nss.db().toString(), cmdObj, res, options);
conn.done();
if (!ok) {
@@ -1340,16 +1315,18 @@ public:
return ClusterExplain::buildExplainResult(
opCtx, shardResults, mongosStageName, millisElapsed, out);
}
+
} disinctCmd;
class FileMD5Cmd : public PublicGridCommand {
public:
FileMD5Cmd() : PublicGridCommand("filemd5") {}
- virtual void help(stringstream& help) const {
+
+ void help(stringstream& help) const override {
help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }";
}
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
std::string collectionName;
if (const auto rootElt = cmdObj["root"]) {
uassert(ErrorCodes::InvalidNamespace,
@@ -1363,31 +1340,32 @@ public:
return NamespaceString(dbname, collectionName).ns();
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), ActionType::find));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
bool run(OperationContext* opCtx,
const string& dbName,
BSONObj& cmdObj,
- int,
+ int options,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
- if (!conf->isSharded(nss.ns())) {
- return passthrough(opCtx, conf.get(), cmdObj, result);
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result);
}
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
- massert(13091, "how could chunk manager be null!", cm);
+ const auto cm = routingInfo.cm();
+
if (SimpleBSONObjComparator::kInstance.evaluate(cm->getShardKeyPattern().toBSON() ==
BSON("files_id" << 1))) {
BSONObj finder = BSON("files_id" << cmdObj.firstElement());
@@ -1461,13 +1439,15 @@ public:
errmsg =
string("sharded filemd5 failed because: ") + res["errmsg"].valuestrsafe();
+
return false;
}
- uassert(16246,
- "Shard " + conf->name() +
- " is too old to support GridFS sharded by {files_id:1, n:1}",
- res.hasField("md5state"));
+ uassert(
+ 16246,
+ str::stream() << "Shard for database " << nss.db()
+ << " is too old to support GridFS sharded by {files_id:1, n:1}",
+ res.hasField("md5state"));
lastResult = res;
int nNext = res["numChunks"].numberInt();
@@ -1497,20 +1477,24 @@ public:
class Geo2dFindNearCmd : public PublicGridCommand {
public:
Geo2dFindNearCmd() : PublicGridCommand("geoNear") {}
- void help(stringstream& h) const {
+
+ void help(stringstream& h) const override {
h << "http://dochub.mongodb.org/core/geo#GeospatialIndexing-geoNearCommand";
}
- virtual bool passOptions() const {
+
+ bool passOptions() const override {
return true;
}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -1519,16 +1503,16 @@ public:
BSONObj& cmdObj,
int options,
string& errmsg,
- BSONObjBuilder& result) {
+ BSONObjBuilder& result) override {
const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
- auto conf = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
- if (!conf->isSharded(nss.ns())) {
- return passthrough(opCtx, conf.get(), cmdObj, options, result);
+ auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ if (!routingInfo.cm()) {
+ return passthrough(opCtx, dbName, routingInfo.primaryId(), cmdObj, result);
}
- shared_ptr<ChunkManager> cm = conf->getChunkManager(opCtx, nss.ns());
- massert(13500, "how could chunk manager be null!", cm);
+ const auto cm = routingInfo.cm();
BSONObj query = getQuery(cmdObj);
auto collation = getCollation(cmdObj);
@@ -1628,64 +1612,76 @@ public:
return true;
}
+
} geo2dFindNearCmd;
-class CompactCmd : public PublicGridCommand {
+class CompactCmd : public Command {
public:
- CompactCmd() : PublicGridCommand("compact") {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
+ CompactCmd() : Command("compact") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+
+ bool adminOnly() const override {
+ return false;
+ }
+
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {
ActionSet actions;
actions.addAction(ActionType::compact);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual bool run(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- errmsg = "compact not allowed through mongos";
- return false;
+
+ bool run(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) override {
+ uasserted(ErrorCodes::CommandNotSupported, "compact not allowed through mongos");
}
+
} compactCmd;
class EvalCmd : public PublicGridCommand {
public:
EvalCmd() : PublicGridCommand("eval", "$eval") {}
+
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
// $eval can do pretty much anything, so require all privileges.
RoleGraph::generateUniversalPrivileges(out);
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
- virtual bool run(OperationContext* opCtx,
- const string& dbName,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
+
+ bool run(OperationContext* opCtx,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) override {
RARELY {
warning() << "the eval command is deprecated" << startupWarningsLog;
}
- // $eval isn't allowed to access sharded collections, but we need to leave the
- // shard to detect that.
- auto status = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
- if (!status.isOK()) {
- return appendCommandStatus(result, status.getStatus());
- }
-
- shared_ptr<DBConfig> conf = status.getValue();
- return passthrough(opCtx, conf.get(), cmdObj, result);
+ // $eval isn't allowed to access sharded collections, but we need to leave the shard to
+ // detect that
+ const auto dbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName));
+ return passthrough(opCtx, dbName, dbInfo.primaryId(), cmdObj, result);
}
+
} evalCmd;
class CmdListCollections final : public PublicGridCommand {
@@ -1711,7 +1707,7 @@ public:
str::stream() << "Not authorized to create users on db: " << dbname);
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -1723,18 +1719,23 @@ public:
BSONObjBuilder& result) final {
auto nss = NamespaceString::makeListCollectionsNSS(dbName);
- auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
- if (!conf.isOK()) {
- return appendEmptyResultSet(result, conf.getStatus(), dbName + ".$cmd.listCollections");
+ auto dbInfoStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
+ if (!dbInfoStatus.isOK()) {
+ return appendEmptyResultSet(result, dbInfoStatus.getStatus(), nss.ns());
}
- return cursorCommandPassthrough(opCtx, conf.getValue(), cmdObj, nss, options, &result);
+ const auto& dbInfo = dbInfoStatus.getValue();
+
+ return cursorCommandPassthrough(
+ opCtx, dbName, dbInfo.primaryId(), cmdObj, nss, options, &result);
}
+
} cmdListCollections;
class CmdListIndexes final : public PublicGridCommand {
public:
CmdListIndexes() : PublicGridCommand("listIndexes") {}
+
virtual Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) {
@@ -1757,7 +1758,7 @@ public:
<< ns.coll());
}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
@@ -1767,18 +1768,15 @@ public:
int options,
string& errmsg,
BSONObjBuilder& result) final {
- auto conf = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName);
- if (!conf.isOK()) {
- return appendCommandStatus(result, conf.getStatus());
- }
+ const NamespaceString nss(parseNsCollectionRequired(dbName, cmdObj));
+
+ const auto routingInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- const NamespaceString targetNss(parseNsCollectionRequired(dbName, cmdObj));
- const NamespaceString commandNss =
- NamespaceString::makeListIndexesNSS(targetNss.db(), targetNss.coll());
- dassert(targetNss == commandNss.getTargetNSForListIndexes());
+ const auto commandNss = NamespaceString::makeListIndexesNSS(nss.db(), nss.coll());
return cursorCommandPassthrough(
- opCtx, conf.getValue(), cmdObj, commandNss, options, &result);
+ opCtx, nss.db(), routingInfo.primaryId(), cmdObj, commandNss, options, &result);
}
} cmdListIndexes;
diff --git a/src/mongo/s/commands/run_on_all_shards_cmd.cpp b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
index b534bf0628a..881b7d654ab 100644
--- a/src/mongo/s/commands/run_on_all_shards_cmd.cpp
+++ b/src/mongo/s/commands/run_on_all_shards_cmd.cpp
@@ -36,12 +36,12 @@
#include <set>
#include "mongo/db/jsobj.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/grid.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -80,7 +80,7 @@ bool RunOnAllShardsCommand::run(OperationContext* opCtx,
LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << redact(cmdObj);
if (_implicitCreateDb) {
- uassertStatusOK(ScopedShardDatabase::getOrCreate(opCtx, dbName));
+ uassertStatusOK(createShardDatabase(opCtx, dbName));
}
std::vector<ShardId> shardIds;
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 0182a091ab7..647e6601dfa 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -54,8 +54,6 @@
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/chunk_version.h"
#include "mongo/s/client/parallel.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
@@ -365,11 +363,10 @@ void Strategy::clientCommandOp(OperationContext* opCtx,
ShardConnection::checkMyConnectionVersions(opCtx, staleNS);
if (loops < 4) {
- // This throws out the entire database cache entry in response to
- // StaleConfigException instead of just the collection which encountered it. There
- // is no good reason for it other than the lack of lower-granularity cache
- // invalidation.
- Grid::get(opCtx)->catalogCache()->invalidate(NamespaceString(staleNS).db());
+ const NamespaceString nss(staleNS);
+ if (nss.isValid()) {
+ Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
+ }
}
} catch (const DBException& e) {
OpQueryReplyBuilder reply;
diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp
deleted file mode 100644
index f5aec193923..00000000000
--- a/src/mongo/s/config.cpp
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Copyright (C) 2008-2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/config.h"
-
-#include <vector>
-
-#include "mongo/db/lasterror.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/query/collation/collator_factory_interface.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/chunk_version.h"
-#include "mongo/s/grid.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-struct CollectionInfo {
- // The config server opTime at which the chunk manager below was loaded
- const repl::OpTime configOpTime;
-
- // The chunk manager
- const std::shared_ptr<ChunkManager> cm;
-};
-
-DBConfig::DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime)
- : _name(dbt.getName()),
- _shardingEnabled(dbt.getSharded()),
- _primaryId(dbt.getPrimary()),
- _configOpTime(std::move(configOpTime)) {}
-
-DBConfig::~DBConfig() = default;
-
-bool DBConfig::isSharded(const std::string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- return _collections.count(ns) > 0;
-}
-
-void DBConfig::markNSNotSharded(const std::string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- CollectionInfoMap::iterator it = _collections.find(ns);
- if (it != _collections.end()) {
- _collections.erase(it);
- }
-}
-
-std::shared_ptr<ChunkManager> DBConfig::getChunkManagerIfExists(OperationContext* opCtx,
- const std::string& ns,
- bool shouldReload,
- bool forceReload) {
- // Don't report exceptions here as errors in GetLastError
- LastError::Disabled ignoreForGLE(&LastError::get(cc()));
-
- try {
- return getChunkManager(opCtx, ns, shouldReload, forceReload);
- } catch (const DBException&) {
- return nullptr;
- }
-}
-
-std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* opCtx,
- const std::string& ns,
- bool shouldReload,
- bool forceReload) {
- ChunkVersion oldVersion;
- std::shared_ptr<ChunkManager> oldManager;
-
- {
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- auto it = _collections.find(ns);
-
- const bool earlyReload = (it == _collections.end()) && (shouldReload || forceReload);
- if (earlyReload) {
- // This is to catch cases where there this is a new sharded collection.
- // Note: read the _reloadCount inside the _lock mutex, so _loadIfNeeded will always
- // be forced to perform a reload.
- const auto currentReloadIteration = _reloadCount.load();
- _loadIfNeeded(opCtx, currentReloadIteration);
-
- it = _collections.find(ns);
- }
-
- uassert(ErrorCodes::NamespaceNotSharded,
- str::stream() << "Collection is not sharded: " << ns,
- it != _collections.end());
-
- const auto& ci = it->second;
-
- if (!(shouldReload || forceReload) || earlyReload) {
- return ci.cm;
- }
-
- if (ci.cm) {
- oldManager = ci.cm;
- oldVersion = ci.cm->getVersion();
- }
- }
-
- // TODO: We need to keep this first one-chunk check in until we have a more efficient way of
- // creating/reusing a chunk manager, as doing so requires copying the full set of chunks
- // currently
- std::vector<ChunkType> newestChunk;
- if (oldVersion.isSet() && !forceReload) {
- uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
- opCtx,
- BSON(ChunkType::ns(ns)),
- BSON(ChunkType::DEPRECATED_lastmod() << -1),
- 1,
- &newestChunk,
- nullptr,
- repl::ReadConcernLevel::kMajorityReadConcern));
-
- if (!newestChunk.empty()) {
- invariant(newestChunk.size() == 1);
- ChunkVersion v = newestChunk[0].getVersion();
- if (v.equals(oldVersion)) {
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- auto it = _collections.find(ns);
- uassert(15885,
- str::stream() << "not sharded after reloading from chunks : " << ns,
- it != _collections.end());
-
- const auto& ci = it->second;
- return ci.cm;
- }
- }
- } else if (!oldVersion.isSet()) {
- warning() << "version 0 found when " << (forceReload ? "reloading" : "checking")
- << " chunk manager; collection '" << ns << "' initially detected as sharded";
- }
-
- std::unique_ptr<ChunkManager> tempChunkManager;
-
- {
- stdx::lock_guard<stdx::mutex> lll(_hitConfigServerLock);
-
- if (!newestChunk.empty() && !forceReload) {
- // If we have a target we're going for see if we've hit already
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- auto it = _collections.find(ns);
-
- if (it != _collections.end()) {
- const auto& ci = it->second;
-
- ChunkVersion currentVersion = newestChunk[0].getVersion();
-
- // Only reload if the version we found is newer than our own in the same epoch
- if (currentVersion <= ci.cm->getVersion() &&
- ci.cm->getVersion().hasEqualEpoch(currentVersion)) {
- return ci.cm;
- }
- }
- }
-
- // Reload the chunk manager outside of the DBConfig's mutex so as to not block operations
- // for different collections on the same database
- tempChunkManager.reset(new ChunkManager(
- NamespaceString(oldManager->getns()),
- oldManager->getVersion().epoch(),
- oldManager->getShardKeyPattern(),
- oldManager->getDefaultCollator() ? oldManager->getDefaultCollator()->clone() : nullptr,
- oldManager->isUnique()));
- tempChunkManager->loadExistingRanges(opCtx, oldManager.get());
-
- if (!tempChunkManager->numChunks()) {
- // Maybe we're not sharded any more, so do a full reload
- const auto currentReloadIteration = _reloadCount.load();
-
- const bool successful = [&]() {
- stdx::lock_guard<stdx::mutex> lk(_lock);
- return _loadIfNeeded(opCtx, currentReloadIteration);
- }();
-
- // If we aren't successful loading the database entry, we don't want to keep the stale
- // object around which has invalid data.
- if (!successful) {
- Grid::get(opCtx)->catalogCache()->invalidate(_name);
- }
-
- return getChunkManager(opCtx, ns);
- }
- }
-
- stdx::lock_guard<stdx::mutex> lk(_lock);
-
- auto it = _collections.find(ns);
- uassert(14822,
- str::stream() << "Collection " << ns << " became unsharded in the middle.",
- it != _collections.end());
-
- const auto& ci = it->second;
-
- // Reset if our versions aren't the same
- bool shouldReset = !tempChunkManager->getVersion().equals(ci.cm->getVersion());
-
- // Also reset if we're forced to do so
- if (!shouldReset && forceReload) {
- shouldReset = true;
- warning() << "chunk manager reload forced for collection '" << ns << "', config version is "
- << tempChunkManager->getVersion();
- }
-
- //
- // LEGACY BEHAVIOR
- //
- // It's possible to get into a state when dropping collections when our new version is
- // less than our prev version. Behave identically to legacy mongos, for now, and warn to
- // draw attention to the problem.
- //
- // TODO: Assert in next version, to allow smooth upgrades
- //
-
- if (shouldReset && tempChunkManager->getVersion() < ci.cm->getVersion()) {
- shouldReset = false;
-
- warning() << "not resetting chunk manager for collection '" << ns << "', config version is "
- << tempChunkManager->getVersion() << " and "
- << "old version is " << ci.cm->getVersion();
- }
-
- // end legacy behavior
-
- if (shouldReset) {
- const auto cmOpTime = tempChunkManager->getConfigOpTime();
-
- // The existing ChunkManager could have been updated since we last checked, so replace the
- // existing chunk manager only if it is strictly newer.
- if (cmOpTime > ci.cm->getConfigOpTime()) {
- _collections.erase(ns);
- auto emplacedEntryIt =
- _collections.emplace(ns, CollectionInfo{cmOpTime, std::move(tempChunkManager)})
- .first;
- return emplacedEntryIt->second.cm;
- }
- }
-
- return ci.cm;
-}
-
-bool DBConfig::load(OperationContext* opCtx) {
- const auto currentReloadIteration = _reloadCount.load();
- stdx::lock_guard<stdx::mutex> lk(_lock);
- return _loadIfNeeded(opCtx, currentReloadIteration);
-}
-
-bool DBConfig::_loadIfNeeded(OperationContext* opCtx, Counter reloadIteration) {
- if (reloadIteration != _reloadCount.load()) {
- return true;
- }
-
- const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
-
- auto status = catalogClient->getDatabase(opCtx, _name);
- if (status == ErrorCodes::NamespaceNotFound) {
- return false;
- }
-
- // All other errors are connectivity, etc so throw an exception.
- uassertStatusOK(status.getStatus());
-
- const auto& dbOpTimePair = status.getValue();
- const auto& dbt = dbOpTimePair.value;
- invariant(_name == dbt.getName());
- _primaryId = dbt.getPrimary();
-
- invariant(dbOpTimePair.opTime >= _configOpTime);
- _configOpTime = dbOpTimePair.opTime;
-
- // Load all collections
- std::vector<CollectionType> collections;
- repl::OpTime configOpTimeWhenLoadingColl;
- uassertStatusOK(
- catalogClient->getCollections(opCtx, &_name, &collections, &configOpTimeWhenLoadingColl));
-
- invariant(configOpTimeWhenLoadingColl >= _configOpTime);
-
- for (const auto& coll : collections) {
- auto collIter = _collections.find(coll.getNs().ns());
- if (collIter != _collections.end()) {
- invariant(configOpTimeWhenLoadingColl >= collIter->second.configOpTime);
- }
-
- _collections.erase(coll.getNs().ns());
-
- if (!coll.getDropped()) {
- std::unique_ptr<CollatorInterface> defaultCollator;
- if (!coll.getDefaultCollation().isEmpty()) {
- auto statusWithCollator = CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(coll.getDefaultCollation());
-
- // The collation was validated upon collection creation.
- invariantOK(statusWithCollator.getStatus());
-
- defaultCollator = std::move(statusWithCollator.getValue());
- }
-
- std::unique_ptr<ChunkManager> manager(
- stdx::make_unique<ChunkManager>(coll.getNs(),
- coll.getEpoch(),
- ShardKeyPattern(coll.getKeyPattern()),
- std::move(defaultCollator),
- coll.getUnique()));
-
- // Do the blocking collection load
- manager->loadExistingRanges(opCtx, nullptr);
-
- // Collections with no chunks are unsharded, no matter what the collections entry says
- if (manager->numChunks()) {
- _collections.emplace(
- coll.getNs().ns(),
- CollectionInfo{configOpTimeWhenLoadingColl, std::move(manager)});
- }
- }
- }
-
- _reloadCount.fetchAndAdd(1);
-
- return true;
-}
-
-ShardId DBConfig::getPrimaryId() {
- stdx::lock_guard<stdx::mutex> lk(_lock);
- return _primaryId;
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h
deleted file mode 100644
index bbd63cf3b3b..00000000000
--- a/src/mongo/s/config.h
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Copyright (C) 2008-2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-
-#include "mongo/db/repl/optime.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/s/shard_id.h"
-#include "mongo/stdx/mutex.h"
-
-namespace mongo {
-
-class ChunkManager;
-struct CollectionInfo;
-class DatabaseType;
-class OperationContext;
-
-/**
- * Represents the cache entry for a database.
- */
-class DBConfig {
-public:
- DBConfig(const DatabaseType& dbt, repl::OpTime configOpTime);
- ~DBConfig();
-
- /**
- * The name of the database which this entry caches.
- */
- const std::string& name() const {
- return _name;
- }
-
- ShardId getPrimaryId();
-
- /**
- * Returns whether 'enableSharding' has been called for this database.
- */
- bool isShardingEnabled() const {
- return _shardingEnabled;
- }
-
- /**
- * Removes the specified namespace from the set of collections under this database entry so that
- * from then onwards it will be treated as unsharded.
- *
- * Note that this method doesn't do any writes to the config metadata, but simply drops the
- * specified namespace from the cache.
- */
- void markNSNotSharded(const std::string& ns);
-
- /**
- * @return whether or not the 'ns' collection is partitioned
- */
- bool isSharded(const std::string& ns);
-
- std::shared_ptr<ChunkManager> getChunkManager(OperationContext* opCtx,
- const std::string& ns,
- bool reload = false,
- bool forceReload = false);
- std::shared_ptr<ChunkManager> getChunkManagerIfExists(OperationContext* opCtx,
- const std::string& ns,
- bool reload = false,
- bool forceReload = false);
-
- /**
- * Returns true if it is successful at loading the DBConfig, false if the database is not found,
- * and throws on all other errors.
- */
- bool load(OperationContext* opCtx);
-
-protected:
- typedef std::map<std::string, CollectionInfo> CollectionInfoMap;
- typedef AtomicUInt64::WordType Counter;
-
- /**
- * Returns true if it is successful at loading the DBConfig, false if the database is not found,
- * and throws on all other errors.
- * Also returns true without reloading if reloadIteration is not equal to the _reloadCount.
- * This is to avoid multiple threads attempting to reload do duplicate work.
- */
- bool _loadIfNeeded(OperationContext* opCtx, Counter reloadIteration);
-
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (L) Must hold _lock for access.
- // (S) Self synchronizing, no explicit locking needed.
- //
- // Mutex lock order:
- // _hitConfigServerLock -> _lock
- //
-
- // Name of the database which this entry caches
- const std::string _name;
-
- // Whether sharding is enabled for this database
- const bool _shardingEnabled;
-
- // Primary shard id
- ShardId _primaryId; // (L)
-
- // Set of collections and lock to protect access
- stdx::mutex _lock;
- CollectionInfoMap _collections; // (L)
-
- // OpTime of config server when the database definition was loaded.
- repl::OpTime _configOpTime; // (L)
-
- // Ensures that only one thread at a time loads collection configuration data from
- // the config server
- stdx::mutex _hitConfigServerLock;
-
- // Increments every time this performs a full reload. Since a full reload can take a very
- // long time for very large clusters, this can be used to minimize duplicate work when multiple
- // threads tries to perform full rerload at roughly the same time.
- AtomicUInt64 _reloadCount; // (S)
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index d944954635a..3d4c384c506 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -52,7 +52,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/store_possible_cursor.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/s/stale_exception.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
@@ -319,31 +318,34 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
<< query.getQueryRequest().getProj()};
}
+ auto const catalogCache = Grid::get(opCtx)->catalogCache();
+
// Re-target and re-send the initial find command to the shards until we have established the
// shard version.
for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
- auto scopedCMStatus = ScopedChunkManager::get(opCtx, query.nss());
- if (scopedCMStatus == ErrorCodes::NamespaceNotFound) {
+ auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss());
+ if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
// If the database doesn't exist, we successfully return an empty result set without
// creating a cursor.
return CursorId(0);
- } else if (!scopedCMStatus.isOK()) {
- return scopedCMStatus.getStatus();
+ } else if (!routingInfoStatus.isOK()) {
+ return routingInfoStatus.getStatus();
}
- const auto& scopedCM = scopedCMStatus.getValue();
+ auto& routingInfo = routingInfoStatus.getValue();
auto cursorId = runQueryWithoutRetrying(opCtx,
query,
readPref,
- scopedCM.cm().get(),
- scopedCM.primary(),
+ routingInfo.cm().get(),
+ routingInfo.primary(),
results,
viewDefinition);
if (cursorId.isOK()) {
return cursorId;
}
- auto status = std::move(cursorId.getStatus());
+
+ const auto& status = cursorId.getStatus();
if (!ErrorCodes::isStaleShardingError(status.code()) &&
status != ErrorCodes::ShardNotFound) {
@@ -357,11 +359,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
<< " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": "
<< redact(status);
- if (status == ErrorCodes::StaleEpoch) {
- Grid::get(opCtx)->catalogCache()->invalidate(query.nss().db().toString());
- } else {
- scopedCM.db()->getChunkManagerIfExists(opCtx, query.nss().ns(), true);
- }
+ catalogCache->onStaleConfigError(std::move(routingInfo));
}
return {ErrorCodes::StaleShardVersion,
diff --git a/src/mongo/s/sharding_raii.cpp b/src/mongo/s/sharding_raii.cpp
deleted file mode 100644
index b90f975ed35..00000000000
--- a/src/mongo/s/sharding_raii.cpp
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/sharding_raii.h"
-
-#include "mongo/base/status_with.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-
-namespace mongo {
-
-using std::shared_ptr;
-
-ScopedShardDatabase::ScopedShardDatabase(std::shared_ptr<DBConfig> db) : _db(db) {
- invariant(_db);
-}
-
-ScopedShardDatabase::~ScopedShardDatabase() = default;
-
-StatusWith<ScopedShardDatabase> ScopedShardDatabase::getExisting(OperationContext* opCtx,
- StringData dbName) {
- auto dbStatus = Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName.toString());
- if (!dbStatus.isOK()) {
- return {dbStatus.getStatus().code(),
- str::stream() << "Database " << dbName << " was not found due to "
- << dbStatus.getStatus().toString()};
- }
-
- return {ScopedShardDatabase(std::move(dbStatus.getValue()))};
-}
-
-StatusWith<ScopedShardDatabase> ScopedShardDatabase::getOrCreate(OperationContext* opCtx,
- StringData dbName) {
- auto dbStatus = getExisting(opCtx, dbName);
- if (dbStatus.isOK()) {
- return dbStatus;
- }
-
- if (dbStatus == ErrorCodes::NamespaceNotFound) {
- auto statusCreateDb =
- Grid::get(opCtx)->catalogClient(opCtx)->createDatabase(opCtx, dbName.toString());
- if (statusCreateDb.isOK() || statusCreateDb == ErrorCodes::NamespaceExists) {
- return getExisting(opCtx, dbName);
- }
-
- return statusCreateDb;
- }
-
- return dbStatus.getStatus();
-}
-
-ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm)
- : _db(std::move(db)), _cm(std::move(cm)) {}
-
-ScopedChunkManager::ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary)
- : _db(std::move(db)), _primary(std::move(primary)) {}
-
-ScopedChunkManager::~ScopedChunkManager() = default;
-
-StatusWith<ScopedChunkManager> ScopedChunkManager::get(OperationContext* opCtx,
- const NamespaceString& nss) {
- auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db());
- if (!scopedDbStatus.isOK()) {
- return scopedDbStatus.getStatus();
- }
-
- auto scopedDb = std::move(scopedDbStatus.getValue());
-
- auto cm = scopedDb.db()->getChunkManagerIfExists(opCtx, nss.ns());
- if (cm) {
- return {ScopedChunkManager(std::move(scopedDb), std::move(cm))};
- }
-
- auto shardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, scopedDb.db()->getPrimaryId());
- if (!shardStatus.isOK()) {
- return {ErrorCodes::fromInt(40371),
- str::stream() << "The primary shard for collection " << nss.ns()
- << " could not be loaded due to error "
- << shardStatus.getStatus().toString()};
- }
-
- return {ScopedChunkManager(std::move(scopedDb), std::move(shardStatus.getValue()))};
-}
-
-StatusWith<ScopedChunkManager> ScopedChunkManager::getOrCreate(OperationContext* opCtx,
- const NamespaceString& nss) {
- auto scopedDbStatus = ScopedShardDatabase::getOrCreate(opCtx, nss.db());
- if (!scopedDbStatus.isOK()) {
- return scopedDbStatus.getStatus();
- }
-
- return ScopedChunkManager::get(opCtx, nss);
-}
-
-StatusWith<ScopedChunkManager> ScopedChunkManager::refreshAndGet(OperationContext* opCtx,
- const NamespaceString& nss) {
- auto scopedDbStatus = ScopedShardDatabase::getExisting(opCtx, nss.db());
- if (!scopedDbStatus.isOK()) {
- return scopedDbStatus.getStatus();
- }
-
- auto scopedDb = std::move(scopedDbStatus.getValue());
-
- try {
- std::shared_ptr<ChunkManager> cm =
- scopedDb.db()->getChunkManager(opCtx, nss.ns(), true, false);
-
- if (!cm) {
- return {ErrorCodes::NamespaceNotSharded,
- str::stream() << "Collection " << nss.ns()
- << " does not exist or is not sharded."};
- }
-
- if (cm->getChunkMap().empty()) {
- return {ErrorCodes::NamespaceNotSharded,
- str::stream() << "Collection " << nss.ns()
- << " is marked as sharded, but does not have any chunks. This "
- "most likely indicates a corrupted metadata or "
- "partially completed 'shardCollection' command."};
- }
-
- return {ScopedChunkManager(std::move(scopedDb), std::move(cm))};
- } catch (const AssertionException& e) {
- return e.toStatus();
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/sharding_raii.h b/src/mongo/s/sharding_raii.h
deleted file mode 100644
index 0c54f281985..00000000000
--- a/src/mongo/s/sharding_raii.h
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/s/chunk_manager.h"
-#include "mongo/s/config.h"
-
-namespace mongo {
-
-class OperationContext;
-
-class ScopedShardDatabase {
- MONGO_DISALLOW_COPYING(ScopedShardDatabase);
-
-public:
- ScopedShardDatabase(ScopedShardDatabase&&) = default;
- ~ScopedShardDatabase();
-
- /**
- * Ensures that the specified database exists in the cache and if it does, returns it.
- * Otherwise, either returns NamespaceNotFound if the database does not exist, or any other
- * error code indicating why the database could not be loaded.
- */
- static StatusWith<ScopedShardDatabase> getExisting(OperationContext* opCtx, StringData dbName);
-
- /**
- * If the specified database exists already, loads it in the cache (if not already there) and
- * returns it. Otherwise, if it does not exis, this call will implicitly create it as
- * non-sharded.
- */
- static StatusWith<ScopedShardDatabase> getOrCreate(OperationContext* opCtx, StringData dbName);
-
- /**
- * Returns the underlying database cache entry.
- */
- DBConfig* db() const {
- return _db.get();
- }
-
- /**
- * This method is here only for compatibility with the legacy M/R code, which requires a shared
- * reference to the underlying database. It should not be used in new code.
- */
- std::shared_ptr<DBConfig> getSharedDbReference() const {
- return _db;
- }
-
-private:
- explicit ScopedShardDatabase(std::shared_ptr<DBConfig> db);
-
- // Reference to the corresponding database. Never null.
- std::shared_ptr<DBConfig> _db;
-};
-
-class ScopedChunkManager {
- MONGO_DISALLOW_COPYING(ScopedChunkManager);
-
-public:
- ScopedChunkManager(ScopedChunkManager&&) = default;
- ~ScopedChunkManager();
-
- /**
- * If the specified namespace is sharded, returns a ScopedChunkManager initialized with that
- * collection's routing information. If it is not, the object returned is initialized with the
- * database primary node on which the unsharded collection must reside.
- *
- * Returns NamespaceNotFound if the database does not exist, or any other error indicating
- * problem communicating with the config server.
- */
- static StatusWith<ScopedChunkManager> get(OperationContext* opCtx, const NamespaceString& nss);
-
- /**
- * If the database holding the specified namespace does not exist, creates it and then behaves
- * like the 'get' method above.
- */
- static StatusWith<ScopedChunkManager> getOrCreate(OperationContext* opCtx,
- const NamespaceString& nss);
-
- /**
- * If the specified database and collection do not exist in the cache, tries to load them from
- * the config server and returns a reference. If they are already in the cache, makes a call to
- * the config server to check if there are any incremental updates to the collection chunk
- * metadata and if so incorporates those. Otherwise, if it does not exist or any other error
- * occurs, passes that error back.
- */
- static StatusWith<ScopedChunkManager> refreshAndGet(OperationContext* opCtx,
- const NamespaceString& nss);
-
- /**
- * Returns the underlying database for which we hold reference.
- */
- DBConfig* db() const {
- return _db.db();
- }
-
- /**
- * If the collection is sharded, returns a chunk manager for it. Otherwise, nullptr.
- */
- std::shared_ptr<ChunkManager> cm() const {
- return _cm;
- }
-
- /**
- * If the collection is not sharded, returns its primary shard. Otherwise, nullptr.
- */
- std::shared_ptr<Shard> primary() const {
- return _primary;
- }
-
-private:
- ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<ChunkManager> cm);
-
- ScopedChunkManager(ScopedShardDatabase db, std::shared_ptr<Shard> primary);
-
- // Scoped reference to the owning database.
- ScopedShardDatabase _db;
-
- // Reference to the corresponding chunk manager (if sharded) or null
- std::shared_ptr<ChunkManager> _cm;
-
- // Reference to the primary of the database (if not sharded) or null
- std::shared_ptr<Shard> _primary;
-};
-
-} // namespace mongo