-rw-r--r--  jstests/sharding/resharding_abort_command.js                     |   6
-rw-r--r--  jstests/sharding/resharding_metrics.js                           |  21
-rw-r--r--  jstests/sharding/resharding_metrics_increment.js                 | 141
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp  | 316
4 files changed, 202 insertions(+), 282 deletions(-)
diff --git a/jstests/sharding/resharding_abort_command.js b/jstests/sharding/resharding_abort_command.js
index 1b2d531295b..7ddbecccf03 100644
--- a/jstests/sharding/resharding_abort_command.js
+++ b/jstests/sharding/resharding_abort_command.js
@@ -173,12 +173,6 @@ const runAbortWithFailpoint = (failpointName, failpointNodeType, abortLocation,
const topology = DiscoverTopology.findConnectedNodes(mongos);
const configsvr = new Mongo(topology.configsvr.nodes[0]);
- const status = configsvr.getDB('admin').serverStatus({});
- // Resharding has not been attempted yet, so resharding metrics will not be reported. This means
- // shardingStatistics will be empty, and thus not reported. So we assert that the serverStatus
- // does not have shardingStatistics yet.
- assert(!status.hasOwnProperty('shardingStatistics'), status);
-
let expectedAbortErrorCodes = ErrorCodes.OK;
let expectedReshardingErrorCode = ErrorCodes.ReshardCollectionAborted;
diff --git a/jstests/sharding/resharding_metrics.js b/jstests/sharding/resharding_metrics.js
index 0a920fb91e2..f7fdbefabf2 100644
--- a/jstests/sharding/resharding_metrics.js
+++ b/jstests/sharding/resharding_metrics.js
@@ -48,6 +48,21 @@ const inputCollection = reshardingTest.createShardedCollection({
const recipientShardNames = reshardingTest.recipientShardNames;
const topology = DiscoverTopology.findConnectedNodes(inputCollection.getMongo());
+let allNodes = [];
+for (let [_, shardReplSet] of Object.entries(topology.shards)) {
+ allNodes.push(shardReplSet.primary);
+}
+allNodes.push(topology.configsvr.primary);
+allNodes.forEach((hostName) => {
+ const status = new Mongo(hostName).getDB('admin').serverStatus({});
+ if (hostName == topology.configsvr.primary) {
+ assert(!status.hasOwnProperty('shardingStatistics'));
+ return;
+ }
+ const shardingStats = status.shardingStatistics;
+ assert(!shardingStats.hasOwnProperty('resharding'));
+});
+
reshardingTest.withReshardingInBackground(
{
newShardKeyPattern: {newKey: 1},
@@ -91,12 +106,6 @@ reshardingTest.withReshardingInBackground(
tojson(curOpSection));
});
-let allNodes = [];
-for (let [_, shardReplSet] of Object.entries(topology.shards)) {
- allNodes.push(shardReplSet.primary);
-}
-allNodes.push(topology.configsvr.primary);
-
allNodes.forEach((hostName) => {
const serverStatus = getServerStatusSection(new Mongo(hostName));
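
The post-resharding assertions above go through a getServerStatusSection() helper whose definition lies outside the lines shown in this diff. A minimal sketch of what such a helper plausibly does, assuming it simply drills into the serverStatus output (names and assertion messages here are illustrative, not the test's actual code):

    function getServerStatusSection(conn) {
        // Fetch the full serverStatus document from the admin database.
        const status = conn.getDB('admin').serverStatus({});
        assert(status.hasOwnProperty('shardingStatistics'), tojson(status));
        // Once a resharding operation has run, the resharding subsection
        // should be present under shardingStatistics.
        const shardingStats = status.shardingStatistics;
        assert(shardingStats.hasOwnProperty('resharding'), tojson(shardingStats));
        return shardingStats.resharding;
    }
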
diff --git a/jstests/sharding/resharding_metrics_increment.js b/jstests/sharding/resharding_metrics_increment.js
deleted file mode 100644
index a8b188f64e9..00000000000
--- a/jstests/sharding/resharding_metrics_increment.js
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Tests that resharding metrics section in server status
- * responds to statistical increments in an expected way.
- *
- * @tags: [
- * uses_atclustertime
- * ]
- */
-
-(function() {
-'use strict';
-
-load("jstests/libs/discover_topology.js");
-load("jstests/sharding/libs/resharding_test_fixture.js");
-
-const kNamespace = "reshardingDb.coll";
-
-// All of the keys in `expected` must be present in `dict`.
- * Furthermore, if the `expected` key is mapped to a value other than `undefined`,
-// then the corresponding value in `dict` must be equal to that value.
-function verifyDict(dict, expected) {
- for (var key in expected) {
- assert(dict.hasOwnProperty(key), `Missing ${key} in ${tojson(dict)}`);
- const expectedValue = expected[key];
- if (expected[key] === undefined) {
- jsTest.log(`${key}: ${tojson(dict[key])}`);
- continue;
- } else if (key === "oplogEntriesFetched" || key === "oplogEntriesApplied") {
- // The fetcher writes no-op entries for each getMore that returns an empty batch. We
- // won't know how many getMores it called, however, so we can only check that the metrics
- // are gte the number of writes we're aware of.
- assert.gte(dict[key], expected[key]);
- } else {
- assert.eq(dict[key],
- expected[key],
- `Expected the value for ${key} to be ${expectedValue}: ${tojson(dict)}`);
- }
- }
-}
-
-const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2});
-reshardingTest.setup();
-
-const donorShardNames = reshardingTest.donorShardNames;
-const recipientShardNames = reshardingTest.recipientShardNames;
-
-const inputCollection = reshardingTest.createShardedCollection({
- ns: kNamespace,
- shardKeyPattern: {oldKey: 1},
- chunks: [
- {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
- {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
- ],
-});
-
-assert.commandWorked(inputCollection.insert([
- {_id: "stays on shard0", oldKey: -10, newKey: -10},
- {_id: "moves to shard0", oldKey: 10, newKey: -10},
- {_id: "moves to shard1", oldKey: -10, newKey: 10},
- {_id: "stays on shard1", oldKey: 10, newKey: 10},
-]));
-
-function awaitEstablishmentOfCloneTimestamp(inputCollection) {
- const mongos = inputCollection.getMongo();
- assert.soon(() => {
- const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({
- ns: inputCollection.getFullName()
- });
- return coordinatorDoc !== null && coordinatorDoc.cloneTimestamp !== undefined;
- });
-}
-
-// Start the resharding operation, and wait for it to establish a fetch
-// timestamp, which indicates the beginning of the apply phase. Then perform a
-// few late inserts to verify that those show up as both "fetched" and
-// "applied" in the metrics.
-reshardingTest.withReshardingInBackground( //
- {
- newShardKeyPattern: {newKey: 1},
- newChunks: [
- {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]},
- {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
- ],
- },
- (tempNs) => {
- jsTest.log(`tempNs: ${tempNs}`);
- awaitEstablishmentOfCloneTimestamp(inputCollection);
- for (var i = 0; i < 10; ++i)
- assert.commandWorked(
- inputCollection.insert({_id: `late ${i}`, oldKey: 10, newKey: 10}));
- });
-
-const mongos = inputCollection.getMongo();
-const topology = DiscoverTopology.findConnectedNodes(mongos);
-
-// There's one terminating "no-op" oplog entry from each donor marking the
-// boundary between the cloning phase and the applying phase. So there's a
-// baseline of 2 fetches/applies on each recipient (one "no-op" for each donor).
-// Additionally, recipientShard[1] receives the 10 late inserts above, so expect
-// 12 oplog entries fetched and applied there (2 no-ops + 10 late inserts).
-[{
- shardName: recipientShardNames[0],
- documents: 2, // These are the no-op final oplog entries, one from each source shard.
- fetched: 2,
- applied: 2,
- opcounters: {insert: 0, update: 0, delete: 0}
-},
- {
- shardName: recipientShardNames[1],
- documents: 2,
- fetched: 12,
- applied: 12,
- opcounters: {insert: 10, update: 0, delete: 0}
- },
-].forEach(e => {
- const mongo = new Mongo(topology.shards[e.shardName].primary);
- const doc = mongo.getDB('admin').serverStatus({});
-
- var sub = doc;
- ['shardingStatistics', 'resharding'].forEach(k => {
- assert(sub.hasOwnProperty(k), sub);
- sub = sub[k];
- });
-
- jsTest.log(`Resharding stats for ${mongo}: ${tojson(sub)}`);
-
- verifyDict(sub.active, {
- "documentsCopied": e.documents,
- "oplogEntriesFetched": e.fetched,
- "oplogEntriesApplied": e.applied,
- "insertsApplied": e.opcounters.insert,
- "updatesApplied": e.opcounters.update,
- "deletesApplied": e.opcounters.delete,
- });
-
- // bytesCopied is harder to pin down but it should be >0.
- assert.betweenIn(1, sub.active['bytesCopied'], 1024, 'bytesCopied');
-});
-
-reshardingTest.teardown();
-})();
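
The per-shard expectations in the deleted test follow from the comment above them: every donor writes one terminating no-op oplog entry, giving each recipient a baseline of 2 fetched/applied entries, and the 10 late inserts are all routed to recipientShard[1]. A worked version of that arithmetic as a shell-style sketch (the constants mirror the test's setup of 2 donors and 10 late inserts):

    const numDonors = 2;     // one terminating no-op oplog entry per donor
    const lateInserts = 10;  // all routed to recipientShard[1] (oldKey: 10)
    const expectedOnRecipient0 = numDonors;                // fetched == applied == 2
    const expectedOnRecipient1 = numDonors + lateInserts;  // fetched == applied == 12
    assert.eq(expectedOnRecipient0, 2);
    assert.eq(expectedOnRecipient1, 12);
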
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
index 93557d8b6eb..45679367673 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -33,9 +33,11 @@
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/json.h"
+#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/hasher.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_util.h"
@@ -76,13 +78,11 @@ private:
class ReshardingCollectionClonerTest : public ShardServerTestFixtureWithCatalogCacheMock {
protected:
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- ShardKeyPattern newShardKeyPattern,
- ShardId recipientShard,
- std::deque<DocumentSource::GetNextResult> sourceCollectionData,
- std::deque<DocumentSource::GetNextResult> configCacheChunksData) {
- auto tempNss = resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
-
+ void initializePipelineTest(
+ const ShardKeyPattern& newShardKeyPattern,
+ const ShardId& recipientShard,
+ const std::deque<DocumentSource::GetNextResult>& sourceCollectionData,
+ const std::deque<DocumentSource::GetNextResult>& configCacheChunksData) {
_metrics = ReshardingMetrics::makeInstance(_sourceUUID,
newShardKeyPattern.toBSON(),
_sourceNss,
@@ -90,22 +90,23 @@ protected:
getServiceContext()->getFastClockSource()->now(),
getServiceContext());
- ReshardingCollectionCloner cloner(_metrics.get(),
- std::move(newShardKeyPattern),
- _sourceNss,
- _sourceUUID,
- std::move(recipientShard),
- Timestamp(1, 0), /* dummy value */
- std::move(tempNss));
+ _cloner = std::make_unique<ReshardingCollectionCloner>(
+ _metrics.get(),
+ ShardKeyPattern(newShardKeyPattern.toBSON()),
+ _sourceNss,
+ _sourceUUID,
+ recipientShard,
+ Timestamp(1, 0), /* dummy value */
+ tempNss);
- auto pipeline = cloner.makePipeline(
- operationContext(),
- std::make_shared<MockMongoInterface>(std::move(configCacheChunksData)));
+ getCatalogCacheMock()->setChunkManagerReturnValue(
+ createChunkManager(newShardKeyPattern, configCacheChunksData));
- pipeline->addInitialSource(DocumentSourceMock::createForTest(
- std::move(sourceCollectionData), pipeline->getContext()));
+ _pipeline = _cloner->makePipeline(
+ operationContext(), std::make_shared<MockMongoInterface>(configCacheChunksData));
- return pipeline;
+ _pipeline->addInitialSource(
+ DocumentSourceMock::createForTest(sourceCollectionData, _pipeline->getContext()));
}
template <class T>
@@ -116,6 +117,11 @@ protected:
void setUp() override {
ShardServerTestFixtureWithCatalogCacheMock::setUp();
+
+ OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
+ operationContext());
+ uassertStatusOK(createCollection(
+ operationContext(), tempNss.db().toString(), BSON("create" << tempNss.coll())));
}
void tearDown() override {
@@ -157,87 +163,135 @@ protected:
boost::none);
}
-private:
+ void runPipelineTest(
+ ShardKeyPattern shardKey,
+ const ShardId& recipientShard,
+ std::deque<DocumentSource::GetNextResult> collectionData,
+ std::deque<DocumentSource::GetNextResult> configData,
+ int64_t expectedDocumentsCount,
+ std::function<void(std::unique_ptr<SeekableRecordCursor>)> verifyFunction) {
+ initializePipelineTest(shardKey, recipientShard, collectionData, configData);
+ auto opCtx = operationContext();
+ AutoGetCollection tempColl{opCtx, tempNss, MODE_IS};
+ while (_cloner->doOneBatch(opCtx, *_pipeline)) {
+ ASSERT_EQ(tempColl->numRecords(opCtx), _metrics->getDocumentsCopiedCount());
+ ASSERT_EQ(tempColl->dataSize(opCtx), _metrics->getBytesCopiedCount());
+ }
+ ASSERT_EQ(tempColl->numRecords(opCtx), expectedDocumentsCount);
+ ASSERT_EQ(_metrics->getDocumentsCopiedCount(), expectedDocumentsCount);
+ ASSERT_GT(tempColl->dataSize(opCtx), 0);
+ ASSERT_EQ(tempColl->dataSize(opCtx), _metrics->getBytesCopiedCount());
+ verifyFunction(tempColl->getCursor(opCtx));
+ }
+
+protected:
const NamespaceString _sourceNss = NamespaceString("test"_sd, "collection_being_resharded"_sd);
const UUID _sourceUUID = UUID::gen();
+ // Declared after _sourceUUID: default member initializers run in declaration
+ // order, and this initializer reads _sourceUUID.
+ const NamespaceString tempNss =
+ resharding::constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
const ReshardingSourceId _sourceId{UUID::gen(), _myShardName};
const DatabaseVersion _sourceDbVersion{UUID::gen(), Timestamp(1, 1)};
std::unique_ptr<ReshardingMetrics> _metrics;
+ std::unique_ptr<ReshardingCollectionCloner> _cloner;
+ std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
};
TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) {
ShardKeyPattern sk{fromjson("{x: 1}")};
+ std::deque<DocumentSource::GetNextResult> collectionData{
+ Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -0.001}")),
+ Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 4, x: 0.0}")),
+ Doc(fromjson("{_id: 5, x: 0.001}")),
+ Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))};
std::deque<DocumentSource::GetNextResult> configData{
Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'myShardName'}")),
Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }"))};
- getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData));
- auto pipeline = makePipeline(std::move(sk),
- _myShardName,
- {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
- Doc(fromjson("{_id: 2, x: -0.001}")),
- Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
- Doc(fromjson("{_id: 4, x: 0.0}")),
- Doc(fromjson("{_id: 5, x: 0.001}")),
- Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))},
- std::move(configData));
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)),
- next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
+ constexpr auto kExpectedCopiedCount = 2;
+ const auto verify = [](auto cursor) {
+ auto next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)),
+ next->data.toBson());
+
+ ASSERT_FALSE(cursor->next());
+ };
+
+ runPipelineTest(std::move(sk),
+ _myShardName,
+ std::move(collectionData),
+ std::move(configData),
+ kExpectedCopiedCount,
+ verify);
}
TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) {
ShardKeyPattern sk{fromjson("{x: 1}")};
+ std::deque<DocumentSource::GetNextResult> collectionData{
+ Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -0.001}")),
+ Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 4, x: 0.0}")),
+ Doc(fromjson("{_id: 5, x: 0.001}")),
+ Doc(fromjson("{_id: 6, x: {$maxKey: 1}}"))};
std::deque<DocumentSource::GetNextResult> configData{
Doc(fromjson("{_id: {x: {$minKey: 1}}, max: {x: 0.0}, shard: 'myShardName'}")),
Doc(fromjson("{_id: {x: 0.0}, max: {x: {$maxKey: 1}}, shard: 'shard2' }")),
};
- getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData));
- auto pipeline = makePipeline(std::move(sk),
- ShardId("shard2"),
- {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
- Doc(fromjson("{_id: 2, x: -0.001}")),
- Doc(fromjson("{_id: 3, x: NumberLong(0)}")),
- Doc(fromjson("{_id: 4, x: 0.0}")),
- Doc(fromjson("{_id: 5, x: 0.001}")),
- Doc(fromjson("{_id: 6, x: {$maxKey: 1}}}"))},
- std::move(configData));
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)),
- next->toBson());
-
- // TODO SERVER-67529: Enable after ChunkManager can handle documents with $maxKey.
- // next = pipeline->getNext();
- // ASSERT(next);
- // ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)),
- // next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
+ // TODO SERVER-67529: Change expected documents to 4.
+ constexpr auto kExpectedCopiedCount = 3;
+ const auto verify = [](auto cursor) {
+ auto next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)),
+ next->data.toBson());
+
+ // TODO SERVER-67529: Enable after ChunkManager can handle documents with $maxKey.
+ // next = cursor->next();
+ // ASSERT(next);
+ // ASSERT_BSONOBJ_BINARY_EQ(
+ // BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)),
+ // next->data.toBson());
+
+ ASSERT_FALSE(cursor->next());
+ };
+
+ runPipelineTest(std::move(sk),
+ ShardId("shard2"),
+ std::move(collectionData),
+ std::move(configData),
+ kExpectedCopiedCount,
+ verify);
}
TEST_F(ReshardingCollectionClonerTest, HashedShardKey) {
ShardKeyPattern sk{fromjson("{x: 'hashed'}")};
+ std::deque<DocumentSource::GetNextResult> collectionData{
+ Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -1}")),
+ Doc(fromjson("{_id: 3, x: -0.123}")),
+ Doc(fromjson("{_id: 4, x: 0}")),
+ Doc(fromjson("{_id: 5, x: NumberLong(0)}")),
+ Doc(fromjson("{_id: 6, x: 0.123}")),
+ Doc(fromjson("{_id: 7, x: 1}")),
+ Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))};
// Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
// - [MinKey, hash(0)) : shard1
// - [hash(0), hash(0) + 1) : shard2
@@ -252,44 +306,50 @@ TEST_F(ReshardingCollectionClonerTest, HashedShardKey) {
Doc{{"_id", Doc{{"x", getHashedElementValue(0) + 1}}},
{"max", Doc{{"x", V(MAXKEY)}}},
{"shard", "shard3"_sd}}};
- getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData));
- auto pipeline = makePipeline(std::move(sk),
- ShardId("shard2"),
- {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
- Doc(fromjson("{_id: 2, x: -1}")),
- Doc(fromjson("{_id: 3, x: -0.123}")),
- Doc(fromjson("{_id: 4, x: 0}")),
- Doc(fromjson("{_id: 5, x: NumberLong(0)}")),
- Doc(fromjson("{_id: 6, x: 0.123}")),
- Doc(fromjson("{_id: 7, x: 1}")),
- Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))},
- std::move(configData));
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)),
- next->toBson());
-
- next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)),
- next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
+ constexpr auto kExpectedCopiedCount = 4;
+ const auto verify = [](auto cursor) {
+ auto next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)),
+ next->data.toBson());
+
+ next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)),
+ next->data.toBson());
+
+ ASSERT_FALSE(cursor->next());
+ };
+
+ runPipelineTest(std::move(sk),
+ ShardId("shard2"),
+ std::move(collectionData),
+ std::move(configData),
+ kExpectedCopiedCount,
+ verify);
}
TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) {
ShardKeyPattern sk{fromjson("{x: 'hashed', y: 1}")};
+ std::deque<DocumentSource::GetNextResult> collectionData{
+ Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
+ Doc(fromjson("{_id: 2, x: -1}")),
+ Doc(fromjson("{_id: 3, x: -0.123, y: -1}")),
+ Doc(fromjson("{_id: 4, x: 0, y: 0}")),
+ Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")),
+ Doc(fromjson("{_id: 6, x: 0.123}")),
+ Doc(fromjson("{_id: 7, x: 1}")),
+ Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))};
// Documents in a mock config.cache.chunks collection. Mocked collection boundaries:
// - [{x: MinKey, y: MinKey}, {x: hash(0), y: 0}) : shard1
// - [{x: hash(0), y: 0}, {x: hash(0), y: 1}) : shard2
@@ -304,25 +364,23 @@ TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) {
Doc{{"_id", Doc{{"x", getHashedElementValue(0)}, {"y", 1}}},
{"max", Doc{{"x", V(MAXKEY)}, {"y", V(MAXKEY)}}},
{"shard", "shard3"_sd}}};
- getCatalogCacheMock()->setChunkManagerReturnValue(createChunkManager(sk, configData));
- auto pipeline = makePipeline(std::move(sk),
- ShardId("shard2"),
- {Doc(fromjson("{_id: 1, x: {$minKey: 1}}")),
- Doc(fromjson("{_id: 2, x: -1}")),
- Doc(fromjson("{_id: 3, x: -0.123, y: -1}")),
- Doc(fromjson("{_id: 4, x: 0, y: 0}")),
- Doc(fromjson("{_id: 5, x: NumberLong(0), y: 1}")),
- Doc(fromjson("{_id: 6, x: 0.123}")),
- Doc(fromjson("{_id: 7, x: 1}")),
- Doc(fromjson("{_id: 8, x: {$maxKey: 1}}"))},
- std::move(configData));
-
- auto next = pipeline->getNext();
- ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(
- BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), next->toBson());
-
- ASSERT_FALSE(pipeline->getNext());
+ constexpr auto kExpectedCopiedCount = 1;
+ const auto verify = [](auto cursor) {
+ auto next = cursor->next();
+ ASSERT(next);
+ ASSERT_BSONOBJ_BINARY_EQ(
+ BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)),
+ next->data.toBson());
+
+ ASSERT_FALSE(cursor->next());
+ };
+
+ runPipelineTest(std::move(sk),
+ ShardId("shard2"),
+ std::move(collectionData),
+ std::move(configData),
+ kExpectedCopiedCount,
+ verify);
}
} // namespace
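
A detail behind the two hashed-shard-key tests above: documents with x values of -0.123, 0, NumberLong(0), and 0.123 all land in the [hash(0), hash(0) + 1) chunk because hashed indexes truncate numeric values to 64-bit integers before hashing, so all four values hash identically. This can be checked in the shell with the stock convertShardKeyToHashed() helper (a sketch, not part of this change):

    // All four values truncate to the 64-bit integer 0 before hashing,
    // so they produce the same hashed shard key value.
    const h = convertShardKeyToHashed(0);
    [-0.123, 0, NumberLong(0), 0.123].forEach((x) => {
        assert.eq(convertShardKeyToHashed(x), h);
    });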