diff options
-rw-r--r-- | jstests/sharding/resharding_abort_command.js | 6 | ||||
-rw-r--r-- | jstests/sharding/resharding_metrics.js | 21 | ||||
-rw-r--r-- | jstests/sharding/resharding_metrics_increment.js | 141 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp | 316 |
4 files changed, 202 insertions, 282 deletions
diff --git a/jstests/sharding/resharding_abort_command.js b/jstests/sharding/resharding_abort_command.js index 1b2d531295b..7ddbecccf03 100644 --- a/jstests/sharding/resharding_abort_command.js +++ b/jstests/sharding/resharding_abort_command.js @@ -173,12 +173,6 @@ const runAbortWithFailpoint = (failpointName, failpointNodeType, abortLocation, const topology = DiscoverTopology.findConnectedNodes(mongos); const configsvr = new Mongo(topology.configsvr.nodes[0]); - const status = configsvr.getDB('admin').serverStatus({}); - // Resharding has not been attempted yet, so resharding metrics will not be reported. This means - // shardingStatistics will be empty, and thus not reported. So we assert that the serverStatus - // does not have shardingStatistics yet. - assert(!status.hasOwnProperty('shardingStatistics'), status); - let expectedAbortErrorCodes = ErrorCodes.OK; let expectedReshardingErrorCode = ErrorCodes.ReshardCollectionAborted; diff --git a/jstests/sharding/resharding_metrics.js b/jstests/sharding/resharding_metrics.js index 0a920fb91e2..f7fdbefabf2 100644 --- a/jstests/sharding/resharding_metrics.js +++ b/jstests/sharding/resharding_metrics.js @@ -48,6 +48,21 @@ const inputCollection = reshardingTest.createShardedCollection({ const recipientShardNames = reshardingTest.recipientShardNames; const topology = DiscoverTopology.findConnectedNodes(inputCollection.getMongo()); +let allNodes = []; +for (let [_, shardReplSet] of Object.entries(topology.shards)) { + allNodes.push(shardReplSet.primary); +} +allNodes.push(topology.configsvr.primary); +allNodes.forEach((hostName) => { + const status = new Mongo(hostName).getDB('admin').serverStatus({}); + if (hostName == topology.configsvr.primary) { + assert(!status.hasOwnProperty('shardingStatistics')); + return; + } + const shardingStats = status.shardingStatistics; + assert(!shardingStats.hasOwnProperty('resharding')); +}); + reshardingTest.withReshardingInBackground( { newShardKeyPattern: {newKey: 1}, @@ -91,12 +106,6 @@ reshardingTest.withReshardingInBackground( tojson(curOpSection)); }); -let allNodes = []; -for (let [_, shardReplSet] of Object.entries(topology.shards)) { - allNodes.push(shardReplSet.primary); -} -allNodes.push(topology.configsvr.primary); - allNodes.forEach((hostName) => { const serverStatus = getServerStatusSection(new Mongo(hostName)); diff --git a/jstests/sharding/resharding_metrics_increment.js b/jstests/sharding/resharding_metrics_increment.js deleted file mode 100644 index a8b188f64e9..00000000000 --- a/jstests/sharding/resharding_metrics_increment.js +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Tests that resharding metrics section in server status - * responds to statistical increments in an expected way. - * - * @tags: [ - * uses_atclustertime - * ] - */ - -(function() { -'use strict'; - -load("jstests/libs/discover_topology.js"); -load("jstests/sharding/libs/resharding_test_fixture.js"); - -const kNamespace = "reshardingDb.coll"; - -// All of the keys in `expected` must be present in `dict`. -// Furthermore, if the the `expected` key is mapped to a value other than `undefined`, -// then the corresponding value in `dict` must be equal to that value. -function verifyDict(dict, expected) { - for (var key in expected) { - assert(dict.hasOwnProperty(key), `Missing ${key} in ${tojson(dict)}`); - const expectedValue = expected[key]; - if (expected[key] === undefined) { - jsTest.log(`${key}: ${tojson(dict[key])}`); - continue; - } else if (key === "oplogEntriesFetched" || key === "oplogEntriesApplied") { - // The fetcher writes no-op entries for each getMore that returns an empty batch. We - // won't know how many getMores it called however, so we can only check that the metrics - // are gte the number of writes we're aware of. - assert.gte(dict[key], expected[key]); - } else { - assert.eq(dict[key], - expected[key], - `Expected the value for ${key} to be ${expectedValue}: ${tojson(dict)}`); - } - } -} - -const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2}); -reshardingTest.setup(); - -const donorShardNames = reshardingTest.donorShardNames; -const recipientShardNames = reshardingTest.recipientShardNames; - -const inputCollection = reshardingTest.createShardedCollection({ - ns: kNamespace, - shardKeyPattern: {oldKey: 1}, - chunks: [ - {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, - {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, - ], -}); - -assert.commandWorked(inputCollection.insert([ - {_id: "stays on shard0", oldKey: -10, newKey: -10}, - {_id: "moves to shard0", oldKey: 10, newKey: -10}, - {_id: "moves to shard1", oldKey: -10, newKey: 10}, - {_id: "stays on shard1", oldKey: 10, newKey: 10}, -])); - -function awaitEstablishmentOfCloneTimestamp(inputCollection) { - const mongos = inputCollection.getMongo(); - assert.soon(() => { - const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ - ns: inputCollection.getFullName() - }); - return coordinatorDoc !== null && coordinatorDoc.cloneTimestamp !== undefined; - }); -} - -// Start the resharding operation, and wait for it to establish a fetch -// timestamp, which indicates the beginning of the apply phase. Then perform a -// few late inserts to verify that those show up as both "fetched" and -// "applied" in the metrics. -reshardingTest.withReshardingInBackground( // - { - newShardKeyPattern: {newKey: 1}, - newChunks: [ - {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, - {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, - ], - }, - (tempNs) => { - jsTest.log(`tempNs: ${tempNs}`); - awaitEstablishmentOfCloneTimestamp(inputCollection); - for (var i = 0; i < 10; ++i) - assert.commandWorked( - inputCollection.insert({_id: `late ${i}`, oldKey: 10, newKey: 10})); - }); - -const mongos = inputCollection.getMongo(); -const topology = DiscoverTopology.findConnectedNodes(mongos); - -// There's one terminating "no-op" oplog entry from each donor marking the -// boundary between the cloning phase and the applying phase. So there's a -// baseline of 2 fetches/applies on each recipient (one "no-op" for each donor). -// Additionally, recipientShard[1] gets the 10 late inserts above, so expect 12 -// oplogEntry applies for those late inserts. -[{ - shardName: recipientShardNames[0], - documents: 2, // These are the no-op final oplog entries, one from each source shard. - fetched: 2, - applied: 2, - opcounters: {insert: 0, update: 0, delete: 0} -}, - { - shardName: recipientShardNames[1], - documents: 2, - fetched: 12, - applied: 12, - opcounters: {insert: 10, update: 0, delete: 0} - }, -].forEach(e => { - const mongo = new Mongo(topology.shards[e.shardName].primary); - const doc = mongo.getDB('admin').serverStatus({}); - - var sub = doc; - ['shardingStatistics', 'resharding'].forEach(k => { - assert(sub.hasOwnProperty(k), sub); - sub = sub[k]; - }); - - jsTest.log(`Resharding stats for ${mongo}: ${tojson(sub)}`); - - verifyDict(sub.active, { - "documentsCopied": e.documents, - "oplogEntriesFetched": e.fetched, - "oplogEntriesApplied": e.applied, - "insertsApplied": e.opcounters.insert, - "updatesApplied": e.opcounters.update, - "deletesApplied": e.opcounters.delete, - }); - - // bytesCopied is harder to pin down but it should be >0. - assert.betweenIn(1, sub.active['bytesCopied'], 1024, 'bytesCopied'); -}); - -reshardingTest.teardown(); -})(); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp index 93557d8b6eb..45679367673 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp @@ -33,9 +33,11 @@ #include "mongo/bson/bsonmisc.h" #include "mongo/bson/json.h" +#include "mongo/db/catalog/create_collection.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/hasher.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_collection_cloner.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_util.h" @@ -76,13 +78,11 @@ private: class ReshardingCollectionClonerTest : public ShardServerTestFixtureWithCatalogCacheMock { protected: - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - ShardKeyPattern newShardKeyPattern, - ShardId recipientShard, - std::deque<DocumentSource::GetNextResult> sourceCollectionData, - std::deque<DocumentSource::GetNextResult> configCacheChunksData) { - auto tempNss = resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - + void initializePipelineTest( + const ShardKeyPattern& newShardKeyPattern, + const ShardId& recipientShard, + const std::deque<DocumentSource::GetNextResult>& sourceCollectionData, + const std::deque<DocumentSource::GetNextResult>& configCacheChunksData) { _metrics = ReshardingMetrics::makeInstance(_sourceUUID, newShardKeyPattern.toBSON(), _sourceNss, @@ -90,22 +90,23 @@ protected: getServiceContext()->getFastClockSource()->now(), getServiceContext()); - ReshardingCollectionCloner cloner(_metrics.get(), - std::move(newShardKeyPattern), - _sourceNss, - _sourceUUID, - std::move(recipientShard), - Timestamp(1, 0), /* dummy value */ - std::move(tempNss)); + _cloner = std::make_unique<ReshardingCollectionCloner>( + _metrics.get(), + ShardKeyPattern(newShardKeyPattern.toBSON()), + _sourceNss, + _sourceUUID, + recipientShard, + Timestamp(1, 0), /* dummy value */ + tempNss); - auto pipeline = cloner.makePipeline( - operationContext(), - std::make_shared<MockMongoInterface>(std::move(configCacheChunksData))); + getCatalogCacheMock()->setChunkManagerReturnValue( + createChunkManager(newShardKeyPattern, configCacheChunksData)); - pipeline->addInitialSource(DocumentSourceMock::createForTest( - std::move(sourceCollectionData), pipeline->getContext())); + _pipeline = _cloner->makePipeline( + operationContext(), std::make_shared<MockMongoInterface>(configCacheChunksData)); - return pipeline; + _pipeline->addInitialSource( + DocumentSourceMock::createForTest(sourceCollectionData, _pipeline->getContext())); } template <class T> @@ -116,6 +117,11 @@ protected: void setUp() override { ShardServerTestFixtureWithCatalogCacheMock::setUp(); + + OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( + operationContext()); + uassertStatusOK(createCollection( + operationContext(), tempNss.db().toString(), BSON("create" << tempNss.coll()))); } void tearDown() override { @@ -157,87 +163,135 @@ protected: boost::none); } -private: + void runPipelineTest( + ShardKeyPattern shardKey, + const ShardId& recipientShard, + std::deque<DocumentSource::GetNextResult> collectionData, + std::deque<DocumentSource::GetNextResult> configData, + int64_t expectedDocumentsCount, + std::function<void(std::unique_ptr<SeekableRecordCursor>)> verifyFunction) { + initializePipelineTest(shardKey, recipientShard, collectionData, configData); + auto opCtx = operationContext(); + AutoGetCollection tempColl{opCtx, tempNss, MODE_IS}; + while (_cloner->doOneBatch(operationContext(), *_pipeline)) { + ASSERT_EQ(tempColl->numRecords(opCtx), _metrics->getDocumentsCopiedCount()); + ASSERT_EQ(tempColl->dataSize(opCtx), _metrics->getBytesCopiedCount()); + } + ASSERT_EQ(tempColl->numRecords(operationContext()), expectedDocumentsCount); + ASSERT_EQ(_metrics->getDocumentsCopiedCount(), expectedDocumentsCount); + ASSERT_GT(tempColl->dataSize(opCtx), 0); + ASSERT_EQ(tempColl->dataSize(opCtx), _metrics->getBytesCopiedCount()); + verifyFunction(tempColl->getCursor(opCtx)); + } + +protected: const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd); + const NamespaceString tempNss = + resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); const UUID _sourceUUID = UUID::gen(); const ReshardingSourceId _sourceId{UUID::gen(), _myShardName}; const DatabaseVersion _sourceDbVersion{UUID::gen(), Timestamp(1, 1)}; std::unique_ptr<ReshardingMetrics> _metrics; + std::unique_ptr<ReshardingCollectionCloner> _cloner; + std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; }; TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) { ShardKeyPattern sk{fromjson("{x: 1}")}; + std::deque<DocumentSource::GetNextResult> collectionData{ + Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -0.001}")), + Doc(fromjson("{_id: 3, x: NumberLong(0)}")), + Doc(fromjson("{_id: 4, x: 0.0}")), + Doc(fromjson("{_id: 5, x: 0.001}")), + Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))}; std::deque<DocumentSource::GetNextResult> configData{ Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'myShardName'}")), Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))}; - getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData)); - auto pipeline = makePipeline(std::move(sk), - _myShardName, - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -0.001}")), - Doc(fromjson("{_id: 3, x: NumberLong(0)}")), - Doc(fromjson("{_id: 4, x: 0.0}")), - Doc(fromjson("{_id: 5, x: 0.001}")), - Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))}, - std::move(configData)); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)), - next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); + constexpr auto kExpectedCopiedCount = 2; + const auto verify = [](auto cursor) { + auto next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)), + next->data.toBson()); + + ASSERT_FALSE(cursor->next()); + }; + + runPipelineTest(std::move(sk), + _myShardName, + std::move(collectionData), + std::move(configData), + kExpectedCopiedCount, + verify); } TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) { ShardKeyPattern sk{fromjson("{x: 1}")}; + std::deque<DocumentSource::GetNextResult> collectionData{ + Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -0.001}")), + Doc(fromjson("{_id: 3, x: NumberLong(0)}")), + Doc(fromjson("{_id: 4, x: 0.0}")), + Doc(fromjson("{_id: 5, x: 0.001}")), + Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))}; std::deque<DocumentSource::GetNextResult> configData{ Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'myShardName'}")), Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }")), }; - getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData)); - auto pipeline = makePipeline(std::move(sk), - ShardId("shard2"), - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -0.001}")), - Doc(fromjson("{_id: 3, x: NumberLong(0)}")), - Doc(fromjson("{_id: 4, x: 0.0}")), - Doc(fromjson("{_id: 5, x: 0.001}")), - Doc(fromjson("{_id: 6, x: {$maxKey: 1}}}"))}, - std::move(configData)); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)), - next->toBson()); - - // TODO SERVER-67529: Enable after ChunkManager can handle documents with $maxKey. - // next = pipeline->getNext(); - // ASSERT(next); - // ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)), - // next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); + // TODO SERVER-67529: Change expected documents to 4. + constexpr auto kExpectedCopiedCount = 3; + const auto verify = [](auto cursor) { + auto next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)), + next->data.toBson()); + + // TODO SERVER-67529: Enable after ChunkManager can handle documents with $maxKey. + // next = cursor->next(); + // ASSERT(next); + // ASSERT_BSONOBJ_BINARY_EQ( + // BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)), + // next->data.toBson()); + + ASSERT_FALSE(cursor->next()); + }; + + runPipelineTest(std::move(sk), + ShardId("shard2"), + std::move(collectionData), + std::move(configData), + kExpectedCopiedCount, + verify); } TEST_F(ReshardingCollectionClonerTest, HashedShardKey) { ShardKeyPattern sk{fromjson("{x: 'hashed'}")}; + std::deque<DocumentSource::GetNextResult> collectionData{ + Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -1}")), + Doc(fromjson("{_id: 3, x: -0.123}")), + Doc(fromjson("{_id: 4, x: 0}")), + Doc(fromjson("{_id: 5, x: NumberLong(0)}")), + Doc(fromjson("{_id: 6, x: 0.123}")), + Doc(fromjson("{_id: 7, x: 1}")), + Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}; // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: // - [MinKey, hash(0)) : shard1 // - [hash(0), hash(0) + 1) : shard2 @@ -252,44 +306,50 @@ TEST_F(ReshardingCollectionClonerTest, HashedShardKey) { Doc{{"_id", Doc{{"x", getHashedElementValue(0) + 1}}}, {"max", Doc{{"x", V(MAXKEY)}}}, {"shard", "shard3"_sd}}}; - getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData)); - auto pipeline = makePipeline(std::move(sk), - ShardId("shard2"), - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -1}")), - Doc(fromjson("{_id: 3, x: -0.123}")), - Doc(fromjson("{_id: 4, x: 0}")), - Doc(fromjson("{_id: 5, x: NumberLong(0)}")), - Doc(fromjson("{_id: 6, x: 0.123}")), - Doc(fromjson("{_id: 7, x: 1}")), - Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, - std::move(configData)); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)), - next->toBson()); - - next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)), - next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); + constexpr auto kExpectedCopiedCount = 4; + const auto verify = [](auto cursor) { + auto next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)), + next->data.toBson()); + + next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)), + next->data.toBson()); + + ASSERT_FALSE(cursor->next()); + }; + + runPipelineTest(std::move(sk), + ShardId("shard2"), + std::move(collectionData), + std::move(configData), + kExpectedCopiedCount, + verify); } TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) { ShardKeyPattern sk{fromjson("{x: 'hashed', y: 1}")}; + std::deque<DocumentSource::GetNextResult> collectionData{ + Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), + Doc(fromjson("{_id: 2, x: -1}")), + Doc(fromjson("{_id: 3, x: -0.123, y: -1}")), + Doc(fromjson("{_id: 4, x: 0, y: 0}")), + Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")), + Doc(fromjson("{_id: 6, x: 0.123}")), + Doc(fromjson("{_id: 7, x: 1}")), + Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}; // Documents in a mock config.cache.chunks collection. Mocked collection boundaries: // - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1 // - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2 @@ -304,25 +364,23 @@ TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) { Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}}, {"max", Doc{{"x", V(MAXKEY)}, {"y", V(MAXKEY)}}}, {"shard", "shard3"_sd}}}; - getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData)); - auto pipeline = makePipeline(std::move(sk), - ShardId("shard2"), - {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")), - Doc(fromjson("{_id: 2, x: -1}")), - Doc(fromjson("{_id: 3, x: -0.123, y: -1}")), - Doc(fromjson("{_id: 4, x: 0, y: 0}")), - Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")), - Doc(fromjson("{_id: 6, x: 0.123}")), - Doc(fromjson("{_id: 7, x: 1}")), - Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))}, - std::move(configData)); - - auto next = pipeline->getNext(); - ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ( - BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), next->toBson()); - - ASSERT_FALSE(pipeline->getNext()); + constexpr auto kExpectedCopiedCount = 1; + const auto verify = [](auto cursor) { + auto next = cursor->next(); + ASSERT(next); + ASSERT_BSONOBJ_BINARY_EQ( + BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), + next->data.toBson()); + + ASSERT_FALSE(cursor->next()); + }; + + runPipelineTest(std::move(sk), + ShardId("shard2"), + std::move(collectionData), + std::move(configData), + kExpectedCopiedCount, + verify); } } // namespace |