diff options
-rw-r--r-- | src/mongo/db/s/SConscript | 15 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_loader.cpp | 179 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_loader.h | 52 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_loader_test.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.cpp | 356 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util.h | 114 | ||||
-rw-r--r-- | src/mongo/db/s/shard_metadata_util_test.cpp | 314 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_collection.h | 19 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_shard_collection.cpp | 160 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_shard_collection.h | 147 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_shard_collection_test.cpp | 133 |
13 files changed, 1398 insertions, 192 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 34cb36d2a65..b47c1383029 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -11,13 +11,23 @@ env.Library( 'metadata_loader.cpp', ], LIBDEPS=[ + 'shard_metadata_util', '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/client/clientdriver', - '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/range_arithmetic', '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl', '$BUILD_DIR/mongo/db/service_context', + ], +) + +env.Library( + target='shard_metadata_util', + source=[ + 'shard_metadata_util.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/s/common', ], ) @@ -207,6 +217,7 @@ env.CppUnitTest( source=[ 'collection_metadata_test.cpp', 'metadata_loader_test.cpp', + 'shard_metadata_util_test.cpp', ], LIBDEPS=[ 'metadata', diff --git a/src/mongo/db/s/metadata_loader.cpp b/src/mongo/db/s/metadata_loader.cpp index 8385ea5c3d6..bdb68ae0810 100644 --- a/src/mongo/db/s/metadata_loader.cpp +++ b/src/mongo/db/s/metadata_loader.cpp @@ -38,11 +38,13 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/shard_metadata_util.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/unique_message.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard_collection.h" #include "mongo/s/chunk_diff.h" #include "mongo/s/chunk_version.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -102,38 +104,40 @@ Status MetadataLoader::makeCollectionMetadata(OperationContext* opCtx, const string& shard, const CollectionMetadata* oldMetadata, CollectionMetadata* metadata) { - Status initCollectionStatus = _initCollection(opCtx, catalogClient, ns, shard, 
metadata); + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + + bool isShardPrimary = false; + if (repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE(opCtx, ns)) { + isShardPrimary = true; + } + + Status initCollectionStatus = + _initCollection(opCtx, catalogClient, ns, metadata, isShardPrimary); if (!initCollectionStatus.isOK()) { return initCollectionStatus; } - return _initChunks(opCtx, catalogClient, ns, shard, oldMetadata, metadata); + return _initChunks(opCtx, catalogClient, ns, shard, oldMetadata, metadata, isShardPrimary); } + Status MetadataLoader::_initCollection(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const string& ns, - const string& shard, - CollectionMetadata* metadata) { - // Get the config.collections entry for 'ns'. - auto coll = catalogClient->getCollection(opCtx, ns); - if (!coll.isOK()) { - return coll.getStatus(); + CollectionMetadata* metadata, + bool isShardPrimary) { + StatusWith<std::pair<BSONObj, OID>> statusWithShardKeyAndEpoch = + shardmetadatautil::getCollectionShardKeyAndEpoch( + opCtx, catalogClient, NamespaceString(ns), isShardPrimary); + if (!statusWithShardKeyAndEpoch.isOK()) { + return statusWithShardKeyAndEpoch.getStatus(); } + std::pair<BSONObj, OID> shardKeyAndEpoch = statusWithShardKeyAndEpoch.getValue(); - // Check that the collection hasn't been dropped: passing this check does not mean the - // collection hasn't been dropped and recreated. 
- const auto& collInfo = coll.getValue().value; - if (collInfo.getDropped()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "Could not load metadata because collection " << ns - << " was dropped"}; - } - - metadata->_keyPattern = collInfo.getKeyPattern().toBSON(); + metadata->_keyPattern = shardKeyAndEpoch.first; metadata->fillKeyPatternFields(); - metadata->_shardVersion = ChunkVersion(0, 0, collInfo.getEpoch()); - metadata->_collVersion = ChunkVersion(0, 0, collInfo.getEpoch()); + metadata->_shardVersion = ChunkVersion(0, 0, shardKeyAndEpoch.second); + metadata->_collVersion = ChunkVersion(0, 0, shardKeyAndEpoch.second); return Status::OK(); } @@ -143,7 +147,8 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx, const string& ns, const string& shard, const CollectionMetadata* oldMetadata, - CollectionMetadata* metadata) { + CollectionMetadata* metadata, + bool isShardPrimary) { const OID epoch = metadata->getCollVersion().epoch(); SCMConfigDiffTracker::MaxChunkVersionMap versionMap; @@ -193,16 +198,16 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx, &chunks, nullptr, repl::ReadConcernLevel::kMajorityReadConcern); - if (!status.isOK()) { return status; } - // If we are the primary, or a standalone, persist new chunks locally. - status = _writeNewChunksIfPrimary( - opCtx, NamespaceString(ns), chunks, metadata->_collVersion.epoch()); - if (!status.isOK()) { - return status; + if (isShardPrimary) { + status = shardmetadatautil::writeNewChunks( + opCtx, NamespaceString(ns), chunks, metadata->_collVersion.epoch()); + if (!status.isOK()) { + return status; + } } // @@ -253,124 +258,4 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx, } } -Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<ChunkType>& chunks, - const OID& currEpoch) { - NamespaceString chunkMetadataNss(ChunkType::ConfigNS + "." 
+ nss.ns()); - - // Only do the write(s) if this is a primary or standalone. Otherwise, return OK. - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer || - !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE( - opCtx, chunkMetadataNss.ns())) { - return Status::OK(); - } - - try { - DBDirectClient client(opCtx); - - /** - * Here are examples of the operations that can happen on the config server to update - * the config.chunks collection. 'chunks' only includes the chunks that result from the - * operations, which can be read from the config server, not any that were removed, so - * we must delete any chunks that overlap with the new 'chunks'. - * - * CollectionVersion = 10.3 - * - * moveChunk - * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0} - * - * splitChunk - * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4} - * {_id: 5, max: 8, version 10.5} - * {_id: 8, max: 9, version 10.6} - * - * mergeChunk - * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4} - * {_id: 14, max: 19, version 7.1} - * {_id: 19, max: 22, version 2.0} - * - */ - for (auto& chunk : chunks) { - // Check for a different epoch. - if (!chunk.getVersion().hasEqualEpoch(currEpoch)) { - // This means the collection was dropped and recreated. Drop the chunk metadata - // and return. - rpc::UniqueReply commandResponse = - client.runCommandWithMetadata(chunkMetadataNss.db().toString(), - "drop", - rpc::makeEmptyMetadata(), - BSON("drop" << chunkMetadataNss.coll())); - Status status = getStatusFromCommandResult(commandResponse->getCommandReply()); - - // A NamespaceNotFound error is okay because it's possible that we find a new epoch - // twice in a row before ever inserting documents. 
- if (!status.isOK() && status.code() != ErrorCodes::NamespaceNotFound) { - return status; - } - - return Status{ErrorCodes::RemoteChangeDetected, - str::stream() << "Invalid chunks found when reloading '" - << nss.toString() - << "'. Previous collection epoch was '" - << currEpoch.toString() - << "', but unexpectedly found a new epoch '" - << chunk.getVersion().epoch().toString() - << "'. Collection was dropped and recreated."}; - } - - // Delete any overlapping chunk ranges. Overlapping chunks will have a min value - // ("_id") between (chunk.min, chunk.max]. - // - // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}} - auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>()); - deleteDocs->setQuery(BSON(ChunkType::minShardID << BSON( - "$gte" << chunk.getMin() << "$lt" << chunk.getMax()))); - deleteDocs->setLimit(0); - - auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>()); - deleteRequest->addToDeletes(deleteDocs.release()); - - BatchedCommandRequest batchedDeleteRequest(deleteRequest.release()); - batchedDeleteRequest.setNS(chunkMetadataNss); - const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON(); - - rpc::UniqueReply deleteCommandResponse = - client.runCommandWithMetadata(chunkMetadataNss.db().toString(), - deleteCmdObj.firstElementFieldName(), - rpc::makeEmptyMetadata(), - deleteCmdObj); - auto deleteStatus = - getStatusFromCommandResult(deleteCommandResponse->getCommandReply()); - - if (!deleteStatus.isOK()) { - return deleteStatus; - } - - // Now the document can be expected to cleanly insert without overlap. 
- auto insert(stdx::make_unique<BatchedInsertRequest>()); - insert->addToDocuments(chunk.toShardBSON()); - - BatchedCommandRequest insertRequest(insert.release()); - insertRequest.setNS(chunkMetadataNss); - const BSONObj insertCmdObj = insertRequest.toBSON(); - - rpc::UniqueReply commandResponse = - client.runCommandWithMetadata(chunkMetadataNss.db().toString(), - insertCmdObj.firstElementFieldName(), - rpc::makeEmptyMetadata(), - insertCmdObj); - auto insertStatus = getStatusFromCommandResult(commandResponse->getCommandReply()); - - if (!insertStatus.isOK()) { - return insertStatus; - } - } - - return Status::OK(); - } catch (const DBException& ex) { - return ex.toStatus(); - } -} - } // namespace mongo diff --git a/src/mongo/db/s/metadata_loader.h b/src/mongo/db/s/metadata_loader.h index cfeb6348cfb..6ce6d901223 100644 --- a/src/mongo/db/s/metadata_loader.h +++ b/src/mongo/db/s/metadata_loader.h @@ -35,13 +35,12 @@ namespace mongo { -class ShardingCatalogClient; -class ChunkType; class CollectionMetadata; class CollectionType; class NamespaceString; class OID; class OperationContext; +class ShardingCatalogClient; /** * The MetadataLoader is responsible for interfacing with the config servers and previous @@ -67,10 +66,11 @@ class OperationContext; class MetadataLoader { public: /** - * Fills a new metadata instance representing the chunkset of the collection 'ns' - * (or its entirety, if not sharded) that lives on 'shard' with data from the config server. - * Optionally, uses an 'oldMetadata' for the same 'ns'/'shard'; the contents of - * 'oldMetadata' can help reducing the amount of data read from the config servers. + * Fills a new metadata instance representing the chunkset of the collection 'ns' (or its + * entirety, if not sharded) that lives on 'shard' with data either from the config server if + * primary or a shard persisted copy if secondary. 
Optionally, uses an 'oldMetadata' for the + * same 'ns'/'shard'; the contents of 'oldMetadata' can help reduce the amount of data read from + * the config servers. * * Locking note: * + Must not be called in a DBLock, since this loads over the network @@ -95,7 +95,8 @@ public: private: /** * Returns OK and fills in the internal state of 'metadata' with general collection - * information, not including chunks. + * information, not including chunks. Gets the collection information from the config server if + * 'isShardPrimary' is true, else from a shard persisted copy. * * If information about the collection can be accessed or is invalid, returns: * @return NamespaceNotFound if the collection no longer exists @@ -107,13 +108,14 @@ private: static Status _initCollection(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const std::string& ns, - const std::string& shard, - CollectionMetadata* metadata); + CollectionMetadata* metadata, + bool isShardPrimary); /** - * Returns OK and fills in the chunk state of 'metadata' to portray the chunks of the - * collection 'ns' that sit in 'shard'. If provided, uses the contents of 'oldMetadata' - * as a base (see description in initCollection above). + * Returns OK and fills in the chunk state of 'metadata' to portray the chunks of the collection + * 'ns' that sit in 'shard'. If provided, uses the contents of 'oldMetadata' as a base (see + * description in initCollection above). If 'isShardPrimary' is true, 'chunks' is persisted on + * the shard so that secondaries receive the new chunks through replication. * * If information about the chunks can be accessed or is invalid, returns: * @return HostUnreachable if there was an error contacting the config servers @@ -128,30 +130,8 @@ private: const std::string& ns, const std::string& shard, const CollectionMetadata* oldMetadata, - CollectionMetadata* metadata); - - - /** - * Takes a vector of 'chunks' and updates the config.chunks.ns collection specified by 'nss'. 
- * Any chunk documents in config.chunks.ns that overlap with a chunk in 'chunks' is removed - as the new chunk document is inserted. If the epoch of any chunk in 'chunks' does not match - 'currEpoch', the chunk metadata is dropped. - * - * @nss - the regular collection namespace for which chunk metadata is being updated. - * @chunks - a range of chunks retrieved from the config server, sorted in ascending chunk - * version order. - * @currEpoch - what this shard server knows to be the collection epoch. - * - * Returns: - * - OK if not primary and no writes are needed. - * - RemoteChangeDetected if the chunk version epoch of any chunk in 'chunks' is different than - * 'currEpoch' - * - Other errors in writes/reads to the config.chunks.ns collection fails. - */ - static Status _writeNewChunksIfPrimary(OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<ChunkType>& chunks, - const OID& currEpoch); + CollectionMetadata* metadata, + bool isShardPrimary); }; } // namespace mongo diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp index b9d10773563..cbf69c5e185 100644 --- a/src/mongo/db/s/metadata_loader_test.cpp +++ b/src/mongo/db/s/metadata_loader_test.cpp @@ -42,6 +42,7 @@ #include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard_collection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/shard_server_test_fixture.h" #include "mongo/stdx/memory.h" @@ -96,6 +97,51 @@ protected: } } + void checkCollectionsEntryExists(const NamespaceString& nss, + const CollectionMetadata& metadata, + bool hasLastConsistentCollectionVersion) { + try { + DBDirectClient client(operationContext()); + Query query(BSON(ShardCollectionType::uuid() << nss.ns())); + query.readPref(ReadPreference::Nearest, BSONArray()); + std::unique_ptr<DBClientCursor> cursor = + 
client.query(ShardCollectionType::ConfigNS.c_str(), query, 1); + ASSERT(cursor); + ASSERT(cursor->more()); + BSONObj queryResult = cursor->nextSafe(); + + ShardCollectionType shardCollectionEntry = + assertGet(ShardCollectionType::fromBSON(queryResult)); + + BSONObjBuilder builder; + builder.append(ShardCollectionType::uuid(), nss.ns()); + builder.append(ShardCollectionType::ns(), nss.ns()); + builder.append(ShardCollectionType::keyPattern(), metadata.getKeyPattern()); + if (hasLastConsistentCollectionVersion) { + metadata.getCollVersion().appendWithFieldForCommands( + &builder, ShardCollectionType::lastConsistentCollectionVersion()); + } + + ASSERT_BSONOBJ_EQ(shardCollectionEntry.toBSON(), builder.obj()); + } catch (const DBException& ex) { + ASSERT(false); + } + } + + void checkCollectionsEntryDoesNotExist(const NamespaceString& nss) { + try { + DBDirectClient client(operationContext()); + Query query(BSON(ShardCollectionType::uuid() << nss.ns())); + query.readPref(ReadPreference::Nearest, BSONArray()); + std::unique_ptr<DBClientCursor> cursor = + client.query(ShardCollectionType::ConfigNS.c_str(), query, 1); + ASSERT(cursor); + ASSERT(!cursor->more()); + } catch (const DBException& ex) { + ASSERT(false); + } + } + void expectFindOnConfigSendCollectionDefault() { CollectionType collType; collType.setNs(kNss); @@ -167,30 +213,43 @@ TEST_F(MetadataLoaderTest, DroppedColl) { collType.setEpoch(OID()); collType.setDropped(true); ASSERT_OK(collType.validate()); + + // The config.collections entry indicates that the collection was dropped, failing the refresh. 
auto future = launchAsync([this] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(operationContext(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), NULL, /* no old metadata */ &metadata); ASSERT_EQUALS(status.code(), ErrorCodes::NamespaceNotFound); + checkCollectionsEntryDoesNotExist(kNss); }); expectFindOnConfigSendBSONObjVector(std::vector<BSONObj>{collType.toBSON()}); future.timed_get(kFutureTimeout); } TEST_F(MetadataLoaderTest, EmptyColl) { + // Fail due to no config.collections entry found. auto future = launchAsync([this] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(operationContext(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), NULL, /* no old metadata */ &metadata); ASSERT_EQUALS(status.code(), ErrorCodes::NamespaceNotFound); + checkCollectionsEntryDoesNotExist(kNss); }); expectFindOnConfigSendErrorCode(ErrorCodes::NamespaceNotFound); future.timed_get(kFutureTimeout); @@ -198,15 +257,22 @@ TEST_F(MetadataLoaderTest, EmptyColl) { TEST_F(MetadataLoaderTest, BadColl) { BSONObj badCollToSend = BSON(CollectionType::fullNs(kNss.ns())); + + // Providing an invalid config.collections document should fail the refresh. 
auto future = launchAsync([this] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(operationContext(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), NULL, /* no old metadata */ &metadata); ASSERT_EQUALS(status.code(), ErrorCodes::NoSuchKey); + checkCollectionsEntryDoesNotExist(kNss); }); expectFindOnConfigSendBSONObjVector(std::vector<BSONObj>{badCollToSend}); future.timed_get(kFutureTimeout); @@ -218,15 +284,21 @@ TEST_F(MetadataLoaderTest, BadChunk) { chunkInfo.setVersion(ChunkVersion(1, 0, getMaxCollVersion().epoch())); ASSERT(!chunkInfo.validate().isOK()); + // Providing an invalid config.chunks document should fail the refresh. auto future = launchAsync([this] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready("Test"); + auto opCtx = cc().makeOperationContext(); + CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(operationContext(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), NULL, /* no old metadata */ &metadata); ASSERT_EQUALS(status.code(), ErrorCodes::NoSuchKey); + checkCollectionsEntryExists(kNss, metadata, false); }); expectFindOnConfigSendCollectionDefault(); @@ -235,6 +307,8 @@ } TEST_F(MetadataLoaderTest, NoChunksIsDropped) { + // Finding no chunks in config.chunks indicates that the collection was dropped, even if an + // entry was previously found in config.collections indicating that it wasn't dropped. 
auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); @@ -269,6 +343,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) { chunkType.setVersion(ChunkVersion(1, 0, getMaxCollVersion().epoch())); ASSERT(chunkType.validate().isOK()); + // Check that finding no new chunks for the shard works smoothly. auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); @@ -287,6 +362,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) { ASSERT_EQUALS(0, metadata.getShardVersion().majorVersion()); checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1); + checkCollectionsEntryExists(kNss, metadata, true); }); expectFindOnConfigSendCollectionDefault(); @@ -296,6 +372,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) { } TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) { + // Check that loading a single chunk for the shard works successfully. auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); @@ -314,6 +391,7 @@ TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) { ASSERT_EQUALS(getMaxCollVersion(), metadata.getShardVersion()); checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1); + checkCollectionsEntryExists(kNss, metadata, true); }); expectFindOnConfigSendCollectionDefault(); @@ -323,6 +401,7 @@ TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) { } TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) { + // Check that loading several chunks for the shard works successfully. 
auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); @@ -341,6 +420,7 @@ TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) { ASSERT_EQUALS(getMaxCollVersion(), metadata.getShardVersion()); checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 4); + checkCollectionsEntryExists(kNss, metadata, true); }); expectFindOnConfigSendCollectionDefault(); @@ -350,6 +430,7 @@ TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) { } TEST_F(MetadataLoaderTest, CollectionMetadataSetUp) { + // Check that the CollectionMetadata is set up correctly. auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); @@ -367,6 +448,7 @@ TEST_F(MetadataLoaderTest, CollectionMetadataSetUp) { ASSERT_TRUE(getMaxShardVersion().equals(metadata.getShardVersion())); checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1); + checkCollectionsEntryExists(kNss, metadata, true); }); expectFindOnConfigSendCollectionDefault(); diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp new file mode 100644 index 00000000000..eebd0a59109 --- /dev/null +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -0,0 +1,356 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/shard_metadata_util.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/unique_message.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/stdx/memory.h" + +namespace mongo { +namespace shardmetadatautil { + +namespace { + +const WriteConcernOptions kLocalWriteConcern(1, + WriteConcernOptions::SyncMode::UNSET, + Milliseconds(0)); + +} // namespace + +StatusWith<std::pair<BSONObj, OID>> getCollectionShardKeyAndEpoch( + OperationContext* opCtx, + ShardingCatalogClient* catalogClient, + const NamespaceString& nss, + bool isShardPrimary) { + if (isShardPrimary) { + // Get the config.collections entry for 'nss'. 
+ auto statusWithColl = catalogClient->getCollection(opCtx, nss.ns()); + if (!statusWithColl.isOK()) { + if (statusWithColl.getStatus() == ErrorCodes::NamespaceNotFound) { + auto status = dropChunksAndDeleteCollectionsEntry( + opCtx, NamespaceString(ChunkType::ConfigNS + "." + nss.ns()), nss); + if (!status.isOK()) { + return status; + } + } + + return statusWithColl.getStatus(); + } + auto collInfo = statusWithColl.getValue().value; + + // Update the shard's config.collections entry so that secondaries receive any changes. + Status updateStatus = updateCollectionEntry(opCtx, + nss, + BSON(ShardCollectionType::uuid(nss.ns())), + ShardCollectionType(collInfo).toBSON()); + if (!updateStatus.isOK()) { + return updateStatus; + } + + return std::pair<BSONObj, OID>(collInfo.getKeyPattern().toBSON(), collInfo.getEpoch()); + } else { // shard secondary + auto statusWithCollectionEntry = readShardCollectionEntry(opCtx, nss); + if (!statusWithCollectionEntry.isOK()) { + return statusWithCollectionEntry.getStatus(); + } + ShardCollectionType shardCollTypeEntry = statusWithCollectionEntry.getValue(); + if (!shardCollTypeEntry.isLastConsistentCollectionVersionSet()) { + // The collection has been dropped since the refresh began. 
+ return {ErrorCodes::NamespaceNotFound, + str::stream() << "Could not load metadata because collection " << nss.ns() + << " was dropped"}; + } + + return std::pair<BSONObj, OID>( + shardCollTypeEntry.getKeyPattern().toBSON(), + shardCollTypeEntry.getLastConsistentCollectionVersionEpoch()); + } +} + + +StatusWith<ShardCollectionType> readShardCollectionEntry(OperationContext* opCtx, + const NamespaceString& nss) { + Query fullQuery(BSON(ShardCollectionType::uuid() << nss.ns())); + fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray()); + try { + DBDirectClient client(opCtx); + std::unique_ptr<DBClientCursor> cursor = + client.query(ShardCollectionType::ConfigNS.c_str(), fullQuery, 1); + if (!cursor) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "Failed to establish a cursor for reading " + << ShardCollectionType::ConfigNS + << " from local storage"); + } + + if (!cursor->more()) { + // The collection has been dropped. + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << "collection " << nss.ns() << " not found"); + } + + BSONObj document = cursor->nextSafe(); + auto statusWithCollectionEntry = ShardCollectionType::fromBSON(document); + if (!statusWithCollectionEntry.isOK()) { + return statusWithCollectionEntry.getStatus(); + } + + // There should only be one entry per namespace! + invariant(!cursor->more()); + + return statusWithCollectionEntry.getValue(); + } catch (const DBException& ex) { + return {ex.toStatus().code(), + str::stream() << "Failed to read the '" << nss.ns() + << "' entry locally from config.collections" + << causedBy(ex.toStatus())}; + } +} + +Status updateCollectionEntry(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update) { + const BSONElement idField = query.getField("_id"); + invariant(!idField.eoo()); + + // Want to modify the document, not replace it. 
+ BSONObjBuilder updateBuilder; + updateBuilder.append("$set", update); + + std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); + updateDoc->setQuery(query); + updateDoc->setUpdateExpr(updateBuilder.obj()); + updateDoc->setUpsert(true); + + std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); + updateRequest->addToUpdates(updateDoc.release()); + + BatchedCommandRequest request(updateRequest.release()); + request.setNS(NamespaceString(CollectionType::ConfigNS)); + request.setWriteConcern(kLocalWriteConcern.toBSON()); + BSONObj cmdObj = request.toBSON(); + + try { + DBDirectClient client(opCtx); + + rpc::UniqueReply commandResponse = client.runCommandWithMetadata( + "config", cmdObj.firstElementFieldName(), rpc::makeEmptyMetadata(), cmdObj); + BSONObj responseReply = commandResponse->getCommandReply().getOwned(); + + Status commandStatus = getStatusFromCommandResult(commandResponse->getCommandReply()); + if (!commandStatus.isOK()) { + return commandStatus; + } + + return Status::OK(); + } catch (const DBException& ex) { + return {ex.toStatus().code(), + str::stream() << "Failed to locally update the '" << nss.ns() + << "' entry in config.collections" + << causedBy(ex.toStatus())}; + } +} + +Status writeNewChunks(OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<ChunkType>& chunks, + const OID& currEpoch) { + NamespaceString chunkMetadataNss(ChunkType::ConfigNS + "." + nss.ns()); + + try { + DBDirectClient client(opCtx); + + /** + * Here are examples of the operations that can happen on the config server to update + * the config.chunks collection. 'chunks' only includes the chunks that result from the + * operations, which can be read from the config server, not any that were removed, so + * we must delete any chunks that overlap with the new 'chunks'. 
+ * + * CollectionVersion = 10.3 + * + * moveChunk + * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0} + * + * splitChunk + * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4} + * {_id: 5, max: 8, version 10.5} + * {_id: 8, max: 9, version 10.6} + * + * mergeChunk + * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4} + * {_id: 14, max: 19, version 7.1} + * {_id: 19, max: 22, version 2.0} + * + */ + for (auto& chunk : chunks) { + // Check for a different epoch. + if (!chunk.getVersion().hasEqualEpoch(currEpoch)) { + // This means the collection was dropped and recreated. Drop the chunk metadata + // and return. + Status status = dropChunksAndDeleteCollectionsEntry(opCtx, chunkMetadataNss, nss); + if (!status.isOK()) { + return status; + } + + return Status{ErrorCodes::RemoteChangeDetected, + str::stream() << "Invalid chunks found when reloading '" + << nss.toString() + << "'. Previous collection epoch was '" + << currEpoch.toString() + << "', but unexpectedly found a new epoch '" + << chunk.getVersion().epoch().toString() + << "'. Collection was dropped and recreated."}; + } + + // Delete any overlapping chunk ranges. Overlapping chunks will have a min value + // ("_id") between (chunk.min, chunk.max]. 
+ // + // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}} + auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>()); + deleteDocs->setQuery(BSON(ChunkType::minShardID << BSON( + "$gte" << chunk.getMin() << "$lt" << chunk.getMax()))); + deleteDocs->setLimit(0); + + auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>()); + deleteRequest->addToDeletes(deleteDocs.release()); + + BatchedCommandRequest batchedDeleteRequest(deleteRequest.release()); + batchedDeleteRequest.setNS(chunkMetadataNss); + const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON(); + + rpc::UniqueReply deleteCommandResponse = + client.runCommandWithMetadata(chunkMetadataNss.db().toString(), + deleteCmdObj.firstElementFieldName(), + rpc::makeEmptyMetadata(), + deleteCmdObj); + auto deleteStatus = + getStatusFromCommandResult(deleteCommandResponse->getCommandReply()); + + if (!deleteStatus.isOK()) { + return deleteStatus; + } + + // Now the document can be expected to cleanly insert without overlap. + auto insert(stdx::make_unique<BatchedInsertRequest>()); + insert->addToDocuments(chunk.toShardBSON()); + + BatchedCommandRequest insertRequest(insert.release()); + insertRequest.setNS(chunkMetadataNss); + const BSONObj insertCmdObj = insertRequest.toBSON(); + + rpc::UniqueReply commandResponse = + client.runCommandWithMetadata(chunkMetadataNss.db().toString(), + insertCmdObj.firstElementFieldName(), + rpc::makeEmptyMetadata(), + insertCmdObj); + auto insertStatus = getStatusFromCommandResult(commandResponse->getCommandReply()); + + if (!insertStatus.isOK()) { + return insertStatus; + } + } + + // Must update the config.collections 'lastConsistentCollectionVersion' field so that + // secondaries can load the latest config.chunks.ns writes. 
+ if (!chunks.empty()) { + BSONObjBuilder builder; + chunks.back().getVersion().appendWithFieldForCommands( + &builder, ShardCollectionType::lastConsistentCollectionVersion()); + BSONObj update = builder.obj(); + + auto collUpdateStatus = updateCollectionEntry( + opCtx, nss, BSON(ShardCollectionType::uuid(nss.ns())), update); + if (!collUpdateStatus.isOK()) { + return collUpdateStatus; + } + } + + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + +Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, + const NamespaceString& chunkMetadataNss, + const NamespaceString& collectionsEntryNss) { + try { + DBDirectClient client(opCtx); + + // Drop the config.chunks.ns collection specified by 'chunkMetadataNss'. + rpc::UniqueReply commandResponse = + client.runCommandWithMetadata(chunkMetadataNss.db().toString(), + "drop", + rpc::makeEmptyMetadata(), + BSON("drop" << chunkMetadataNss.coll())); + Status status = getStatusFromCommandResult(commandResponse->getCommandReply()); + + if (!status.isOK() && status.code() != ErrorCodes::NamespaceNotFound) { + return status; + } + + // Delete the config.collections entry matching 'collectionsEntryNss'. 
+ auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>()); + deleteDocs->setQuery(BSON(ShardCollectionType::uuid << collectionsEntryNss.ns())); + deleteDocs->setLimit(0); + + auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>()); + deleteRequest->addToDeletes(deleteDocs.release()); + + BatchedCommandRequest batchedDeleteRequest(deleteRequest.release()); + batchedDeleteRequest.setNS(NamespaceString(ShardCollectionType::ConfigNS)); + const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON(); + + rpc::UniqueReply deleteCommandResponse = client.runCommandWithMetadata( + "config", deleteCmdObj.firstElementFieldName(), rpc::makeEmptyMetadata(), deleteCmdObj); + auto deleteStatus = getStatusFromCommandResult(deleteCommandResponse->getCommandReply()); + + if (!deleteStatus.isOK()) { + return deleteStatus; + } + + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + +} // namespace shardmetadatautil +} // namespace mongo diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h new file mode 100644 index 00000000000..2acbd138a1c --- /dev/null +++ b/src/mongo/db/s/shard_metadata_util.h @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/status.h" + +namespace mongo { + +class BSONObj; +class ChunkType; +class CollectionMetadata; +class ShardCollectionType; +class NamespaceString; +class OID; +class OperationContext; +class ShardingCatalogClient; +template <typename T> +class StatusWith; + +/** + * Function helpers to locally, using a DBDirectClient, read and write sharding metadata on a shard. + * Also retrieves metadata from the config server, copies of which are then persisted on the shard. + */ +namespace shardmetadatautil { + +/** + * Gets the config.collections for 'nss' entry either remotely from the config server if + * 'isShardPrimary' is true or locally from the shard if false. Additionally updates the shard's + * config.collections entry with the remotely retrieved metadata if 'isShardPrimary' is true. + */ +StatusWith<std::pair<BSONObj, OID>> getCollectionShardKeyAndEpoch( + OperationContext* opCtx, + ShardingCatalogClient* catalogClient, + const NamespaceString& nss, + bool isShardPrimary); + +/** + * Reads the shard server's config.collections entry identified by 'nss'. 
+ */
+StatusWith<ShardCollectionType> readShardCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+/**
+ * Updates the config.collections entry matching 'query' with 'update' using local write concern.
+ * Only the fields specified in 'update' are modified.
+ * Sets upsert to true on the update operation in case the entry does not exist locally yet.
+ */
+Status updateCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update);
+
+/**
+ * Two threads running this function in parallel for the same collection can corrupt the collection
+ * data!
+ *
+ * Takes a vector of 'chunks' and updates the config.chunks.ns collection specified by 'nss'.
+ * Any chunk documents in config.chunks.ns that overlap with a chunk in 'chunks' are removed
+ * as the new chunk document is inserted. If the epoch of any chunk in 'chunks' does not match
+ * 'currEpoch', the chunk metadata is dropped and a RemoteChangeDetected error returned.
+ *
+ * @nss - the regular collection namespace for which chunk metadata is being updated.
+ * @chunks - chunks retrieved from the config server, sorted in ascending chunk version order
+ * @currEpoch - what this shard server expects to be the collection epoch.
+ *
+ * Returns:
+ * - RemoteChangeDetected if the chunk version epoch of any chunk in 'chunks' is different than
+ * 'currEpoch'
+ * - Other errors if unable to do local writes/reads to the config.chunks.ns collection.
+ */
+Status writeNewChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<ChunkType>& chunks,
+ const OID& currEpoch);
+
+/**
+ * Locally on this shard, drops the config.chunks.ns corresponding to 'chunkMetadataNss' and then
+ * deletes the config.collections entry for 'collectionsEntryNss'. 
+ */ +Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, + const NamespaceString& chunkMetadataNss, + const NamespaceString& collectionsEntryNss); + +} // namespace shardmetadatautil +} // namespace mongo diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp new file mode 100644 index 00000000000..79765f9720b --- /dev/null +++ b/src/mongo/db/s/shard_metadata_util_test.cpp @@ -0,0 +1,314 @@ +/** + * Copyright (C) 2017 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/status.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/shard_metadata_util.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/shard_server_test_fixture.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using std::string; +using std::unique_ptr; +using std::vector; +using unittest::assertGet; + +const HostAndPort kConfigHostAndPort = HostAndPort("dummy", 123); +const NamespaceString kNss = NamespaceString("test.foo"); +const NamespaceString kChunkMetadataNss = NamespaceString("config.chunks.test.foo"); +const ShardId kShardId = ShardId("shard0"); + +class ShardMetadataUtilTest : public ShardServerTestFixture { +protected: + /** + * Call this after setUpChunks()! Both functions use the private _maxCollVersion variable. To + * set the lastConsistentCollectionVersion correctly this must be called second. + * + * Inserts a config.collections entry. 
+ */ + void setUpCollection() { + BSONObjBuilder builder; + builder.append(ShardCollectionType::uuid(), kNss.ns()); + builder.append(ShardCollectionType::ns(), kNss.ns()); + builder.append(ShardCollectionType::keyPattern(), _keyPattern.toBSON()); + _maxCollVersion.appendWithFieldForCommands( + &builder, ShardCollectionType::lastConsistentCollectionVersion()); + ShardCollectionType shardCollType = assertGet(ShardCollectionType::fromBSON(builder.obj())); + + try { + DBDirectClient client(operationContext()); + auto insert(stdx::make_unique<BatchedInsertRequest>()); + insert->addToDocuments(shardCollType.toBSON()); + + BatchedCommandRequest insertRequest(insert.release()); + insertRequest.setNS(NamespaceString(ShardCollectionType::ConfigNS)); + const BSONObj insertCmdObj = insertRequest.toBSON(); + + rpc::UniqueReply commandResponse = + client.runCommandWithMetadata("config", + insertCmdObj.firstElementFieldName(), + rpc::makeEmptyMetadata(), + insertCmdObj); + ASSERT_OK(getStatusFromCommandResult(commandResponse->getCommandReply())); + } catch (const DBException& ex) { + ASSERT(false); + } + } + + /** + * Helper to make a number of chunks that can then be manipulated in various ways in the tests. + * Chunks have the shard server's config.chunks.ns schema. 
+ */ + std::vector<ChunkType> makeFourChunks() { + std::vector<ChunkType> chunks; + BSONObj mins[] = {BSON("a" << MINKEY), BSON("a" << 10), BSON("a" << 50), BSON("a" << 100)}; + BSONObj maxs[] = {BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << MAXKEY)}; + + for (int i = 0; i < 4; ++i) { + _maxCollVersion.incMajor(); + BSONObj shardChunk = BSON(ChunkType::minShardID(mins[i]) + << ChunkType::max(maxs[i]) + << ChunkType::shard(kShardId.toString()) + << ChunkType::DEPRECATED_lastmod(Date_t::fromMillisSinceEpoch( + _maxCollVersion.toLong()))); + + chunks.push_back( + assertGet(ChunkType::fromShardBSON(shardChunk, _maxCollVersion.epoch()))); + } + + return chunks; + } + + /** + * Inserts 'chunks' into the config.chunks.ns collection 'chunkMetadataNss'. + */ + void setUpChunks(const NamespaceString& chunkMetadataNss, const std::vector<ChunkType> chunks) { + try { + DBDirectClient client(operationContext()); + auto insert(stdx::make_unique<BatchedInsertRequest>()); + + for (auto& chunk : chunks) { + insert->addToDocuments(chunk.toShardBSON()); + } + + BatchedCommandRequest insertRequest(insert.release()); + insertRequest.setNS(NamespaceString(chunkMetadataNss)); + const BSONObj insertCmdObj = insertRequest.toBSON(); + + rpc::UniqueReply commandResponse = + client.runCommandWithMetadata(chunkMetadataNss.db().toString(), + insertCmdObj.firstElementFieldName(), + rpc::makeEmptyMetadata(), + insertCmdObj); + ASSERT_OK(getStatusFromCommandResult(commandResponse->getCommandReply())); + } catch (const DBException& ex) { + ASSERT(false); + } + } + + /** + * Sets up persisted chunk metadata. Inserts four chunks for kNss into kChunkMetadataNss and a + * corresponding collection entry in config.collections. + */ + void setUpShardingMetadata() { + setUpChunks(kChunkMetadataNss, makeFourChunks()); + setUpCollection(); + } + + /** + * Checks that 'nss' has no documents. 
+ */ + void checkCollectionIsEmpty(const NamespaceString& nss) { + try { + DBDirectClient client(operationContext()); + ASSERT_EQUALS(client.count(nss.ns()), 0ULL); + } catch (const DBException& ex) { + ASSERT(false); + } + } + + /** + * Checks that each chunk in 'chunks' has been written to 'chunkMetadataNss'. + */ + void checkChunks(const NamespaceString& chunkMetadataNss, + const std::vector<ChunkType>& chunks) { + try { + DBDirectClient client(operationContext()); + for (auto& chunk : chunks) { + Query query(BSON(ChunkType::minShardID() << chunk.getMin() << ChunkType::max() + << chunk.getMax())); + query.readPref(ReadPreference::Nearest, BSONArray()); + + std::unique_ptr<DBClientCursor> cursor = + client.query(chunkMetadataNss.ns(), query, 1); + ASSERT(cursor); + + ASSERT(cursor->more()); + BSONObj queryResult = cursor->nextSafe(); + ChunkType foundChunk = + assertGet(ChunkType::fromShardBSON(queryResult, chunk.getVersion().epoch())); + ASSERT_BSONOBJ_EQ(chunk.getMin(), foundChunk.getMin()); + ASSERT_BSONOBJ_EQ(chunk.getMax(), foundChunk.getMax()); + ASSERT_EQUALS(chunk.getShard(), foundChunk.getShard()); + ASSERT_EQUALS(chunk.getVersion(), foundChunk.getVersion()); + } + } catch (const DBException& ex) { + ASSERT(false); + } + } + + const ChunkVersion& getCollectionVersion() const { + return _maxCollVersion; + } + + const KeyPattern& getKeyPattern() const { + return _keyPattern; + } + +private: + ChunkVersion _maxCollVersion{1, 0, OID::gen()}; + const KeyPattern _keyPattern{BSON("a" << 1)}; +}; + +TEST_F(ShardMetadataUtilTest, UpdateAndReadCollectionsDocument) { + // Insert document + BSONObj shardCollectionTypeObj = + BSON(ShardCollectionType::uuid(kNss.ns()) + << ShardCollectionType::ns(kNss.ns()) + << ShardCollectionType::keyPattern(getKeyPattern().toBSON())); + ASSERT_OK(shardmetadatautil::updateCollectionEntry(operationContext(), + kNss, + BSON(ShardCollectionType::uuid(kNss.ns())), + shardCollectionTypeObj)); + + ShardCollectionType 
readShardCollectionType = + assertGet(shardmetadatautil::readShardCollectionEntry(operationContext(), kNss)); + ASSERT_BSONOBJ_EQ(shardCollectionTypeObj, readShardCollectionType.toBSON()); + + // Update document + BSONObjBuilder updateBuilder; + getCollectionVersion().appendWithFieldForCommands( + &updateBuilder, ShardCollectionType::lastConsistentCollectionVersion()); + ASSERT_OK(shardmetadatautil::updateCollectionEntry( + operationContext(), kNss, BSON(ShardCollectionType::uuid(kNss.ns())), updateBuilder.obj())); + + readShardCollectionType = + assertGet(shardmetadatautil::readShardCollectionEntry(operationContext(), kNss)); + + ShardCollectionType updatedShardCollectionType = + assertGet(ShardCollectionType::fromBSON(shardCollectionTypeObj)); + updatedShardCollectionType.setLastConsistentCollectionVersion(getCollectionVersion()); + + ASSERT_BSONOBJ_EQ(updatedShardCollectionType.toBSON(), readShardCollectionType.toBSON()); +} + +TEST_F(ShardMetadataUtilTest, WriteNewChunks) { + std::vector<ChunkType> chunks = makeFourChunks(); + shardmetadatautil::writeNewChunks( + operationContext(), kNss, chunks, getCollectionVersion().epoch()); + checkChunks(kChunkMetadataNss, chunks); +} + +TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) { + // Load some chunk metadata. + + std::vector<ChunkType> chunks = makeFourChunks(); + ASSERT_OK(shardmetadatautil::writeNewChunks( + operationContext(), kNss, chunks, getCollectionVersion().epoch())); + checkChunks(kChunkMetadataNss, chunks); + + // Load some changes and make sure it's applied correctly. + // Split the last chunk in two and move the new last chunk away. 
+ + std::vector<ChunkType> newChunks; + ChunkType lastChunk = chunks.back(); + chunks.pop_back(); + ChunkVersion collVersion = getCollectionVersion(); + + collVersion.incMinor(); // chunk only split + BSONObjBuilder splitChunkOneBuilder; + splitChunkOneBuilder.append(ChunkType::minShardID(), lastChunk.getMin()); + { + BSONObjBuilder subMax(splitChunkOneBuilder.subobjStart(ChunkType::max())); + subMax.append("a", 10000); + } + splitChunkOneBuilder.append(ChunkType::shard(), lastChunk.getShard().toString()); + collVersion.appendForChunk(&splitChunkOneBuilder); + ChunkType splitChunkOne = + assertGet(ChunkType::fromShardBSON(splitChunkOneBuilder.obj(), collVersion.epoch())); + newChunks.push_back(splitChunkOne); + + collVersion.incMajor(); // chunk split and moved + BSONObjBuilder splitChunkTwoMovedBuilder; + { + BSONObjBuilder subMin(splitChunkTwoMovedBuilder.subobjStart(ChunkType::minShardID())); + subMin.append("a", 10000); + } + splitChunkTwoMovedBuilder.append(ChunkType::max(), lastChunk.getMax()); + splitChunkTwoMovedBuilder.append(ChunkType::shard(), "altShard"); + collVersion.appendForChunk(&splitChunkTwoMovedBuilder); + ChunkType splitChunkTwoMoved = + assertGet(ChunkType::fromShardBSON(splitChunkTwoMovedBuilder.obj(), collVersion.epoch())); + newChunks.push_back(splitChunkTwoMoved); + + collVersion.incMinor(); // bump control chunk version + ChunkType frontChunkControl = chunks.front(); + chunks.erase(chunks.begin()); + frontChunkControl.setVersion(collVersion); + newChunks.push_back(frontChunkControl); + + ASSERT_OK(shardmetadatautil::writeNewChunks( + operationContext(), kNss, newChunks, collVersion.epoch())); + + chunks.push_back(splitChunkOne); + chunks.push_back(splitChunkTwoMoved); + chunks.push_back(frontChunkControl); + checkChunks(kChunkMetadataNss, chunks); +} + +TEST_F(ShardMetadataUtilTest, DropChunksAndDeleteCollectionsEntry) { + setUpShardingMetadata(); + ASSERT_OK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry( + operationContext(), 
kChunkMetadataNss, kNss)); + checkCollectionIsEmpty(NamespaceString(ShardCollectionType::ConfigNS)); + checkCollectionIsEmpty(kChunkMetadataNss); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index f802a27739d..f6ef30c691d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -73,6 +73,7 @@ env.Library( 'catalog/type_locks.cpp', 'catalog/type_mongos.cpp', 'catalog/type_shard.cpp', + 'catalog/type_shard_collection.cpp', 'catalog/type_tags.cpp', 'request_types/add_shard_request_type.cpp', 'request_types/add_shard_to_zone_request_type.cpp', @@ -192,13 +193,13 @@ env.CppUnitTest( 'catalog/type_locks_test.cpp', 'catalog/type_mongos_test.cpp', 'catalog/type_shard_test.cpp', + 'catalog/type_shard_collection_test.cpp', 'catalog/type_tags_test.cpp', 'chunk_version_test.cpp', 'migration_secondary_throttle_options_test.cpp', 'move_chunk_request_test.cpp', 'set_shard_version_request_test.cpp', 'shard_key_pattern_test.cpp', -# 'shard_id_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/query/query_test_service_context', diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index f541dde581f..8c36fccae77 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -854,7 +854,13 @@ StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getColle return parseStatus.getStatus(); } - return repl::OpTimeWith<CollectionType>(parseStatus.getValue(), retOpTimePair.opTime); + auto collType = parseStatus.getValue(); + if (collType.getDropped()) { + return Status(ErrorCodes::NamespaceNotFound, + stream() << "collection " << collNs << " was dropped"); + } + + return repl::OpTimeWith<CollectionType>(collType, retOpTimePair.opTime); } Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx, diff --git a/src/mongo/s/catalog/type_collection.h 
b/src/mongo/s/catalog/type_collection.h index 06c51be3b55..b46937266e1 100644 --- a/src/mongo/s/catalog/type_collection.h +++ b/src/mongo/s/catalog/type_collection.h @@ -43,9 +43,26 @@ class StatusWith; /** - * This class represents the layout and contents of documents contained in the + * This class represents the layout and contents of documents contained in the config server's * config.collections collection. All manipulation of documents coming from that collection * should be done with this class. + * + * Expected config server config.collections collection format: + * { + * "_id" : "foo.bar", + * "lastmodEpoch" : ObjectId("58b6fd76132358839e409e47"), + * "lastmod" : ISODate("1970-02-19T17:02:47.296Z"), + * "dropped" : false, + * "key" : { + * "_id" : 1 + * }, + * "defaultCollation" : { + * "locale" : "fr_CA" + * }, + * "unique" : false, + * "noBalance" : false + * } + * */ class CollectionType { public: diff --git a/src/mongo/s/catalog/type_shard_collection.cpp b/src/mongo/s/catalog/type_shard_collection.cpp new file mode 100644 index 00000000000..e1c2a067be6 --- /dev/null +++ b/src/mongo/s/catalog/type_shard_collection.cpp @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2017 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/type_shard_collection.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +const std::string ShardCollectionType::ConfigNS = "config.collections"; + +const BSONField<std::string> ShardCollectionType::uuid("_id"); +const BSONField<std::string> ShardCollectionType::ns("ns"); +const BSONField<BSONObj> ShardCollectionType::keyPattern("key"); +const BSONField<OID> ShardCollectionType::lastConsistentCollectionVersionEpoch( + "lastConsistentCollectionVersionEpoch"); +const BSONField<Date_t> ShardCollectionType::lastConsistentCollectionVersion( + "lastConsistentCollectionVersion"); + +ShardCollectionType::ShardCollectionType(const CollectionType& collectionType) + : ShardCollectionType( + collectionType.getNs(), collectionType.getNs(), collectionType.getKeyPattern()) {} + +ShardCollectionType::ShardCollectionType(const NamespaceString& uuid, + const NamespaceString& ns, + const KeyPattern& keyPattern) + : 
_uuid(uuid), _ns(ns), _keyPattern(keyPattern.toBSON()) {} + +StatusWith<ShardCollectionType> ShardCollectionType::fromBSON(const BSONObj& source) { + NamespaceString uuidNss; + { + std::string uuidString; + Status status = bsonExtractStringField(source, uuid.name(), &uuidString); + if (!status.isOK()) { + return status; + } + uuidNss = NamespaceString{uuidString}; + } + + NamespaceString nsNss; + { + std::string nsString; + Status status = bsonExtractStringField(source, ns.name(), &nsString); + if (!status.isOK()) { + return status; + } + nsNss = NamespaceString{nsString}; + } + + BSONElement collKeyPattern; + Status status = bsonExtractTypedField(source, keyPattern.name(), Object, &collKeyPattern); + if (!status.isOK()) { + return status; + } + BSONObj obj = collKeyPattern.Obj(); + if (obj.isEmpty()) { + return Status(ErrorCodes::ShardKeyNotFound, + str::stream() << "Empty shard key. Failed to parse: " << source.toString()); + } + KeyPattern pattern(obj.getOwned()); + + ShardCollectionType shardCollectionType(uuidNss, nsNss, pattern); + + { + auto statusWithChunkVersion = ChunkVersion::parseFromBSONWithFieldForCommands( + source, lastConsistentCollectionVersion.name()); + if (statusWithChunkVersion.isOK()) { + ChunkVersion collVersion = std::move(statusWithChunkVersion.getValue()); + shardCollectionType.setLastConsistentCollectionVersion(std::move(collVersion)); + } else if (statusWithChunkVersion == ErrorCodes::NoSuchKey) { + // May not be set yet, which is okay. 
+ } else { + return statusWithChunkVersion.getStatus(); + } + } + + return shardCollectionType; +} + +bool ShardCollectionType::isLastConsistentCollectionVersionSet() const { + return _lastConsistentCollectionVersion.is_initialized(); +} + +BSONObj ShardCollectionType::toBSON() const { + BSONObjBuilder builder; + + builder.append(uuid.name(), _uuid.toString()); + builder.append(ns.name(), _ns.toString()); + builder.append(keyPattern.name(), _keyPattern.toBSON()); + + if (_lastConsistentCollectionVersion) { + _lastConsistentCollectionVersion->appendWithFieldForCommands( + &builder, lastConsistentCollectionVersion.name()); + } + + return builder.obj(); +} + +std::string ShardCollectionType::toString() const { + return toBSON().toString(); +} + +void ShardCollectionType::setUUID(const NamespaceString& uuid) { + invariant(uuid.isValid()); + _uuid = uuid; +} + +void ShardCollectionType::setNs(const NamespaceString& ns) { + invariant(ns.isValid()); + _ns = ns; +} + +void ShardCollectionType::setKeyPattern(const KeyPattern& keyPattern) { + invariant(!keyPattern.toBSON().isEmpty()); + _keyPattern = keyPattern; +} + +void ShardCollectionType::setLastConsistentCollectionVersion( + const ChunkVersion& lastConsistentCollectionVersion) { + _lastConsistentCollectionVersion = lastConsistentCollectionVersion; +} + +const OID ShardCollectionType::getLastConsistentCollectionVersionEpoch() const { + invariant(_lastConsistentCollectionVersion); + return _lastConsistentCollectionVersion->epoch(); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h new file mode 100644 index 00000000000..ca9fa00d5b5 --- /dev/null +++ b/src/mongo/s/catalog/type_shard_collection.h @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2017 10gen Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <string> + +#include "mongo/db/jsobj.h" +#include "mongo/db/keypattern.h" +#include "mongo/db/namespace_string.h" +#include "mongo/s/chunk_version.h" + +namespace mongo { + +class CollectionType; +class Status; +template <typename T> +class StatusWith; + +/** + * This class represents the layout and contents of documents contained in the shard server's + * config.collections collections. All manipulation of documents coming from that collection should + * be done with this class. 
+ * + * Expected shard server config.collections collection format: + * { + * "_id" : "foo.bar", // will eventually become a UUID field, when it becomes available + * "ns" : "foo.bar", + * "key" : { + * "_id" : 1 + * }, + * "lastConsistentCollectionVersionEpoch" : ObjectId("58b6fd76132358839e409e47"), + * "lastConsistentCollectionVersion" : ISODate("1970-02-19T17:02:47.296Z") + * } + * + * The 'lastConsistentCollectionVersion' is written by shard primaries and used by shard + * secondaries. A secondary uses the value to refresh chunk metadata up to the chunk with that + * chunk version. Chunk metadata updates on the shard involve multiple chunks collection document + * writes, during which time the data can be inconsistent and should not be loaded. + */ +class ShardCollectionType { +public: + // Name of the collections collection in the config server. + static const std::string ConfigNS; + + static const BSONField<std::string> uuid; + static const BSONField<std::string> ns; + static const BSONField<BSONObj> keyPattern; + static const BSONField<OID> lastConsistentCollectionVersionEpoch; + static const BSONField<Date_t> lastConsistentCollectionVersion; + + /** + * The MetadataLoader fetches CollectionType format documents from the config server and then + * writes them in ShardCollectionType format to the shard. This constructor facilitates the + * conversion from config to shard config.collections schema. + */ + explicit ShardCollectionType(const CollectionType& collType); + + /** + * Constructs a new ShardCollectionType object from BSON retrieved from a shard server. Also + * does + * validation of the contents. + */ + static StatusWith<ShardCollectionType> fromBSON(const BSONObj& source); + + /** + * Returns the BSON representation of the entry for the shard collection schema. + * + * This function only appends the fields and values relevant to shards that are SET on the + * ShardCollectionType object. No field is guaranteed to be appended. 
+ */ + BSONObj toBSON() const; + + /** + * Returns a std::string representation of the current internal state. + */ + std::string toString() const; + + const NamespaceString& getUUID() const { + return _uuid; + } + void setUUID(const NamespaceString& uuid); + + const NamespaceString& getNs() const { + return _ns; + } + void setNs(const NamespaceString& ns); + + const KeyPattern& getKeyPattern() const { + return _keyPattern; + } + void setKeyPattern(const KeyPattern& keyPattern); + + const ChunkVersion& getLastConsistentCollectionVersion() const { + return _lastConsistentCollectionVersion.get(); + } + void setLastConsistentCollectionVersion(const ChunkVersion& lastConsistentCollectionVersion); + + bool isLastConsistentCollectionVersionSet() const; + + const OID getLastConsistentCollectionVersionEpoch() const; + +private: + ShardCollectionType(const NamespaceString& uuid, + const NamespaceString& ns, + const KeyPattern& keyPattern); + + NamespaceString _uuid; + + // The full namespace (with the database prefix). + NamespaceString _ns; + + // Sharding key. If collection is dropped, this is no longer required. + KeyPattern _keyPattern; + + // used by shard secondaries to safely refresh chunk metadata up to this version: higher + // versions may put the chunk metadata into an inconsistent state. + boost::optional<ChunkVersion> _lastConsistentCollectionVersion; +}; + +} // namespace mongo diff --git a/src/mongo/s/catalog/type_shard_collection_test.cpp b/src/mongo/s/catalog/type_shard_collection_test.cpp new file mode 100644 index 00000000000..d77d7fa4996 --- /dev/null +++ b/src/mongo/s/catalog/type_shard_collection_test.cpp @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2017 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/type_shard_collection.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/oid.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +const NamespaceString kNss = NamespaceString("db.coll"); +const BSONObj kKeyPattern = BSON("a" << 1); + +TEST(ShardCollectionType, ToFromShardBSONWithLastConsistentCollectionVersion) { + const ChunkVersion lastConsistent(1, 0, OID::gen()); + + BSONObjBuilder builder; + builder.append(ShardCollectionType::uuid.name(), kNss.ns()); + builder.append(ShardCollectionType::ns.name(), kNss.ns()); + builder.append(ShardCollectionType::keyPattern.name(), kKeyPattern); + lastConsistent.appendWithFieldForCommands( + &builder, ShardCollectionType::lastConsistentCollectionVersion.name()); + BSONObj obj = builder.obj(); + + ShardCollectionType shardCollectionType = assertGet(ShardCollectionType::fromBSON(obj)); + + ASSERT_EQUALS(shardCollectionType.getUUID(), kNss); + ASSERT_EQUALS(shardCollectionType.getNs(), kNss); + ASSERT_BSONOBJ_EQ(shardCollectionType.getKeyPattern().toBSON(), kKeyPattern); + ASSERT_EQUALS(shardCollectionType.getLastConsistentCollectionVersion(), lastConsistent); + + ASSERT_BSONOBJ_EQ(obj, shardCollectionType.toBSON()); +} + +TEST(ShardCollectionType, ToFromShardBSONWithoutLastConsistentCollectionVersion) { + BSONObjBuilder builder; + builder.append(ShardCollectionType::uuid.name(), kNss.ns()); + builder.append(ShardCollectionType::ns.name(), kNss.ns()); + builder.append(ShardCollectionType::keyPattern.name(), kKeyPattern); + BSONObj obj = builder.obj(); + + ShardCollectionType shardCollectionType = assertGet(ShardCollectionType::fromBSON(obj)); + + ASSERT_EQUALS(shardCollectionType.getUUID(), kNss); + ASSERT_EQUALS(shardCollectionType.getNs(), kNss); + 
ASSERT_BSONOBJ_EQ(shardCollectionType.getKeyPattern().toBSON(), kKeyPattern); + ASSERT_FALSE(shardCollectionType.isLastConsistentCollectionVersionSet()); + + ASSERT_BSONOBJ_EQ(obj, shardCollectionType.toBSON()); +} + +TEST(ShardCollectionType, FromEmptyBSON) { + StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(BSONObj()); + ASSERT_FALSE(status.isOK()); +} + +TEST(ShardCollectionType, FromBSONNoUUIDFails) { + BSONObj obj = + BSON(ShardCollectionType::ns(kNss.ns()) << ShardCollectionType::keyPattern(kKeyPattern)); + + StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj); + ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey); +} + +TEST(ShardCollectionType, FromBSONNoNSFails) { + BSONObj obj = + BSON(ShardCollectionType::uuid(kNss.ns()) << ShardCollectionType::keyPattern(kKeyPattern)); + + StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj); + ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey); +} + +TEST(ShardCollectionType, FromBSONNoShardKeyFails) { + BSONObj obj = BSON(ShardCollectionType::uuid(kNss.ns()) << ShardCollectionType::ns(kNss.ns())); + + StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj); + ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey); +} + +TEST(ShardCollectionType, ConstructFromCollectionType) { + const OID oid = OID::gen(); + + BSONObj obj = BSON(CollectionType::fullNs(kNss.ns()) + << CollectionType::epoch(oid) + << CollectionType::updatedAt(Date_t::fromMillisSinceEpoch(1)) + << CollectionType::keyPattern(kKeyPattern)); + CollectionType collectionType = assertGet(CollectionType::fromBSON(obj)); + ASSERT_TRUE(collectionType.validate().isOK()); + + BSONObjBuilder shardCollectionTypeBuilder; + shardCollectionTypeBuilder.append(ShardCollectionType::uuid.name(), kNss.ns()); + shardCollectionTypeBuilder.append(ShardCollectionType::ns.name(), kNss.ns()); + 
shardCollectionTypeBuilder.append(ShardCollectionType::keyPattern.name(), kKeyPattern); + + ASSERT_BSONOBJ_EQ(ShardCollectionType(collectionType).toBSON(), + shardCollectionTypeBuilder.obj()); +} + +} // namespace +} // namespace mongo |