summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/SConscript15
-rw-r--r--src/mongo/db/s/metadata_loader.cpp179
-rw-r--r--src/mongo/db/s/metadata_loader.h52
-rw-r--r--src/mongo/db/s/metadata_loader_test.cpp90
-rw-r--r--src/mongo/db/s/shard_metadata_util.cpp356
-rw-r--r--src/mongo/db/s/shard_metadata_util.h114
-rw-r--r--src/mongo/db/s/shard_metadata_util_test.cpp314
-rw-r--r--src/mongo/s/SConscript3
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp8
-rw-r--r--src/mongo/s/catalog/type_collection.h19
-rw-r--r--src/mongo/s/catalog/type_shard_collection.cpp160
-rw-r--r--src/mongo/s/catalog/type_shard_collection.h147
-rw-r--r--src/mongo/s/catalog/type_shard_collection_test.cpp133
13 files changed, 1398 insertions, 192 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 34cb36d2a65..b47c1383029 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -11,13 +11,23 @@ env.Library(
'metadata_loader.cpp',
],
LIBDEPS=[
+ 'shard_metadata_util',
'$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/client/clientdriver',
- '$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/range_arithmetic',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_impl',
'$BUILD_DIR/mongo/db/service_context',
+ ],
+)
+
+env.Library(
+ target='shard_metadata_util',
+ source=[
+ 'shard_metadata_util.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/s/common',
],
)
@@ -207,6 +217,7 @@ env.CppUnitTest(
source=[
'collection_metadata_test.cpp',
'metadata_loader_test.cpp',
+ 'shard_metadata_util_test.cpp',
],
LIBDEPS=[
'metadata',
diff --git a/src/mongo/db/s/metadata_loader.cpp b/src/mongo/db/s/metadata_loader.cpp
index 8385ea5c3d6..bdb68ae0810 100644
--- a/src/mongo/db/s/metadata_loader.cpp
+++ b/src/mongo/db/s/metadata_loader.cpp
@@ -38,11 +38,13 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/unique_message.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -102,38 +104,40 @@ Status MetadataLoader::makeCollectionMetadata(OperationContext* opCtx,
const string& shard,
const CollectionMetadata* oldMetadata,
CollectionMetadata* metadata) {
- Status initCollectionStatus = _initCollection(opCtx, catalogClient, ns, shard, metadata);
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+
+ bool isShardPrimary = false;
+ if (repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE(opCtx, ns)) {
+ isShardPrimary = true;
+ }
+
+ Status initCollectionStatus =
+ _initCollection(opCtx, catalogClient, ns, metadata, isShardPrimary);
if (!initCollectionStatus.isOK()) {
return initCollectionStatus;
}
- return _initChunks(opCtx, catalogClient, ns, shard, oldMetadata, metadata);
+ return _initChunks(opCtx, catalogClient, ns, shard, oldMetadata, metadata, isShardPrimary);
}
+
Status MetadataLoader::_initCollection(OperationContext* opCtx,
ShardingCatalogClient* catalogClient,
const string& ns,
- const string& shard,
- CollectionMetadata* metadata) {
- // Get the config.collections entry for 'ns'.
- auto coll = catalogClient->getCollection(opCtx, ns);
- if (!coll.isOK()) {
- return coll.getStatus();
+ CollectionMetadata* metadata,
+ bool isShardPrimary) {
+ StatusWith<std::pair<BSONObj, OID>> statusWithShardKeyAndEpoch =
+ shardmetadatautil::getCollectionShardKeyAndEpoch(
+ opCtx, catalogClient, NamespaceString(ns), isShardPrimary);
+ if (!statusWithShardKeyAndEpoch.isOK()) {
+ return statusWithShardKeyAndEpoch.getStatus();
}
+ std::pair<BSONObj, OID> shardKeyAndEpoch = statusWithShardKeyAndEpoch.getValue();
- // Check that the collection hasn't been dropped: passing this check does not mean the
- // collection hasn't been dropped and recreated.
- const auto& collInfo = coll.getValue().value;
- if (collInfo.getDropped()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "Could not load metadata because collection " << ns
- << " was dropped"};
- }
-
- metadata->_keyPattern = collInfo.getKeyPattern().toBSON();
+ metadata->_keyPattern = shardKeyAndEpoch.first;
metadata->fillKeyPatternFields();
- metadata->_shardVersion = ChunkVersion(0, 0, collInfo.getEpoch());
- metadata->_collVersion = ChunkVersion(0, 0, collInfo.getEpoch());
+ metadata->_shardVersion = ChunkVersion(0, 0, shardKeyAndEpoch.second);
+ metadata->_collVersion = ChunkVersion(0, 0, shardKeyAndEpoch.second);
return Status::OK();
}
@@ -143,7 +147,8 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx,
const string& ns,
const string& shard,
const CollectionMetadata* oldMetadata,
- CollectionMetadata* metadata) {
+ CollectionMetadata* metadata,
+ bool isShardPrimary) {
const OID epoch = metadata->getCollVersion().epoch();
SCMConfigDiffTracker::MaxChunkVersionMap versionMap;
@@ -193,16 +198,16 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx,
&chunks,
nullptr,
repl::ReadConcernLevel::kMajorityReadConcern);
-
if (!status.isOK()) {
return status;
}
- // If we are the primary, or a standalone, persist new chunks locally.
- status = _writeNewChunksIfPrimary(
- opCtx, NamespaceString(ns), chunks, metadata->_collVersion.epoch());
- if (!status.isOK()) {
- return status;
+ if (isShardPrimary) {
+ status = shardmetadatautil::writeNewChunks(
+ opCtx, NamespaceString(ns), chunks, metadata->_collVersion.epoch());
+ if (!status.isOK()) {
+ return status;
+ }
}
//
@@ -253,124 +258,4 @@ Status MetadataLoader::_initChunks(OperationContext* opCtx,
}
}
-Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<ChunkType>& chunks,
- const OID& currEpoch) {
- NamespaceString chunkMetadataNss(ChunkType::ConfigNS + "." + nss.ns());
-
- // Only do the write(s) if this is a primary or standalone. Otherwise, return OK.
- if (serverGlobalParams.clusterRole != ClusterRole::ShardServer ||
- !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE(
- opCtx, chunkMetadataNss.ns())) {
- return Status::OK();
- }
-
- try {
- DBDirectClient client(opCtx);
-
- /**
- * Here are examples of the operations that can happen on the config server to update
- * the config.chunks collection. 'chunks' only includes the chunks that result from the
- * operations, which can be read from the config server, not any that were removed, so
- * we must delete any chunks that overlap with the new 'chunks'.
- *
- * CollectionVersion = 10.3
- *
- * moveChunk
- * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0}
- *
- * splitChunk
- * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4}
- * {_id: 5, max: 8, version 10.5}
- * {_id: 8, max: 9, version 10.6}
- *
- * mergeChunk
- * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4}
- * {_id: 14, max: 19, version 7.1}
- * {_id: 19, max: 22, version 2.0}
- *
- */
- for (auto& chunk : chunks) {
- // Check for a different epoch.
- if (!chunk.getVersion().hasEqualEpoch(currEpoch)) {
- // This means the collection was dropped and recreated. Drop the chunk metadata
- // and return.
- rpc::UniqueReply commandResponse =
- client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
- "drop",
- rpc::makeEmptyMetadata(),
- BSON("drop" << chunkMetadataNss.coll()));
- Status status = getStatusFromCommandResult(commandResponse->getCommandReply());
-
- // A NamespaceNotFound error is okay because it's possible that we find a new epoch
- // twice in a row before ever inserting documents.
- if (!status.isOK() && status.code() != ErrorCodes::NamespaceNotFound) {
- return status;
- }
-
- return Status{ErrorCodes::RemoteChangeDetected,
- str::stream() << "Invalid chunks found when reloading '"
- << nss.toString()
- << "'. Previous collection epoch was '"
- << currEpoch.toString()
- << "', but unexpectedly found a new epoch '"
- << chunk.getVersion().epoch().toString()
- << "'. Collection was dropped and recreated."};
- }
-
- // Delete any overlapping chunk ranges. Overlapping chunks will have a min value
- // ("_id") between (chunk.min, chunk.max].
- //
- // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}}
- auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>());
- deleteDocs->setQuery(BSON(ChunkType::minShardID << BSON(
- "$gte" << chunk.getMin() << "$lt" << chunk.getMax())));
- deleteDocs->setLimit(0);
-
- auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>());
- deleteRequest->addToDeletes(deleteDocs.release());
-
- BatchedCommandRequest batchedDeleteRequest(deleteRequest.release());
- batchedDeleteRequest.setNS(chunkMetadataNss);
- const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON();
-
- rpc::UniqueReply deleteCommandResponse =
- client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
- deleteCmdObj.firstElementFieldName(),
- rpc::makeEmptyMetadata(),
- deleteCmdObj);
- auto deleteStatus =
- getStatusFromCommandResult(deleteCommandResponse->getCommandReply());
-
- if (!deleteStatus.isOK()) {
- return deleteStatus;
- }
-
- // Now the document can be expected to cleanly insert without overlap.
- auto insert(stdx::make_unique<BatchedInsertRequest>());
- insert->addToDocuments(chunk.toShardBSON());
-
- BatchedCommandRequest insertRequest(insert.release());
- insertRequest.setNS(chunkMetadataNss);
- const BSONObj insertCmdObj = insertRequest.toBSON();
-
- rpc::UniqueReply commandResponse =
- client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
- insertCmdObj.firstElementFieldName(),
- rpc::makeEmptyMetadata(),
- insertCmdObj);
- auto insertStatus = getStatusFromCommandResult(commandResponse->getCommandReply());
-
- if (!insertStatus.isOK()) {
- return insertStatus;
- }
- }
-
- return Status::OK();
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/metadata_loader.h b/src/mongo/db/s/metadata_loader.h
index cfeb6348cfb..6ce6d901223 100644
--- a/src/mongo/db/s/metadata_loader.h
+++ b/src/mongo/db/s/metadata_loader.h
@@ -35,13 +35,12 @@
namespace mongo {
-class ShardingCatalogClient;
-class ChunkType;
class CollectionMetadata;
class CollectionType;
class NamespaceString;
class OID;
class OperationContext;
+class ShardingCatalogClient;
/**
* The MetadataLoader is responsible for interfacing with the config servers and previous
@@ -67,10 +66,11 @@ class OperationContext;
class MetadataLoader {
public:
/**
- * Fills a new metadata instance representing the chunkset of the collection 'ns'
- * (or its entirety, if not sharded) that lives on 'shard' with data from the config server.
- * Optionally, uses an 'oldMetadata' for the same 'ns'/'shard'; the contents of
- * 'oldMetadata' can help reducing the amount of data read from the config servers.
+ * Fills a new metadata instance representing the chunkset of the collection 'ns' (or its
+ * entirety, if not sharded) that lives on 'shard' with data either from the config server if
+ * primary or a shard persisted copy if secondary. Optionally, uses an 'oldMetadata' for the
+ * same 'ns'/'shard'; the contents of 'oldMetadata' can help reduce the amount of data read from
+ * the config servers.
*
* Locking note:
* + Must not be called in a DBLock, since this loads over the network
@@ -95,7 +95,8 @@ public:
private:
/**
* Returns OK and fills in the internal state of 'metadata' with general collection
- * information, not including chunks.
+ * information, not including chunks. Gets the collection information from the config server if
+ * 'isShardPrimary' is true, else from a shard persisted copy.
*
* If information about the collection can be accessed or is invalid, returns:
* @return NamespaceNotFound if the collection no longer exists
@@ -107,13 +108,14 @@ private:
static Status _initCollection(OperationContext* opCtx,
ShardingCatalogClient* catalogClient,
const std::string& ns,
- const std::string& shard,
- CollectionMetadata* metadata);
+ CollectionMetadata* metadata,
+ bool isShardPrimary);
/**
- * Returns OK and fills in the chunk state of 'metadata' to portray the chunks of the
- * collection 'ns' that sit in 'shard'. If provided, uses the contents of 'oldMetadata'
- * as a base (see description in initCollection above).
+ * Returns OK and fills in the chunk state of 'metadata' to portray the chunks of the collection
+ * 'ns' that sit in 'shard'. If provided, uses the contents of 'oldMetadata' as a base (see
+ * description in initCollection above). If 'isShardPrimary' is true, 'chunks' is persisted on
+ * the shard so that secondaries receive the new chunks through replication.
*
* If information about the chunks can be accessed or is invalid, returns:
* @return HostUnreachable if there was an error contacting the config servers
@@ -128,30 +130,8 @@ private:
const std::string& ns,
const std::string& shard,
const CollectionMetadata* oldMetadata,
- CollectionMetadata* metadata);
-
-
- /**
- * Takes a vector of 'chunks' and updates the config.chunks.ns collection specified by 'nss'.
- * Any chunk documents in config.chunks.ns that overlap with a chunk in 'chunks' is removed
- * as the new chunk document is inserted. If the epoch of any chunk in 'chunks' does not match
- * 'currEpoch', the chunk metadata is dropped.
- *
- * @nss - the regular collection namespace for which chunk metadata is being updated.
- * @chunks - a range of chunks retrieved from the config server, sorted in ascending chunk
- * version order.
- * @currEpoch - what this shard server knows to be the collection epoch.
- *
- * Returns:
- * - OK if not primary and no writes are needed.
- * - RemoteChangeDetected if the chunk version epoch of any chunk in 'chunks' is different than
- * 'currEpoch'
- * - Other errors in writes/reads to the config.chunks.ns collection fails.
- */
- static Status _writeNewChunksIfPrimary(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<ChunkType>& chunks,
- const OID& currEpoch);
+ CollectionMetadata* metadata,
+ bool isShardPrimary);
};
} // namespace mongo
diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp
index b9d10773563..cbf69c5e185 100644
--- a/src/mongo/db/s/metadata_loader_test.cpp
+++ b/src/mongo/db/s/metadata_loader_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard_collection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/shard_server_test_fixture.h"
#include "mongo/stdx/memory.h"
@@ -96,6 +97,51 @@ protected:
}
}
+ void checkCollectionsEntryExists(const NamespaceString& nss,
+ const CollectionMetadata& metadata,
+ bool hasLastConsistentCollectionVersion) {
+ try {
+ DBDirectClient client(operationContext());
+ Query query(BSON(ShardCollectionType::uuid() << nss.ns()));
+ query.readPref(ReadPreference::Nearest, BSONArray());
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(CollectionType::ConfigNS.c_str(), query, 1);
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+ BSONObj queryResult = cursor->nextSafe();
+
+ ShardCollectionType shardCollectionEntry =
+ assertGet(ShardCollectionType::fromBSON(queryResult));
+
+ BSONObjBuilder builder;
+ builder.append(ShardCollectionType::uuid(), nss.ns());
+ builder.append(ShardCollectionType::ns(), nss.ns());
+ builder.append(ShardCollectionType::keyPattern(), metadata.getKeyPattern());
+ if (hasLastConsistentCollectionVersion) {
+ metadata.getCollVersion().appendWithFieldForCommands(
+ &builder, ShardCollectionType::lastConsistentCollectionVersion());
+ }
+
+ ASSERT_BSONOBJ_EQ(shardCollectionEntry.toBSON(), builder.obj());
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
+ void checkCollectionsEntryDoesNotExist(const NamespaceString& nss) {
+ try {
+ DBDirectClient client(operationContext());
+ Query query(BSON(ShardCollectionType::uuid() << nss.ns()));
+ query.readPref(ReadPreference::Nearest, BSONArray());
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(ShardCollectionType::ConfigNS.c_str(), query, 1);
+ ASSERT(cursor);
+ ASSERT(!cursor->more());
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
void expectFindOnConfigSendCollectionDefault() {
CollectionType collType;
collType.setNs(kNss);
@@ -167,30 +213,43 @@ TEST_F(MetadataLoaderTest, DroppedColl) {
collType.setEpoch(OID());
collType.setDropped(true);
ASSERT_OK(collType.validate());
+
+ // The config.collections entry indicates that the collection was dropped, failing the refresh.
auto future = launchAsync([this] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+
CollectionMetadata metadata;
- auto status = MetadataLoader::makeCollectionMetadata(operationContext(),
+ auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(),
catalogClient(),
kNss.ns(),
kShardId.toString(),
NULL, /* no old metadata */
&metadata);
ASSERT_EQUALS(status.code(), ErrorCodes::NamespaceNotFound);
+ checkCollectionsEntryDoesNotExist(kNss);
});
expectFindOnConfigSendBSONObjVector(std::vector<BSONObj>{collType.toBSON()});
future.timed_get(kFutureTimeout);
}
TEST_F(MetadataLoaderTest, EmptyColl) {
+ // Fail due to no config.collections entry found.
auto future = launchAsync([this] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+
CollectionMetadata metadata;
- auto status = MetadataLoader::makeCollectionMetadata(operationContext(),
+ auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(),
catalogClient(),
kNss.ns(),
kShardId.toString(),
NULL, /* no old metadata */
&metadata);
ASSERT_EQUALS(status.code(), ErrorCodes::NamespaceNotFound);
+ checkCollectionsEntryDoesNotExist(kNss);
});
expectFindOnConfigSendErrorCode(ErrorCodes::NamespaceNotFound);
future.timed_get(kFutureTimeout);
@@ -198,15 +257,22 @@ TEST_F(MetadataLoaderTest, EmptyColl) {
TEST_F(MetadataLoaderTest, BadColl) {
BSONObj badCollToSend = BSON(CollectionType::fullNs(kNss.ns()));
+
+ // Providing an invalid config.collections document should fail the refresh.
auto future = launchAsync([this] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+
CollectionMetadata metadata;
- auto status = MetadataLoader::makeCollectionMetadata(operationContext(),
+ auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(),
catalogClient(),
kNss.ns(),
kShardId.toString(),
NULL, /* no old metadata */
&metadata);
ASSERT_EQUALS(status.code(), ErrorCodes::NoSuchKey);
+ checkCollectionsEntryDoesNotExist(kNss);
});
expectFindOnConfigSendBSONObjVector(std::vector<BSONObj>{badCollToSend});
future.timed_get(kFutureTimeout);
@@ -218,15 +284,21 @@ TEST_F(MetadataLoaderTest, BadChunk) {
chunkInfo.setVersion(ChunkVersion(1, 0, getMaxCollVersion().epoch()));
ASSERT(!chunkInfo.validate().isOK());
+ // Providing an invalid config.chunks document should fail the refresh.
auto future = launchAsync([this] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready("Test");
+ auto opCtx = cc().makeOperationContext();
+
CollectionMetadata metadata;
- auto status = MetadataLoader::makeCollectionMetadata(operationContext(),
+ auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(),
catalogClient(),
kNss.ns(),
kShardId.toString(),
NULL, /* no old metadata */
&metadata);
ASSERT_EQUALS(status.code(), ErrorCodes::NoSuchKey);
+ checkCollectionsEntryExists(kNss, metadata, false);
});
expectFindOnConfigSendCollectionDefault();
@@ -235,6 +307,8 @@ TEST_F(MetadataLoaderTest, BadChunk) {
}
TEST_F(MetadataLoaderTest, NoChunksIsDropped) {
+ // Finding no chunks in config.chunks indicates that the collection was dropped, even if an
+ // entry was previously found in config.collections indicating that it wasn't dropped.
auto future = launchAsync([this] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
@@ -269,6 +343,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) {
chunkType.setVersion(ChunkVersion(1, 0, getMaxCollVersion().epoch()));
ASSERT(chunkType.validate().isOK());
+ // Check that finding no new chunks for the shard works smoothly.
auto future = launchAsync([this] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
@@ -287,6 +362,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) {
ASSERT_EQUALS(0, metadata.getShardVersion().majorVersion());
checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1);
+ checkCollectionsEntryExists(kNss, metadata, true);
});
expectFindOnConfigSendCollectionDefault();
@@ -296,6 +372,7 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) {
}
TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) {
+ // Check that loading a single chunk for the shard works successfully.
auto future = launchAsync([this] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
@@ -314,6 +391,7 @@ TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) {
ASSERT_EQUALS(getMaxCollVersion(), metadata.getShardVersion());
checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1);
+ checkCollectionsEntryExists(kNss, metadata, true);
});
expectFindOnConfigSendCollectionDefault();
@@ -323,6 +401,7 @@ TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) {
}
TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) {
+ // Check that loading several chunks for the shard works successfully.
auto future = launchAsync([this] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
@@ -341,6 +420,7 @@ TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) {
ASSERT_EQUALS(getMaxCollVersion(), metadata.getShardVersion());
checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 4);
+ checkCollectionsEntryExists(kNss, metadata, true);
});
expectFindOnConfigSendCollectionDefault();
@@ -350,6 +430,7 @@ TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) {
}
TEST_F(MetadataLoaderTest, CollectionMetadataSetUp) {
+ // Check that the CollectionMetadata is set up correctly.
auto future = launchAsync([this] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready("Test");
@@ -367,6 +448,7 @@ TEST_F(MetadataLoaderTest, CollectionMetadataSetUp) {
ASSERT_TRUE(getMaxShardVersion().equals(metadata.getShardVersion()));
checkCollectionMetadataChunksMatchPersistedChunks(kChunkMetadataNss, metadata, 1);
+ checkCollectionsEntryExists(kNss, metadata, true);
});
expectFindOnConfigSendCollectionDefault();
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
new file mode 100644
index 00000000000..eebd0a59109
--- /dev/null
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -0,0 +1,356 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/shard_metadata_util.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/unique_message.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace shardmetadatautil {
+
+namespace {
+
+const WriteConcernOptions kLocalWriteConcern(1,
+ WriteConcernOptions::SyncMode::UNSET,
+ Milliseconds(0));
+
+} // namespace
+
+StatusWith<std::pair<BSONObj, OID>> getCollectionShardKeyAndEpoch(
+ OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
+ const NamespaceString& nss,
+ bool isShardPrimary) {
+ if (isShardPrimary) {
+ // Get the config.collections entry for 'nss'.
+ auto statusWithColl = catalogClient->getCollection(opCtx, nss.ns());
+ if (!statusWithColl.isOK()) {
+ if (statusWithColl.getStatus() == ErrorCodes::NamespaceNotFound) {
+ auto status = dropChunksAndDeleteCollectionsEntry(
+ opCtx, NamespaceString(ChunkType::ConfigNS + "." + nss.ns()), nss);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return statusWithColl.getStatus();
+ }
+ auto collInfo = statusWithColl.getValue().value;
+
+ // Update the shard's config.collections entry so that secondaries receive any changes.
+ Status updateStatus = updateCollectionEntry(opCtx,
+ nss,
+ BSON(ShardCollectionType::uuid(nss.ns())),
+ ShardCollectionType(collInfo).toBSON());
+ if (!updateStatus.isOK()) {
+ return updateStatus;
+ }
+
+ return std::pair<BSONObj, OID>(collInfo.getKeyPattern().toBSON(), collInfo.getEpoch());
+ } else { // shard secondary
+ auto statusWithCollectionEntry = readShardCollectionEntry(opCtx, nss);
+ if (!statusWithCollectionEntry.isOK()) {
+ return statusWithCollectionEntry.getStatus();
+ }
+ ShardCollectionType shardCollTypeEntry = statusWithCollectionEntry.getValue();
+ if (!shardCollTypeEntry.isLastConsistentCollectionVersionSet()) {
+ // The collection has been dropped since the refresh began.
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "Could not load metadata because collection " << nss.ns()
+ << " was dropped"};
+ }
+
+ return std::pair<BSONObj, OID>(
+ shardCollTypeEntry.getKeyPattern().toBSON(),
+ shardCollTypeEntry.getLastConsistentCollectionVersionEpoch());
+ }
+}
+
+
+StatusWith<ShardCollectionType> readShardCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ Query fullQuery(BSON(ShardCollectionType::uuid() << nss.ns()));
+ fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray());
+ try {
+ DBDirectClient client(opCtx);
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(ShardCollectionType::ConfigNS.c_str(), fullQuery, 1);
+ if (!cursor) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to establish a cursor for reading "
+ << ShardCollectionType::ConfigNS
+ << " from local storage");
+ }
+
+ if (!cursor->more()) {
+ // The collection has been dropped.
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "collection " << nss.ns() << " not found");
+ }
+
+ BSONObj document = cursor->nextSafe();
+ auto statusWithCollectionEntry = ShardCollectionType::fromBSON(document);
+ if (!statusWithCollectionEntry.isOK()) {
+ return statusWithCollectionEntry.getStatus();
+ }
+
+ // There should only be one entry per namespace!
+ invariant(!cursor->more());
+
+ return statusWithCollectionEntry.getValue();
+ } catch (const DBException& ex) {
+ return {ex.toStatus().code(),
+ str::stream() << "Failed to read the '" << nss.ns()
+ << "' entry locally from config.collections"
+ << causedBy(ex.toStatus())};
+ }
+}
+
+Status updateCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update) {
+ const BSONElement idField = query.getField("_id");
+ invariant(!idField.eoo());
+
+ // Want to modify the document, not replace it.
+ BSONObjBuilder updateBuilder;
+ updateBuilder.append("$set", update);
+
+ std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument());
+ updateDoc->setQuery(query);
+ updateDoc->setUpdateExpr(updateBuilder.obj());
+ updateDoc->setUpsert(true);
+
+ std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest());
+ updateRequest->addToUpdates(updateDoc.release());
+
+ BatchedCommandRequest request(updateRequest.release());
+ request.setNS(NamespaceString(CollectionType::ConfigNS));
+ request.setWriteConcern(kLocalWriteConcern.toBSON());
+ BSONObj cmdObj = request.toBSON();
+
+ try {
+ DBDirectClient client(opCtx);
+
+ rpc::UniqueReply commandResponse = client.runCommandWithMetadata(
+ "config", cmdObj.firstElementFieldName(), rpc::makeEmptyMetadata(), cmdObj);
+ BSONObj responseReply = commandResponse->getCommandReply().getOwned();
+
+ Status commandStatus = getStatusFromCommandResult(commandResponse->getCommandReply());
+ if (!commandStatus.isOK()) {
+ return commandStatus;
+ }
+
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return {ex.toStatus().code(),
+ str::stream() << "Failed to locally update the '" << nss.ns()
+ << "' entry in config.collections"
+ << causedBy(ex.toStatus())};
+ }
+}
+
+Status writeNewChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<ChunkType>& chunks,
+ const OID& currEpoch) {
+ NamespaceString chunkMetadataNss(ChunkType::ConfigNS + "." + nss.ns());
+
+ try {
+ DBDirectClient client(opCtx);
+
+ /**
+ * Here are examples of the operations that can happen on the config server to update
+ * the config.chunks collection. 'chunks' only includes the chunks that result from the
+ * operations, which can be read from the config server, not any that were removed, so
+ * we must delete any chunks that overlap with the new 'chunks'.
+ *
+ * CollectionVersion = 10.3
+ *
+ * moveChunk
+ * {_id: 3, max: 5, version: 10.1} --> {_id: 3, max: 5, version: 11.0}
+ *
+ * splitChunk
+ * {_id: 3, max: 9, version 10.3} --> {_id: 3, max: 5, version 10.4}
+ * {_id: 5, max: 8, version 10.5}
+ * {_id: 8, max: 9, version 10.6}
+ *
+ * mergeChunk
+ * {_id: 10, max: 14, version 4.3} --> {_id: 10, max: 22, version 10.4}
+ * {_id: 14, max: 19, version 7.1}
+ * {_id: 19, max: 22, version 2.0}
+ *
+ */
+ for (auto& chunk : chunks) {
+ // Check for a different epoch.
+ if (!chunk.getVersion().hasEqualEpoch(currEpoch)) {
+ // This means the collection was dropped and recreated. Drop the chunk metadata
+ // and return.
+ Status status = dropChunksAndDeleteCollectionsEntry(opCtx, chunkMetadataNss, nss);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status{ErrorCodes::RemoteChangeDetected,
+ str::stream() << "Invalid chunks found when reloading '"
+ << nss.toString()
+ << "'. Previous collection epoch was '"
+ << currEpoch.toString()
+ << "', but unexpectedly found a new epoch '"
+ << chunk.getVersion().epoch().toString()
+ << "'. Collection was dropped and recreated."};
+ }
+
+ // Delete any overlapping chunk ranges. Overlapping chunks will have a min value
+ // ("_id") between (chunk.min, chunk.max].
+ //
+ // query: { "_id" : {"$gte": chunk.min, "$lt": chunk.max}}
+ auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>());
+ deleteDocs->setQuery(BSON(ChunkType::minShardID << BSON(
+ "$gte" << chunk.getMin() << "$lt" << chunk.getMax())));
+ deleteDocs->setLimit(0);
+
+ auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>());
+ deleteRequest->addToDeletes(deleteDocs.release());
+
+ BatchedCommandRequest batchedDeleteRequest(deleteRequest.release());
+ batchedDeleteRequest.setNS(chunkMetadataNss);
+ const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON();
+
+ rpc::UniqueReply deleteCommandResponse =
+ client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
+ deleteCmdObj.firstElementFieldName(),
+ rpc::makeEmptyMetadata(),
+ deleteCmdObj);
+ auto deleteStatus =
+ getStatusFromCommandResult(deleteCommandResponse->getCommandReply());
+
+ if (!deleteStatus.isOK()) {
+ return deleteStatus;
+ }
+
+ // Now the document can be expected to cleanly insert without overlap.
+ auto insert(stdx::make_unique<BatchedInsertRequest>());
+ insert->addToDocuments(chunk.toShardBSON());
+
+ BatchedCommandRequest insertRequest(insert.release());
+ insertRequest.setNS(chunkMetadataNss);
+ const BSONObj insertCmdObj = insertRequest.toBSON();
+
+ rpc::UniqueReply commandResponse =
+ client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
+ insertCmdObj.firstElementFieldName(),
+ rpc::makeEmptyMetadata(),
+ insertCmdObj);
+ auto insertStatus = getStatusFromCommandResult(commandResponse->getCommandReply());
+
+ if (!insertStatus.isOK()) {
+ return insertStatus;
+ }
+ }
+
+ // Must update the config.collections 'lastConsistentCollectionVersion' field so that
+ // secondaries can load the latest config.chunks.ns writes.
+ if (!chunks.empty()) {
+ BSONObjBuilder builder;
+ chunks.back().getVersion().appendWithFieldForCommands(
+ &builder, ShardCollectionType::lastConsistentCollectionVersion());
+ BSONObj update = builder.obj();
+
+ auto collUpdateStatus = updateCollectionEntry(
+ opCtx, nss, BSON(ShardCollectionType::uuid(nss.ns())), update);
+ if (!collUpdateStatus.isOK()) {
+ return collUpdateStatus;
+ }
+ }
+
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
+Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx,
+ const NamespaceString& chunkMetadataNss,
+ const NamespaceString& collectionsEntryNss) {
+ try {
+ DBDirectClient client(opCtx);
+
+ // Drop the config.chunks.ns collection specified by 'chunkMetadataNss'.
+ rpc::UniqueReply commandResponse =
+ client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
+ "drop",
+ rpc::makeEmptyMetadata(),
+ BSON("drop" << chunkMetadataNss.coll()));
+ Status status = getStatusFromCommandResult(commandResponse->getCommandReply());
+
+ if (!status.isOK() && status.code() != ErrorCodes::NamespaceNotFound) {
+ return status;
+ }
+
+ // Delete the config.collections entry matching 'collectionsEntryNss'.
+ auto deleteDocs(stdx::make_unique<BatchedDeleteDocument>());
+ deleteDocs->setQuery(BSON(ShardCollectionType::uuid << collectionsEntryNss.ns()));
+ deleteDocs->setLimit(0);
+
+ auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>());
+ deleteRequest->addToDeletes(deleteDocs.release());
+
+ BatchedCommandRequest batchedDeleteRequest(deleteRequest.release());
+ batchedDeleteRequest.setNS(NamespaceString(ShardCollectionType::ConfigNS));
+ const BSONObj deleteCmdObj = batchedDeleteRequest.toBSON();
+
+ rpc::UniqueReply deleteCommandResponse = client.runCommandWithMetadata(
+ "config", deleteCmdObj.firstElementFieldName(), rpc::makeEmptyMetadata(), deleteCmdObj);
+ auto deleteStatus = getStatusFromCommandResult(deleteCommandResponse->getCommandReply());
+
+ if (!deleteStatus.isOK()) {
+ return deleteStatus;
+ }
+
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
+} // namespace shardmetadatautil
+} // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
new file mode 100644
index 00000000000..2acbd138a1c
--- /dev/null
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "mongo/base/status.h"
+
+namespace mongo {
+
+class BSONObj;
+class ChunkType;
+class CollectionMetadata;
+class ShardCollectionType;
+class NamespaceString;
+class OID;
+class OperationContext;
+class ShardingCatalogClient;
+template <typename T>
+class StatusWith;
+
+/**
+ * Function helpers to locally, using a DBDirectClient, read and write sharding metadata on a shard.
+ * Also retrieves metadata from the config server, copies of which are then persisted on the shard.
+ */
+namespace shardmetadatautil {
+
+/**
+ * Gets the config.collections entry for 'nss' either remotely from the config server if
+ * 'isShardPrimary' is true or locally from the shard if false. Additionally updates the shard's
+ * config.collections entry with the remotely retrieved metadata if 'isShardPrimary' is true.
+ */
+StatusWith<std::pair<BSONObj, OID>> getCollectionShardKeyAndEpoch(
+ OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
+ const NamespaceString& nss,
+ bool isShardPrimary);
+
+/**
+ * Reads the shard server's config.collections entry identified by 'nss'.
+ */
+StatusWith<ShardCollectionType> readShardCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss);
+
+/**
+ * Updates the config.collections entry matching 'query' with 'update' using local write concern.
+ * Only the fields specified in 'update' are modified.
+ * Sets upsert to true on the update operation in case the entry does not exist locally yet.
+ */
+Status updateCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update);
+
+/**
+ * Two threads running this function in parallel for the same collection can corrupt the collection
+ * data!
+ *
+ * Takes a vector of 'chunks' and updates the config.chunks.ns collection specified by 'nss'.
+ * Any chunk documents in config.chunks.ns that overlap with a chunk in 'chunks' are removed
+ * as the new chunk document is inserted. If the epoch of any chunk in 'chunks' does not match
+ * 'currEpoch', the chunk metadata is dropped and a RemoteChangeDetected error returned.
+ *
+ * @nss - the regular collection namespace for which chunk metadata is being updated.
+ * @chunks - chunks retrieved from the config server, sorted in ascending chunk version order
+ * @currEpoch - what this shard server expects to be the collection epoch.
+ *
+ * Returns:
+ * - RemoteChangeDetected if the chunk version epoch of any chunk in 'chunks' is different than
+ * 'currEpoch'
+ *   - Other errors if unable to do local writes/reads to the config.chunks.ns collection.
+ */
+Status writeNewChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<ChunkType>& chunks,
+ const OID& currEpoch);
+
+/**
+ * Locally on this shard, drops the config.chunks.ns collection corresponding to
+ * 'chunkMetadataNss' and then deletes the config.collections entry for 'collectionsEntryNss'.
+ */
+Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx,
+ const NamespaceString& chunkMetadataNss,
+ const NamespaceString& collectionsEntryNss);
+
+} // namespace shardmetadatautil
+} // namespace mongo
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
new file mode 100644
index 00000000000..79765f9720b
--- /dev/null
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -0,0 +1,314 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/status.h"
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/s/shard_metadata_util.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using unittest::assertGet;
+
+const HostAndPort kConfigHostAndPort = HostAndPort("dummy", 123);
+const NamespaceString kNss = NamespaceString("test.foo");
+const NamespaceString kChunkMetadataNss = NamespaceString("config.chunks.test.foo");
+const ShardId kShardId = ShardId("shard0");
+
+class ShardMetadataUtilTest : public ShardServerTestFixture {
+protected:
+ /**
+ * Call this after setUpChunks()! Both functions use the private _maxCollVersion variable. To
+ * set the lastConsistentCollectionVersion correctly this must be called second.
+ *
+ * Inserts a config.collections entry.
+ */
+ void setUpCollection() {
+ BSONObjBuilder builder;
+ builder.append(ShardCollectionType::uuid(), kNss.ns());
+ builder.append(ShardCollectionType::ns(), kNss.ns());
+ builder.append(ShardCollectionType::keyPattern(), _keyPattern.toBSON());
+ _maxCollVersion.appendWithFieldForCommands(
+ &builder, ShardCollectionType::lastConsistentCollectionVersion());
+ ShardCollectionType shardCollType = assertGet(ShardCollectionType::fromBSON(builder.obj()));
+
+ try {
+ DBDirectClient client(operationContext());
+ auto insert(stdx::make_unique<BatchedInsertRequest>());
+ insert->addToDocuments(shardCollType.toBSON());
+
+ BatchedCommandRequest insertRequest(insert.release());
+ insertRequest.setNS(NamespaceString(ShardCollectionType::ConfigNS));
+ const BSONObj insertCmdObj = insertRequest.toBSON();
+
+ rpc::UniqueReply commandResponse =
+ client.runCommandWithMetadata("config",
+ insertCmdObj.firstElementFieldName(),
+ rpc::makeEmptyMetadata(),
+ insertCmdObj);
+ ASSERT_OK(getStatusFromCommandResult(commandResponse->getCommandReply()));
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
+ /**
+ * Helper to make a number of chunks that can then be manipulated in various ways in the tests.
+ * Chunks have the shard server's config.chunks.ns schema.
+ */
+ std::vector<ChunkType> makeFourChunks() {
+ std::vector<ChunkType> chunks;
+ BSONObj mins[] = {BSON("a" << MINKEY), BSON("a" << 10), BSON("a" << 50), BSON("a" << 100)};
+ BSONObj maxs[] = {BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << MAXKEY)};
+
+ for (int i = 0; i < 4; ++i) {
+ _maxCollVersion.incMajor();
+ BSONObj shardChunk = BSON(ChunkType::minShardID(mins[i])
+ << ChunkType::max(maxs[i])
+ << ChunkType::shard(kShardId.toString())
+ << ChunkType::DEPRECATED_lastmod(Date_t::fromMillisSinceEpoch(
+ _maxCollVersion.toLong())));
+
+ chunks.push_back(
+ assertGet(ChunkType::fromShardBSON(shardChunk, _maxCollVersion.epoch())));
+ }
+
+ return chunks;
+ }
+
+ /**
+ * Inserts 'chunks' into the config.chunks.ns collection 'chunkMetadataNss'.
+ */
+ void setUpChunks(const NamespaceString& chunkMetadataNss, const std::vector<ChunkType> chunks) {
+ try {
+ DBDirectClient client(operationContext());
+ auto insert(stdx::make_unique<BatchedInsertRequest>());
+
+ for (auto& chunk : chunks) {
+ insert->addToDocuments(chunk.toShardBSON());
+ }
+
+ BatchedCommandRequest insertRequest(insert.release());
+ insertRequest.setNS(NamespaceString(chunkMetadataNss));
+ const BSONObj insertCmdObj = insertRequest.toBSON();
+
+ rpc::UniqueReply commandResponse =
+ client.runCommandWithMetadata(chunkMetadataNss.db().toString(),
+ insertCmdObj.firstElementFieldName(),
+ rpc::makeEmptyMetadata(),
+ insertCmdObj);
+ ASSERT_OK(getStatusFromCommandResult(commandResponse->getCommandReply()));
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
+ /**
+ * Sets up persisted chunk metadata. Inserts four chunks for kNss into kChunkMetadataNss and a
+ * corresponding collection entry in config.collections.
+ */
+ void setUpShardingMetadata() {
+ setUpChunks(kChunkMetadataNss, makeFourChunks());
+ setUpCollection();
+ }
+
+ /**
+ * Checks that 'nss' has no documents.
+ */
+ void checkCollectionIsEmpty(const NamespaceString& nss) {
+ try {
+ DBDirectClient client(operationContext());
+ ASSERT_EQUALS(client.count(nss.ns()), 0ULL);
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
+ /**
+ * Checks that each chunk in 'chunks' has been written to 'chunkMetadataNss'.
+ */
+ void checkChunks(const NamespaceString& chunkMetadataNss,
+ const std::vector<ChunkType>& chunks) {
+ try {
+ DBDirectClient client(operationContext());
+ for (auto& chunk : chunks) {
+ Query query(BSON(ChunkType::minShardID() << chunk.getMin() << ChunkType::max()
+ << chunk.getMax()));
+ query.readPref(ReadPreference::Nearest, BSONArray());
+
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(chunkMetadataNss.ns(), query, 1);
+ ASSERT(cursor);
+
+ ASSERT(cursor->more());
+ BSONObj queryResult = cursor->nextSafe();
+ ChunkType foundChunk =
+ assertGet(ChunkType::fromShardBSON(queryResult, chunk.getVersion().epoch()));
+ ASSERT_BSONOBJ_EQ(chunk.getMin(), foundChunk.getMin());
+ ASSERT_BSONOBJ_EQ(chunk.getMax(), foundChunk.getMax());
+ ASSERT_EQUALS(chunk.getShard(), foundChunk.getShard());
+ ASSERT_EQUALS(chunk.getVersion(), foundChunk.getVersion());
+ }
+ } catch (const DBException& ex) {
+ ASSERT(false);
+ }
+ }
+
+ const ChunkVersion& getCollectionVersion() const {
+ return _maxCollVersion;
+ }
+
+ const KeyPattern& getKeyPattern() const {
+ return _keyPattern;
+ }
+
+private:
+ ChunkVersion _maxCollVersion{1, 0, OID::gen()};
+ const KeyPattern _keyPattern{BSON("a" << 1)};
+};
+
+TEST_F(ShardMetadataUtilTest, UpdateAndReadCollectionsDocument) {
+ // Insert document
+ BSONObj shardCollectionTypeObj =
+ BSON(ShardCollectionType::uuid(kNss.ns())
+ << ShardCollectionType::ns(kNss.ns())
+ << ShardCollectionType::keyPattern(getKeyPattern().toBSON()));
+ ASSERT_OK(shardmetadatautil::updateCollectionEntry(operationContext(),
+ kNss,
+ BSON(ShardCollectionType::uuid(kNss.ns())),
+ shardCollectionTypeObj));
+
+ ShardCollectionType readShardCollectionType =
+ assertGet(shardmetadatautil::readShardCollectionEntry(operationContext(), kNss));
+ ASSERT_BSONOBJ_EQ(shardCollectionTypeObj, readShardCollectionType.toBSON());
+
+ // Update document
+ BSONObjBuilder updateBuilder;
+ getCollectionVersion().appendWithFieldForCommands(
+ &updateBuilder, ShardCollectionType::lastConsistentCollectionVersion());
+ ASSERT_OK(shardmetadatautil::updateCollectionEntry(
+ operationContext(), kNss, BSON(ShardCollectionType::uuid(kNss.ns())), updateBuilder.obj()));
+
+ readShardCollectionType =
+ assertGet(shardmetadatautil::readShardCollectionEntry(operationContext(), kNss));
+
+ ShardCollectionType updatedShardCollectionType =
+ assertGet(ShardCollectionType::fromBSON(shardCollectionTypeObj));
+ updatedShardCollectionType.setLastConsistentCollectionVersion(getCollectionVersion());
+
+ ASSERT_BSONOBJ_EQ(updatedShardCollectionType.toBSON(), readShardCollectionType.toBSON());
+}
+
+TEST_F(ShardMetadataUtilTest, WriteNewChunks) {
+ std::vector<ChunkType> chunks = makeFourChunks();
+ shardmetadatautil::writeNewChunks(
+ operationContext(), kNss, chunks, getCollectionVersion().epoch());
+ checkChunks(kChunkMetadataNss, chunks);
+}
+
+TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) {
+ // Load some chunk metadata.
+
+ std::vector<ChunkType> chunks = makeFourChunks();
+ ASSERT_OK(shardmetadatautil::writeNewChunks(
+ operationContext(), kNss, chunks, getCollectionVersion().epoch()));
+ checkChunks(kChunkMetadataNss, chunks);
+
+ // Load some changes and make sure it's applied correctly.
+ // Split the last chunk in two and move the new last chunk away.
+
+ std::vector<ChunkType> newChunks;
+ ChunkType lastChunk = chunks.back();
+ chunks.pop_back();
+ ChunkVersion collVersion = getCollectionVersion();
+
+ collVersion.incMinor(); // chunk only split
+ BSONObjBuilder splitChunkOneBuilder;
+ splitChunkOneBuilder.append(ChunkType::minShardID(), lastChunk.getMin());
+ {
+ BSONObjBuilder subMax(splitChunkOneBuilder.subobjStart(ChunkType::max()));
+ subMax.append("a", 10000);
+ }
+ splitChunkOneBuilder.append(ChunkType::shard(), lastChunk.getShard().toString());
+ collVersion.appendForChunk(&splitChunkOneBuilder);
+ ChunkType splitChunkOne =
+ assertGet(ChunkType::fromShardBSON(splitChunkOneBuilder.obj(), collVersion.epoch()));
+ newChunks.push_back(splitChunkOne);
+
+ collVersion.incMajor(); // chunk split and moved
+ BSONObjBuilder splitChunkTwoMovedBuilder;
+ {
+ BSONObjBuilder subMin(splitChunkTwoMovedBuilder.subobjStart(ChunkType::minShardID()));
+ subMin.append("a", 10000);
+ }
+ splitChunkTwoMovedBuilder.append(ChunkType::max(), lastChunk.getMax());
+ splitChunkTwoMovedBuilder.append(ChunkType::shard(), "altShard");
+ collVersion.appendForChunk(&splitChunkTwoMovedBuilder);
+ ChunkType splitChunkTwoMoved =
+ assertGet(ChunkType::fromShardBSON(splitChunkTwoMovedBuilder.obj(), collVersion.epoch()));
+ newChunks.push_back(splitChunkTwoMoved);
+
+ collVersion.incMinor(); // bump control chunk version
+ ChunkType frontChunkControl = chunks.front();
+ chunks.erase(chunks.begin());
+ frontChunkControl.setVersion(collVersion);
+ newChunks.push_back(frontChunkControl);
+
+ ASSERT_OK(shardmetadatautil::writeNewChunks(
+ operationContext(), kNss, newChunks, collVersion.epoch()));
+
+ chunks.push_back(splitChunkOne);
+ chunks.push_back(splitChunkTwoMoved);
+ chunks.push_back(frontChunkControl);
+ checkChunks(kChunkMetadataNss, chunks);
+}
+
+TEST_F(ShardMetadataUtilTest, DropChunksAndDeleteCollectionsEntry) {
+ setUpShardingMetadata();
+ ASSERT_OK(shardmetadatautil::dropChunksAndDeleteCollectionsEntry(
+ operationContext(), kChunkMetadataNss, kNss));
+ checkCollectionIsEmpty(NamespaceString(ShardCollectionType::ConfigNS));
+ checkCollectionIsEmpty(kChunkMetadataNss);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index f802a27739d..f6ef30c691d 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -73,6 +73,7 @@ env.Library(
'catalog/type_locks.cpp',
'catalog/type_mongos.cpp',
'catalog/type_shard.cpp',
+ 'catalog/type_shard_collection.cpp',
'catalog/type_tags.cpp',
'request_types/add_shard_request_type.cpp',
'request_types/add_shard_to_zone_request_type.cpp',
@@ -192,13 +193,13 @@ env.CppUnitTest(
'catalog/type_locks_test.cpp',
'catalog/type_mongos_test.cpp',
'catalog/type_shard_test.cpp',
+ 'catalog/type_shard_collection_test.cpp',
'catalog/type_tags_test.cpp',
'chunk_version_test.cpp',
'migration_secondary_throttle_options_test.cpp',
'move_chunk_request_test.cpp',
'set_shard_version_request_test.cpp',
'shard_key_pattern_test.cpp',
-# 'shard_id_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/query/query_test_service_context',
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index f541dde581f..8c36fccae77 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -854,7 +854,13 @@ StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getColle
return parseStatus.getStatus();
}
- return repl::OpTimeWith<CollectionType>(parseStatus.getValue(), retOpTimePair.opTime);
+ auto collType = parseStatus.getValue();
+ if (collType.getDropped()) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ stream() << "collection " << collNs << " was dropped");
+ }
+
+ return repl::OpTimeWith<CollectionType>(collType, retOpTimePair.opTime);
}
Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx,
diff --git a/src/mongo/s/catalog/type_collection.h b/src/mongo/s/catalog/type_collection.h
index 06c51be3b55..b46937266e1 100644
--- a/src/mongo/s/catalog/type_collection.h
+++ b/src/mongo/s/catalog/type_collection.h
@@ -43,9 +43,26 @@ class StatusWith;
/**
- * This class represents the layout and contents of documents contained in the
+ * This class represents the layout and contents of documents contained in the config server's
* config.collections collection. All manipulation of documents coming from that collection
* should be done with this class.
+ *
+ * Expected config server config.collections collection format:
+ * {
+ * "_id" : "foo.bar",
+ * "lastmodEpoch" : ObjectId("58b6fd76132358839e409e47"),
+ * "lastmod" : ISODate("1970-02-19T17:02:47.296Z"),
+ * "dropped" : false,
+ * "key" : {
+ * "_id" : 1
+ * },
+ * "defaultCollation" : {
+ * "locale" : "fr_CA"
+ * },
+ * "unique" : false,
+ * "noBalance" : false
+ * }
+ *
*/
class CollectionType {
public:
diff --git a/src/mongo/s/catalog/type_shard_collection.cpp b/src/mongo/s/catalog/type_shard_collection.cpp
new file mode 100644
index 00000000000..e1c2a067be6
--- /dev/null
+++ b/src/mongo/s/catalog/type_shard_collection.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/type_shard_collection.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+const std::string ShardCollectionType::ConfigNS = "config.collections";
+
+const BSONField<std::string> ShardCollectionType::uuid("_id");
+const BSONField<std::string> ShardCollectionType::ns("ns");
+const BSONField<BSONObj> ShardCollectionType::keyPattern("key");
+const BSONField<OID> ShardCollectionType::lastConsistentCollectionVersionEpoch(
+ "lastConsistentCollectionVersionEpoch");
+const BSONField<Date_t> ShardCollectionType::lastConsistentCollectionVersion(
+ "lastConsistentCollectionVersion");
+
+ShardCollectionType::ShardCollectionType(const CollectionType& collectionType)
+ : ShardCollectionType(
+ collectionType.getNs(), collectionType.getNs(), collectionType.getKeyPattern()) {}
+
+ShardCollectionType::ShardCollectionType(const NamespaceString& uuid,
+ const NamespaceString& ns,
+ const KeyPattern& keyPattern)
+ : _uuid(uuid), _ns(ns), _keyPattern(keyPattern.toBSON()) {}
+
+StatusWith<ShardCollectionType> ShardCollectionType::fromBSON(const BSONObj& source) {
+ NamespaceString uuidNss;
+ {
+ std::string uuidString;
+ Status status = bsonExtractStringField(source, uuid.name(), &uuidString);
+ if (!status.isOK()) {
+ return status;
+ }
+ uuidNss = NamespaceString{uuidString};
+ }
+
+ NamespaceString nsNss;
+ {
+ std::string nsString;
+ Status status = bsonExtractStringField(source, ns.name(), &nsString);
+ if (!status.isOK()) {
+ return status;
+ }
+ nsNss = NamespaceString{nsString};
+ }
+
+ BSONElement collKeyPattern;
+ Status status = bsonExtractTypedField(source, keyPattern.name(), Object, &collKeyPattern);
+ if (!status.isOK()) {
+ return status;
+ }
+ BSONObj obj = collKeyPattern.Obj();
+ if (obj.isEmpty()) {
+ return Status(ErrorCodes::ShardKeyNotFound,
+ str::stream() << "Empty shard key. Failed to parse: " << source.toString());
+ }
+ KeyPattern pattern(obj.getOwned());
+
+ ShardCollectionType shardCollectionType(uuidNss, nsNss, pattern);
+
+ {
+ auto statusWithChunkVersion = ChunkVersion::parseFromBSONWithFieldForCommands(
+ source, lastConsistentCollectionVersion.name());
+ if (statusWithChunkVersion.isOK()) {
+ ChunkVersion collVersion = std::move(statusWithChunkVersion.getValue());
+ shardCollectionType.setLastConsistentCollectionVersion(std::move(collVersion));
+ } else if (statusWithChunkVersion == ErrorCodes::NoSuchKey) {
+ // May not be set yet, which is okay.
+ } else {
+ return statusWithChunkVersion.getStatus();
+ }
+ }
+
+ return shardCollectionType;
+}
+
+bool ShardCollectionType::isLastConsistentCollectionVersionSet() const {
+ return _lastConsistentCollectionVersion.is_initialized();
+}
+
+BSONObj ShardCollectionType::toBSON() const {
+ BSONObjBuilder builder;
+
+ builder.append(uuid.name(), _uuid.toString());
+ builder.append(ns.name(), _ns.toString());
+ builder.append(keyPattern.name(), _keyPattern.toBSON());
+
+ if (_lastConsistentCollectionVersion) {
+ _lastConsistentCollectionVersion->appendWithFieldForCommands(
+ &builder, lastConsistentCollectionVersion.name());
+ }
+
+ return builder.obj();
+}
+
+std::string ShardCollectionType::toString() const {
+ return toBSON().toString();
+}
+
+void ShardCollectionType::setUUID(const NamespaceString& uuid) {
+ invariant(uuid.isValid());
+ _uuid = uuid;
+}
+
+void ShardCollectionType::setNs(const NamespaceString& ns) {
+ invariant(ns.isValid());
+ _ns = ns;
+}
+
+void ShardCollectionType::setKeyPattern(const KeyPattern& keyPattern) {
+ invariant(!keyPattern.toBSON().isEmpty());
+ _keyPattern = keyPattern;
+}
+
+void ShardCollectionType::setLastConsistentCollectionVersion(
+ const ChunkVersion& lastConsistentCollectionVersion) {
+ _lastConsistentCollectionVersion = lastConsistentCollectionVersion;
+}
+
+const OID ShardCollectionType::getLastConsistentCollectionVersionEpoch() const {
+ invariant(_lastConsistentCollectionVersion);
+ return _lastConsistentCollectionVersion->epoch();
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h
new file mode 100644
index 00000000000..ca9fa00d5b5
--- /dev/null
+++ b/src/mongo/s/catalog/type_shard_collection.h
@@ -0,0 +1,147 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/keypattern.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/chunk_version.h"
+
+namespace mongo {
+
+class CollectionType;
+class Status;
+template <typename T>
+class StatusWith;
+
+/**
+ * This class represents the layout and contents of documents contained in the shard server's
+ * config.collections collection. All manipulation of documents coming from that collection should
+ * be done with this class.
+ *
+ * Expected shard server config.collections collection format:
+ * {
+ * "_id" : "foo.bar", // will eventually become a UUID field, when it becomes available
+ * "ns" : "foo.bar",
+ * "key" : {
+ * "_id" : 1
+ * },
+ * "lastConsistentCollectionVersionEpoch" : ObjectId("58b6fd76132358839e409e47"),
+ * "lastConsistentCollectionVersion" : ISODate("1970-02-19T17:02:47.296Z")
+ * }
+ *
+ * The 'lastConsistentCollectionVersion' is written by shard primaries and used by shard
+ * secondaries. A secondary uses the value to refresh chunk metadata up to the chunk with that
+ * chunk version. Chunk metadata updates on the shard involve multiple chunks collection document
+ * writes, during which time the data can be inconsistent and should not be loaded.
+ */
+class ShardCollectionType {
+public:
+ // Name of the collections collection on the shard server.
+ static const std::string ConfigNS;
+
+ static const BSONField<std::string> uuid;
+ static const BSONField<std::string> ns;
+ static const BSONField<BSONObj> keyPattern;
+ static const BSONField<OID> lastConsistentCollectionVersionEpoch;
+ static const BSONField<Date_t> lastConsistentCollectionVersion;
+
+ /**
+ * The MetadataLoader fetches CollectionType format documents from the config server and then
+ * writes them in ShardCollectionType format to the shard. This constructor facilitates the
+ * conversion from config to shard config.collections schema.
+ */
+ explicit ShardCollectionType(const CollectionType& collType);
+
+ /**
+ * Constructs a new ShardCollectionType object from BSON retrieved from a shard server.
+ * Also validates the contents; a non-OK status is returned if a required field is
+ * missing or malformed.
+ */
+ static StatusWith<ShardCollectionType> fromBSON(const BSONObj& source);
+
+ /**
+ * Returns the BSON representation of the entry for the shard collection schema.
+ *
+ * This function only appends the fields and values relevant to shards that are SET on the
+ * ShardCollectionType object. No field is guaranteed to be appended.
+ */
+ BSONObj toBSON() const;
+
+ /**
+ * Returns a std::string representation of the current internal state.
+ */
+ std::string toString() const;
+
+ const NamespaceString& getUUID() const {
+ return _uuid;
+ }
+ void setUUID(const NamespaceString& uuid);
+
+ const NamespaceString& getNs() const {
+ return _ns;
+ }
+ void setNs(const NamespaceString& ns);
+
+ const KeyPattern& getKeyPattern() const {
+ return _keyPattern;
+ }
+ void setKeyPattern(const KeyPattern& keyPattern);
+
+ const ChunkVersion& getLastConsistentCollectionVersion() const {
+ return _lastConsistentCollectionVersion.get();
+ }
+ void setLastConsistentCollectionVersion(const ChunkVersion& lastConsistentCollectionVersion);
+
+ bool isLastConsistentCollectionVersionSet() const;
+
+ const OID getLastConsistentCollectionVersionEpoch() const;
+
+private:
+ ShardCollectionType(const NamespaceString& uuid,
+ const NamespaceString& ns,
+ const KeyPattern& keyPattern);
+
+ NamespaceString _uuid;
+
+ // The full namespace (with the database prefix).
+ NamespaceString _ns;
+
+ // Sharding key. If collection is dropped, this is no longer required.
+ KeyPattern _keyPattern;
+
+ // Used by shard secondaries to safely refresh chunk metadata up to this version; higher
+ // versions may put the chunk metadata into an inconsistent state.
+ boost::optional<ChunkVersion> _lastConsistentCollectionVersion;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/type_shard_collection_test.cpp b/src/mongo/s/catalog/type_shard_collection_test.cpp
new file mode 100644
index 00000000000..d77d7fa4996
--- /dev/null
+++ b/src/mongo/s/catalog/type_shard_collection_test.cpp
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/type_shard_collection.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/oid.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+const NamespaceString kNss = NamespaceString("db.coll");
+const BSONObj kKeyPattern = BSON("a" << 1);
+
+TEST(ShardCollectionType, ToFromShardBSONWithLastConsistentCollectionVersion) {
+ const ChunkVersion lastConsistent(1, 0, OID::gen());
+
+ BSONObjBuilder builder;
+ builder.append(ShardCollectionType::uuid.name(), kNss.ns());
+ builder.append(ShardCollectionType::ns.name(), kNss.ns());
+ builder.append(ShardCollectionType::keyPattern.name(), kKeyPattern);
+ lastConsistent.appendWithFieldForCommands(
+ &builder, ShardCollectionType::lastConsistentCollectionVersion.name());
+ BSONObj obj = builder.obj();
+
+ ShardCollectionType shardCollectionType = assertGet(ShardCollectionType::fromBSON(obj));
+
+ ASSERT_EQUALS(shardCollectionType.getUUID(), kNss);
+ ASSERT_EQUALS(shardCollectionType.getNs(), kNss);
+ ASSERT_BSONOBJ_EQ(shardCollectionType.getKeyPattern().toBSON(), kKeyPattern);
+ ASSERT_EQUALS(shardCollectionType.getLastConsistentCollectionVersion(), lastConsistent);
+
+ ASSERT_BSONOBJ_EQ(obj, shardCollectionType.toBSON());
+}
+
+TEST(ShardCollectionType, ToFromShardBSONWithoutLastConsistentCollectionVersion) {
+ BSONObjBuilder builder;
+ builder.append(ShardCollectionType::uuid.name(), kNss.ns());
+ builder.append(ShardCollectionType::ns.name(), kNss.ns());
+ builder.append(ShardCollectionType::keyPattern.name(), kKeyPattern);
+ BSONObj obj = builder.obj();
+
+ ShardCollectionType shardCollectionType = assertGet(ShardCollectionType::fromBSON(obj));
+
+ ASSERT_EQUALS(shardCollectionType.getUUID(), kNss);
+ ASSERT_EQUALS(shardCollectionType.getNs(), kNss);
+ ASSERT_BSONOBJ_EQ(shardCollectionType.getKeyPattern().toBSON(), kKeyPattern);
+ ASSERT_FALSE(shardCollectionType.isLastConsistentCollectionVersionSet());
+
+ ASSERT_BSONOBJ_EQ(obj, shardCollectionType.toBSON());
+}
+
+TEST(ShardCollectionType, FromEmptyBSON) {
+ StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(BSONObj());
+ ASSERT_FALSE(status.isOK());
+}
+
+TEST(ShardCollectionType, FromBSONNoUUIDFails) {
+ BSONObj obj =
+ BSON(ShardCollectionType::ns(kNss.ns()) << ShardCollectionType::keyPattern(kKeyPattern));
+
+ StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj);
+ ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey);
+}
+
+TEST(ShardCollectionType, FromBSONNoNSFails) {
+ BSONObj obj =
+ BSON(ShardCollectionType::uuid(kNss.ns()) << ShardCollectionType::keyPattern(kKeyPattern));
+
+ StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj);
+ ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey);
+}
+
+TEST(ShardCollectionType, FromBSONNoShardKeyFails) {
+ BSONObj obj = BSON(ShardCollectionType::uuid(kNss.ns()) << ShardCollectionType::ns(kNss.ns()));
+
+ StatusWith<ShardCollectionType> status = ShardCollectionType::fromBSON(obj);
+ ASSERT_EQUALS(status.getStatus().code(), ErrorCodes::NoSuchKey);
+}
+
+TEST(ShardCollectionType, ConstructFromCollectionType) {
+ const OID oid = OID::gen();
+
+ BSONObj obj = BSON(CollectionType::fullNs(kNss.ns())
+ << CollectionType::epoch(oid)
+ << CollectionType::updatedAt(Date_t::fromMillisSinceEpoch(1))
+ << CollectionType::keyPattern(kKeyPattern));
+ CollectionType collectionType = assertGet(CollectionType::fromBSON(obj));
+ ASSERT_TRUE(collectionType.validate().isOK());
+
+ BSONObjBuilder shardCollectionTypeBuilder;
+ shardCollectionTypeBuilder.append(ShardCollectionType::uuid.name(), kNss.ns());
+ shardCollectionTypeBuilder.append(ShardCollectionType::ns.name(), kNss.ns());
+ shardCollectionTypeBuilder.append(ShardCollectionType::keyPattern.name(), kKeyPattern);
+
+ ASSERT_BSONOBJ_EQ(ShardCollectionType(collectionType).toBSON(),
+ shardCollectionTypeBuilder.obj());
+}
+
+} // namespace
+} // namespace mongo