author | Matt Walak <matt.walak@mongodb.com> | 2021-08-02 19:15:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-03 07:24:09 +0000 |
commit | a8659cae814ae75d4ed80d3078ad59b0ee6de8da (patch) | |
tree | f5911621a9e3e8860f3e7e656df77ddd604109a2 | |
parent | e2a54677d723750c26c334f024cdb408b9c094d3 (diff) | |
SERVER-57700 Measure latency/throughput of resharding::data_copy::fillBatchForInsert in ReshardingCollectionCloner::doOneBatch
(cherry picked from commit 98a21127ae4957594276a35228a95d8d79d6ad25)
9 files changed, 317 insertions, 184 deletions
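Before the patch itself, a quick illustration of the pattern it applies on the recipient shard: wrap the instrumented call in a timer, then record the elapsed milliseconds in both a per-operation and a cumulative latency histogram. The sketch below is a minimal standalone approximation, not MongoDB's actual code: `LatencyHistogram` is a simplified stand-in for `IntegerHistogram`, and `std::chrono` stands in for `mongo::Timer`.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-in for mongo's IntegerHistogram: counts each sample into a
// latency bucket and tracks the total number of samples (the "ops" field the
// jstests below read).
class LatencyHistogram {
public:
    explicit LatencyHistogram(std::vector<int64_t> upperBoundsMillis)
        : _bounds(std::move(upperBoundsMillis)), _counts(_bounds.size() + 1, 0) {}

    void increment(int64_t latencyMillis) {
        size_t bucket = 0;
        while (bucket < _bounds.size() && latencyMillis >= _bounds[bucket]) {
            ++bucket;
        }
        ++_counts[bucket];
        ++_ops;
    }

    int64_t ops() const {
        return _ops;
    }

private:
    std::vector<int64_t> _bounds;
    std::vector<int64_t> _counts;
    int64_t _ops = 0;
};

int main() {
    // One histogram for the in-progress operation and one that accumulates
    // across operations, mirroring _currentOp/_cumulativeOp in ReshardingMetrics.
    LatencyHistogram currentOp({10, 100, 1000});
    LatencyHistogram cumulative({10, 100, 1000});

    const auto start = std::chrono::steady_clock::now();
    // ... stand-in for the instrumented call, e.g. fillBatchForInsert ...
    const auto elapsedMillis = std::chrono::duration_cast<std::chrono::milliseconds>(
                                   std::chrono::steady_clock::now() - start)
                                   .count();

    currentOp.increment(elapsedMillis);
    cumulative.increment(elapsedMillis);
    std::cout << "batches recorded: " << cumulative.ops() << "\n";
    return 0;
}
```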
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 3799d1e9334..b9cf36950d5 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -60,6 +60,8 @@ last-continuous:
     test_file: jstests/sharding/test_resharding_test_fixture_shutdown_retry_needed.js
   - ticket: SERVER-59505
     test_file: jstests/core/timeseries/timeseries_find.js
+  - ticket: SERVER-57700
+    test_file: jstests/sharding/resharding_histogram_metrics.js

   # Tests that should only be excluded from particular suites should be listed under that suite.
   suites:
@@ -272,6 +274,8 @@ last-lts:
     test_file: jstests/aggregation/range.js
   - ticket: SERVER-59923
     test_file: jstests/sharding/test_resharding_test_fixture_shutdown_retry_needed.js
+  - ticket: SERVER-57700
+    test_file: jstests/sharding/resharding_histogram_metrics.js

   # Tests that should only be excluded from particular suites should be listed under that suite.
   suites:
diff --git a/jstests/sharding/resharding_batch_oplog_latency_histogram_metrics.js b/jstests/sharding/resharding_batch_oplog_latency_histogram_metrics.js
deleted file mode 100644
index b4779fd8cb0..00000000000
--- a/jstests/sharding/resharding_batch_oplog_latency_histogram_metrics.js
+++ /dev/null
@@ -1,134 +0,0 @@
-//
-// Test to verify that latency metrics are collected in both currentOp and cumulativeOp
-// when batches of oplogs are applied during resharding.
-//
-// @tags: [
-//   requires_fcv_51,
-//   uses_atclustertime,
-// ]
-//
-
-(function() {
-'use strict';
-
-load('jstests/libs/discover_topology.js');
-load('jstests/sharding/libs/resharding_test_fixture.js');
-
-const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true});
-reshardingTest.setup();
-
-const kHistogramTag = "oplogBatchApplyLatencyMillis";
-const kDbName = "reshardingDb";
-const collName = "coll";
-const ns = kDbName + "." + collName;
-
-const donorShardNames = reshardingTest.donorShardNames;
-const testColl = reshardingTest.createShardedCollection({
-    ns: ns,
-    shardKeyPattern: {x: 1, s: 1},
-    chunks: [
-        {min: {x: MinKey, s: MinKey}, max: {x: 5, s: 5}, shard: donorShardNames[0]},
-        {min: {x: 5, s: 5}, max: {x: MaxKey, s: MaxKey}, shard: donorShardNames[1]},
-    ],
-});
-
-function getCumulativeOpReport(mongo) {
-    const stats = mongo.getDB('admin').serverStatus({});
-    assert(stats.hasOwnProperty('shardingStatistics'), stats);
-    const shardingStats = stats.shardingStatistics;
-    assert(shardingStats.hasOwnProperty('resharding'),
-           `Missing resharding section in ${tojson(shardingStats)}`);
-
-    return shardingStats.resharding;
-}
-
-function getCurrentOpReport(mongo, role) {
-    return mongo.getDB("admin").currentOp(
-        {ns: ns, desc: {$regex: 'Resharding' + role + 'Service.*'}});
-}
-
-const mongos = testColl.getMongo();
-const topology = DiscoverTopology.findConnectedNodes(mongos);
-const recipientShardNames = reshardingTest.recipientShardNames;
-const docsToInsert = [
-    {_id: 1, x: 0, s: 6, y: 0},  // Stays on shard0.
-    {_id: 2, x: 0, s: 0, y: 6},  // Moves to shard1.
-    {_id: 3, x: 6, s: 6, y: 0},  // Moves to shard0.
-    {_id: 4, x: 6, s: 0, y: 6},  // Stays on shard1.
-];
-
-// First test that histogram metrics appear in currentOp.
-let batchAppliesInFirstReshard = 0;
-reshardingTest.withReshardingInBackground(
-    {
-        newShardKeyPattern: {y: 1, s: 1},
-        newChunks: [
-            {min: {y: MinKey, s: MinKey}, max: {y: 5, s: 5}, shard: recipientShardNames[0]},
-            {min: {y: 5, s: 5}, max: {y: MaxKey, s: MaxKey}, shard: recipientShardNames[1]},
-        ],
-    },
-    () => {
-        reshardingTest.awaitCloneTimestampChosen();
-        assert.commandWorked(testColl.insertMany(docsToInsert));
-    },
-    {
-        postCheckConsistencyFn: () => {
-            reshardingTest.recipientShardNames.forEach(function(shardName) {
-                const report =
-                    getCurrentOpReport(new Mongo(topology.shards[shardName].primary), "Recipient");
-                assert(report.inprog.length === 1,
-                       `expected report.inprog.length === 1,
-                        instead found ${report.inprog.length}`);
-                const op = report.inprog[0];
-                assert(op.hasOwnProperty(kHistogramTag),
-                       `Missing ${kHistogramTag} in ${tojson(op)}`);
-                let batchAppliesThisRecipient = op[kHistogramTag]["ops"];
-                batchAppliesInFirstReshard += batchAppliesThisRecipient;
-            });
-
-            assert(batchAppliesInFirstReshard > 0,
-                   `Expected greater than 0 recorded batch applies,
-                    got ${batchAppliesInFirstReshard} instead.`);
-        }
-    });
-
-// Next test that histogram metrics accumulate in cumulativeOp.
-const collName_2 = "coll2";
-const ns_2 = kDbName + "." + collName_2;
-
-const testColl_2 = reshardingTest.createShardedCollection({
-    ns: ns_2,
-    shardKeyPattern: {x: 1, s: 1},
-    chunks: [
-        {min: {x: MinKey, s: MinKey}, max: {x: 5, s: 5}, shard: donorShardNames[0]},
-        {min: {x: 5, s: 5}, max: {x: MaxKey, s: MaxKey}, shard: donorShardNames[1]},
-    ],
-});
-
-reshardingTest.withReshardingInBackground(
-    {
-        newShardKeyPattern: {y: 1, s: 1},
-        newChunks: [
-            {min: {y: MinKey, s: MinKey}, max: {y: 5, s: 5}, shard: recipientShardNames[0]},
-            {min: {y: 5, s: 5}, max: {y: MaxKey, s: MaxKey}, shard: recipientShardNames[1]},
-        ],
-    },
-    () => {
-        reshardingTest.awaitCloneTimestampChosen();
-        assert.commandWorked(testColl_2.insertMany(docsToInsert));
-    });
-
-let cumulativeBatchApplies = 0;
-reshardingTest.recipientShardNames.forEach(function(shardName) {
-    let report = getCumulativeOpReport(new Mongo(topology.shards[shardName].primary));
-    assert(report.hasOwnProperty(kHistogramTag));
-    let cumulativeBatchAppliesThisRecipient = report[kHistogramTag]["ops"];
-    cumulativeBatchApplies += cumulativeBatchAppliesThisRecipient;
-});
-
-assert(cumulativeBatchApplies > batchAppliesInFirstReshard, `Expected batch oplog applies to accumluate.
-    Instead found ${cumulativeBatchApplies} cumulative applies, compared to ${batchAppliesInFirstReshard}
-    from first reshard operation.`);
-
-reshardingTest.teardown();
-})();
diff --git a/jstests/sharding/resharding_histogram_metrics.js b/jstests/sharding/resharding_histogram_metrics.js
new file mode 100644
index 00000000000..2f93ac17ca1
--- /dev/null
+++ b/jstests/sharding/resharding_histogram_metrics.js
@@ -0,0 +1,195 @@
+// Test to verify that latency metrics are collected in both currentOp and cumulativeOp
+// during resharding.
+//
+// @tags: [
+//   requires_fcv_50,
+//   uses_atclustertime,
+// ]
+//
+
+(function() {
+'use strict';
+
+load('jstests/libs/discover_topology.js');
+load('jstests/sharding/libs/resharding_test_fixture.js');
+
+const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true});
+reshardingTest.setup();
+
+const kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis";
+const kCollClonerFillBatchForInsertLatencyMillis = "collClonerFillBatchForInsertLatencyMillis";
+const kDocumentsCopied = "documentsCopied";
+const kDbName = "reshardingDb";
+const collName = "coll";
+const ns = kDbName + "." + collName;
+
+const donorShardNames = reshardingTest.donorShardNames;
+const testColl = reshardingTest.createShardedCollection({
+    ns: ns,
+    shardKeyPattern: {x: 1, s: 1},
+    chunks: [
+        {min: {x: MinKey, s: MinKey}, max: {x: 5, s: 5}, shard: donorShardNames[0]},
+        {min: {x: 5, s: 5}, max: {x: MaxKey, s: MaxKey}, shard: donorShardNames[1]},
+    ],
+});
+
+function setParameter(conn, field, value) {
+    var cmd = {setParameter: 1};
+    cmd[field] = value;
+    return conn.adminCommand(cmd);
+}
+
+function getCumulativeOpReport(mongo) {
+    const stats = mongo.getDB('admin').serverStatus({});
+    assert(stats.hasOwnProperty('shardingStatistics'), stats);
+    const shardingStats = stats.shardingStatistics;
+    assert(shardingStats.hasOwnProperty('resharding'),
+           `Missing resharding section in ${tojson(shardingStats)}`);
+
+    return shardingStats.resharding;
+}
+
+function getCurrentOpReport(mongo, role) {
+    return mongo.getDB("admin").currentOp(
+        {ns: ns, desc: {$regex: 'Resharding' + role + 'Service.*'}});
+}
+
+function getReshardingMetricsReport(mongo, role) {
+    if (role === "Cumulative") {
+        return getCumulativeOpReport(mongo);
+    } else {
+        const report = getCurrentOpReport(mongo, role);
+        assert(report.inprog.length === 1,
+               `expected report.inprog.length === 1,
+                instead found ${report.inprog.length}`);
+        return report.inprog[0];
+    }
+}
+
+const mongos = testColl.getMongo();
+const topology = DiscoverTopology.findConnectedNodes(mongos);
+const recipientShardNames = reshardingTest.recipientShardNames;
+
+recipientShardNames.forEach(function(shardName) {
+    const mongo = new Mongo(topology.shards[shardName].primary);
+
+    // Batches from resharding::data_copy::fillBatchForInsert are filled with documents
+    // until a document pushes the batch over the expected size. Setting the server
+    // parameter 'reshardingCollectionClonerBatchSizeInBytes' to the minimum value of 1
+    // ensures that the first document added to the batch will exceed this value, forcing
+    // every batch to contain only 1 document.
+    assert.commandWorked(setParameter(mongo, "reshardingCollectionClonerBatchSizeInBytes", 1));
+});
+
+const docsToInsertBeforeResharding = [
+    {_id: 1, x: 0, s: 6, y: 0},  // Stays on shard0.
+    {_id: 2, x: 0, s: 0, y: 6},  // Moves to shard1.
+    {_id: 3, x: 6, s: 6, y: 0},  // Moves to shard0.
+    {_id: 4, x: 6, s: 0, y: 6},  // Stays on shard1.
+];
+const docsToInsertDuringResharding = [
+    {_id: 5, x: 0, s: 6, y: 0},  // Stays on shard0.
+    {_id: 6, x: 0, s: 0, y: 6},  // Moves to shard1.
+    {_id: 7, x: 6, s: 6, y: 0},  // Moves to shard0.
+    {_id: 8, x: 6, s: 0, y: 6},  // Stays on shard1.
+];
+assert.commandWorked(testColl.insertMany(docsToInsertBeforeResharding));
+
+// First test that histogram metrics appear in currentOp.
+let firstReshardBatchApplies = 0;
+reshardingTest.withReshardingInBackground(
+    {
+        newShardKeyPattern: {y: 1, s: 1},
+        newChunks: [
+            {min: {y: MinKey, s: MinKey}, max: {y: 5, s: 5}, shard: recipientShardNames[0]},
+            {min: {y: 5, s: 5}, max: {y: MaxKey, s: MaxKey}, shard: recipientShardNames[1]},
+        ],
+    },
+    () => {
+        reshardingTest.awaitCloneTimestampChosen();
+        assert.commandWorked(testColl.insertMany(docsToInsertDuringResharding));
+    },
+    {
+        postCheckConsistencyFn: () => {
+            recipientShardNames.forEach(function(shardName) {
+                const mongo = new Mongo(topology.shards[shardName].primary);
+
+                const reshardingMetrics = getReshardingMetricsReport(mongo, "Recipient");
+                const oplogApplierApplyBatchHist =
+                    reshardingMetrics[kOplogApplierApplyBatchLatencyMillis];
+                const collClonerFillBatchForInsertHist =
+                    reshardingMetrics[kCollClonerFillBatchForInsertLatencyMillis];
+
+                // We expect 1 batch insert per document on each shard, plus 1 empty batch
+                // to discover no documents are left.
+                const expectedBatchInserts = reshardingMetrics[kDocumentsCopied] + 1;
+                const receivedBatchInserts = collClonerFillBatchForInsertHist["ops"];
+                assert(expectedBatchInserts == receivedBatchInserts,
+                       `expected ${expectedBatchInserts} batch inserts,
+                        received ${receivedBatchInserts}`);
+
+                firstReshardBatchApplies += oplogApplierApplyBatchHist["ops"];
+            });
+
+            assert(firstReshardBatchApplies > 0,
+                   `Expected greater than 0 recorded batch applies,
+                    got ${firstReshardBatchApplies} instead.`);
+        }
+    });
+
+// Next test that histogram metrics accumulate in cumulativeOp.
+const collName_2 = "coll2";
+const ns_2 = kDbName + "." + collName_2;
+
+const testColl_2 = reshardingTest.createShardedCollection({
+    ns: ns_2,
+    shardKeyPattern: {x: 1, s: 1},
+    chunks: [
+        {min: {x: MinKey, s: MinKey}, max: {x: 5, s: 5}, shard: donorShardNames[0]},
+        {min: {x: 5, s: 5}, max: {x: MaxKey, s: MaxKey}, shard: donorShardNames[1]},
+    ],
+});
+
+assert.commandWorked(testColl_2.insertMany(docsToInsertBeforeResharding));
+reshardingTest.withReshardingInBackground(
+    {
+        newShardKeyPattern: {y: 1, s: 1},
+        newChunks: [
+            {min: {y: MinKey, s: MinKey}, max: {y: 5, s: 5}, shard: recipientShardNames[0]},
+            {min: {y: 5, s: 5}, max: {y: MaxKey, s: MaxKey}, shard: recipientShardNames[1]},
+        ],
+    },
+    () => {
+        reshardingTest.awaitCloneTimestampChosen();
+        assert.commandWorked(testColl_2.insertMany(docsToInsertDuringResharding));
+    });
+
+let cumulativeBatchApplies = 0;
+let cumulativeBatchInserts = 0;
+let totalDocumentsCopied = 0;
+recipientShardNames.forEach(function(shardName) {
+    const mongo = new Mongo(topology.shards[shardName].primary);
+
+    const reshardingMetrics = getReshardingMetricsReport(mongo, "Cumulative");
+    const oplogApplierApplyBatchHist = reshardingMetrics[kOplogApplierApplyBatchLatencyMillis];
+    const collClonerFillBatchForInsertHist =
+        reshardingMetrics[kCollClonerFillBatchForInsertLatencyMillis];
+
+    cumulativeBatchApplies += oplogApplierApplyBatchHist["ops"];
+    cumulativeBatchInserts += collClonerFillBatchForInsertHist["ops"];
+    totalDocumentsCopied += reshardingMetrics[kDocumentsCopied];
+});
+
+// We expect the cumulative number of batch inserts to be equal to the total number of documents
+// copied during cloning plus one empty batch for each recipient for both resharding operations.
+const expectedCumulativeBatchInserts = totalDocumentsCopied + 2 * recipientShardNames.length;
+
+assert(cumulativeBatchApplies > firstReshardBatchApplies, `Expected batch oplog applies
+    to accumulate. Instead found ${cumulativeBatchApplies} cumulative applies,
+    compared to ${firstReshardBatchApplies} from first reshard operation.`);
+assert(cumulativeBatchInserts == expectedCumulativeBatchInserts, `Expected
+    ${expectedCumulativeBatchInserts} cumulative batch inserts. Instead found
+    ${cumulativeBatchInserts}`);
+
+reshardingTest.teardown();
+})();
diff --git a/jstests/sharding/resharding_metrics.js b/jstests/sharding/resharding_metrics.js
index 7a20d105c0e..772b7a4321b 100644
--- a/jstests/sharding/resharding_metrics.js
+++ b/jstests/sharding/resharding_metrics.js
@@ -141,7 +141,8 @@ function verifyCurrentOpOutput(reshardingTest, inputCollection) {
         "totalApplyTimeElapsedSecs": undefined,
         "recipientState": undefined,
         "opStatus": "running",
-        "oplogBatchApplyLatencyMillis": undefined,
+        "oplogApplierApplyBatchLatencyMillis": undefined,
+        "collClonerFillBatchForInsertLatencyMillis": undefined,
     };

     reshardingTest.recipientShardNames.forEach(function(shardName) {
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index c18064466a9..0225c21cc69 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -259,8 +259,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_restartP
 bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& pipeline) {
     pipeline.reattachToOperationContext(opCtx);
     ON_BLOCK_EXIT([&pipeline] { pipeline.detachFromOperationContext(); });
+
+    Timer latencyTimer;
     auto batch = resharding::data_copy::fillBatchForInsert(
         pipeline, resharding::gReshardingCollectionClonerBatchSizeInBytes.load());
+    _env->metrics()->onCollClonerFillBatchForInsert(
+        duration_cast<Milliseconds>(latencyTimer.elapsed()));

     if (batch.empty()) {
         return false;
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 1ff1abe993c..be231bce447 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -70,7 +70,9 @@ constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance";
 constexpr auto kOpCounters = "opcounters";
 constexpr auto kMinRemainingOperationTime = "minShardRemainingOperationTimeEstimatedMillis";
 constexpr auto kMaxRemainingOperationTime = "maxShardRemainingOperationTimeEstimatedMillis";
-constexpr auto kOplogBatchApplyLatencyMillis = "oplogBatchApplyLatencyMillis";
+constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis";
+constexpr auto kCollClonerFillBatchForInsertLatencyMillis =
+    "collClonerFillBatchForInsertLatencyMillis";

 using MetricsPtr = std::unique_ptr<ReshardingMetrics>;

@@ -94,6 +96,13 @@ Milliseconds remainingTime(Milliseconds elapsedTime, double elapsedWork, double
     return Milliseconds(Milliseconds::rep(remainingMsec));
 }

+void appendHistogram(BSONObjBuilder* bob,
+                     const IntegerHistogram<kLatencyHistogramBucketsCount>& hist) {
+    BSONObjBuilder histogramBuilder;
+    hist.append(histogramBuilder, false);
+    bob->appendElements(histogramBuilder.obj());
+}
+
 static StringData serializeState(boost::optional<RecipientStateEnum> e) {
     return RecipientState_serializer(*e);
 }
@@ -221,8 +230,11 @@ public:

     int64_t chunkImbalanceCount = 0;

-    IntegerHistogram<kLatencyHistogramBucketsCount> oplogBatchApplyLatencyMillis =
-        IntegerHistogram<kLatencyHistogramBucketsCount>(kOplogBatchApplyLatencyMillis,
+    IntegerHistogram<kLatencyHistogramBucketsCount> oplogApplierApplyBatchLatencyMillis =
+        IntegerHistogram<kLatencyHistogramBucketsCount>(kOplogApplierApplyBatchLatencyMillis,
+                                                        latencyHistogramBuckets);
+    IntegerHistogram<kLatencyHistogramBucketsCount> collClonerFillBatchForInsertLatencyMillis =
+        IntegerHistogram<kLatencyHistogramBucketsCount>(kCollClonerFillBatchForInsertLatencyMillis,
                                                         latencyHistogramBuckets);

     // The ops done by resharding to keep up with the client writes.
@@ -307,13 +319,8 @@ void ReshardingMetrics::OperationMetrics::appendCurrentOpMetrics(BSONObjBuilder*
                 serializeState(recipientState.get_value_or(RecipientStateEnum::kUnused)));
             bob->append(kOpStatus, ReshardingOperationStatus_serializer(opStatus));

-            {
-                BSONObjBuilder histogramBuilder;
-                oplogBatchApplyLatencyMillis.append(histogramBuilder, false);
-                BSONObj histogram = histogramBuilder.obj();
-                bob->appendElements(histogram);
-            }
-
+            appendHistogram(bob, oplogApplierApplyBatchLatencyMillis);
+            appendHistogram(bob, collClonerFillBatchForInsertLatencyMillis);
             break;
         case Role::kCoordinator:
             bob->append(kCoordinatorState, serializeState(coordinatorState));
@@ -568,14 +575,27 @@ void ReshardingMetrics::gotInserts(int n) noexcept {
     _cumulativeOp->gotInserts(n);
 }

-void ReshardingMetrics::onApplyOplogBatch(Milliseconds latency) {
+void ReshardingMetrics::onOplogApplierApplyBatch(Milliseconds latency) {
     stdx::lock_guard<Latch> lk(_mutex);
     invariant(_currentOp, kNoOperationInProgress);
     invariant(checkState(*_currentOp->recipientState,
                          {RecipientStateEnum::kApplying, RecipientStateEnum::kError}));

-    _currentOp->oplogBatchApplyLatencyMillis.increment(durationCount<Milliseconds>(latency));
-    _cumulativeOp->oplogBatchApplyLatencyMillis.increment(durationCount<Milliseconds>(latency));
+    _currentOp->oplogApplierApplyBatchLatencyMillis.increment(
+        durationCount<Milliseconds>(latency));
+    _cumulativeOp->oplogApplierApplyBatchLatencyMillis.increment(
+        durationCount<Milliseconds>(latency));
+}
+
+void ReshardingMetrics::onCollClonerFillBatchForInsert(Milliseconds latency) {
+    stdx::lock_guard<Latch> lk(_mutex);
+    invariant(_currentOp, kNoOperationInProgress);
+    invariant(checkState(*_currentOp->recipientState,
+                         {RecipientStateEnum::kCloning, RecipientStateEnum::kError}));
+
+    _currentOp->collClonerFillBatchForInsertLatencyMillis.increment(
+        durationCount<Milliseconds>(latency));
+    _cumulativeOp->collClonerFillBatchForInsertLatencyMillis.increment(
+        durationCount<Milliseconds>(latency));
 }

 void ReshardingMetrics::gotInsert() noexcept {
@@ -760,12 +780,8 @@ void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const
     bob->append(kMaxRemainingOperationTime,
                 getRemainingOperationTime(ops.maxRemainingOperationTime));

-    {
-        BSONObjBuilder histogramBuilder;
-        ops.oplogBatchApplyLatencyMillis.append(histogramBuilder, false);
-        BSONObj histogram = histogramBuilder.obj();
-        bob->appendElements(histogram);
-    }
+    appendHistogram(bob, ops.oplogApplierApplyBatchLatencyMillis);
+    appendHistogram(bob, ops.collClonerFillBatchForInsertLatencyMillis);
 }

 Date_t ReshardingMetrics::_now() const {
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index c8ce05a88dd..480dd25ed8a 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -104,8 +104,12 @@ public:
     void enterCriticalSection(Date_t start);
    void leaveCriticalSection(Date_t end);

-    // Records latency and throughput of batch oplog applies during resharding.
-    void onApplyOplogBatch(Milliseconds latency);
+    // Records latency and throughput of calls to ReshardingOplogApplier::_applyBatch
+    void onOplogApplierApplyBatch(Milliseconds latency);
+
+    // Records latency and throughput of calls to resharding::data_copy::fillBatchForInsert
+    // in ReshardingCollectionCloner::doOneBatch
+    void onCollClonerFillBatchForInsert(Milliseconds latency);

     // Allows updating "oplog entries to apply" metrics when the recipient is in applying state.
     void onOplogEntriesFetched(int64_t entries) noexcept;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index 77b8be78fb2..4b979936f51 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -49,7 +49,9 @@ namespace {
 using namespace fmt::literals;

 constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedSecs"_sd;
-constexpr auto kOplogBatchApplyLatencyMillis = "oplogBatchApplyLatencyMillis";
+constexpr auto kOplogApplierApplyBatchLatencyMillis = "oplogApplierApplyBatchLatencyMillis";
+constexpr auto kCollClonerFillBatchForInsertLatencyMillis =
+    "collClonerFillBatchForInsertLatencyMillis";

 class ReshardingMetricsTest : public ServiceContextTest {
 public:
@@ -129,15 +131,18 @@ public:
                << fmt::format("{}: {}", errMsg, report.toString());
     };

-    void appendExpectedHistogramResult(BSONObjBuilder& builder,
+    void appendExpectedHistogramResult(BSONObjBuilder* bob,
+                                       std::string tag,
                                        const std::vector<int64_t>& latencies) {
-        IntegerHistogram<kLatencyHistogramBucketsCount> hist(kOplogBatchApplyLatencyMillis,
-                                                             latencyHistogramBuckets);
+        IntegerHistogram<kLatencyHistogramBucketsCount> hist(tag, latencyHistogramBuckets);
         for (size_t i = 0; i < latencies.size(); i++) {
             hist.increment(latencies[i]);
         }

-        hist.append(builder, false);
+        BSONObjBuilder histogramBuilder;
+        hist.append(histogramBuilder, false);
+        BSONObj histogram = histogramBuilder.obj();
+        bob->appendElements(histogram);
     }

 private:
@@ -589,21 +594,34 @@ TEST_F(ReshardingMetricsTest, CurrentOpReportForRecipient) {

     BSONObjBuilder expectedBuilder(std::move(expectedPrefix));

-    // Append histogram data for batch oplog applies.
-    appendExpectedHistogramResult(expectedBuilder, {});
-    BSONObj expected = expectedBuilder.obj();
+    // Append histogram latency data.
+    appendExpectedHistogramResult(&expectedBuilder, kOplogApplierApplyBatchLatencyMillis, {});
+    appendExpectedHistogramResult(&expectedBuilder, kCollClonerFillBatchForInsertLatencyMillis, {});
+
+    BSONObj expected = expectedBuilder.done();

     const auto report = getMetrics()->reportForCurrentOp(options);
     ASSERT_BSONOBJ_EQ(expected, report);
 }

-TEST_F(ReshardingMetricsTest, TestOplogBatchLatencyHistogramForRecipient) {
-    const std::vector<int64_t> latencies_1{3, 427, 0, 6004, 320, 10056, 12300, 105, 70};
-    const std::vector<int64_t> latencies_2{800, 20, 5, 1025, 10567};
+TEST_F(ReshardingMetricsTest, TestHistogramMetricsForRecipient) {
+    const std::vector<int64_t> applyLatencies_1{3, 427, 0, 6004, 320, 10056, 12300, 105, 70};
+    const std::vector<int64_t> applyLatencies_2{800, 20, 5, 1025, 10567};
+    const std::vector<int64_t> insertLatencies_1{120, 7, 110, 50, 0, 16500, 77000, 667, 7980};
+    const std::vector<int64_t> insertLatencies_2{12450, 2400, 760, 57, 2};
+
+    const auto combineLatencies = [](std::vector<int64_t>* allLatencies,
+                                     const std::vector<int64_t>& latencies_1,
+                                     const std::vector<int64_t>& latencies_2) {
+        allLatencies->insert(allLatencies->end(), latencies_1.begin(), latencies_1.end());
+        allLatencies->insert(allLatencies->end(), latencies_2.begin(), latencies_2.end());
+    };
+
+    std::vector<int64_t> allApplyLatencies;
+    combineLatencies(&allApplyLatencies, applyLatencies_1, applyLatencies_2);
+    std::vector<int64_t> allInsertLatencies;
+    combineLatencies(&allInsertLatencies, insertLatencies_1, insertLatencies_2);

-    std::vector<int64_t> allLatencies;
-    allLatencies.insert(allLatencies.end(), latencies_1.begin(), latencies_1.end());
-    allLatencies.insert(allLatencies.end(), latencies_2.begin(), latencies_2.end());

     const ReshardingMetrics::ReporterOptions options(
         ReshardingMetrics::Role::kRecipient,
@@ -612,31 +630,54 @@ TEST_F(ReshardingMetricsTest, TestOplogBatchLatencyHistogramForRecipient) {
         BSON("id" << 1),
         false);

-    // First test that histogram metrics appear in currentOp. Next, test that histogram metrics
-    // accumulate in cumulativeOp.
-    const size_t kNumTests = 2;
-    std::vector<int64_t> testLatencies[kNumTests] = {latencies_1, latencies_2};
-    std::vector<int64_t> expectedLatencies[kNumTests] = {latencies_1, allLatencies};
+    // Test that all histogram metrics appear in both currentOp and cumulativeOp.
+    const size_t kNumTests = 4;
+    std::vector<int64_t> testLatencies[kNumTests] = {
+        applyLatencies_1, applyLatencies_2, insertLatencies_1, insertLatencies_2};
+    std::vector<int64_t> expectedLatencies[kNumTests] = {
+        applyLatencies_1, allApplyLatencies, insertLatencies_1, allInsertLatencies};
     OpReportType testReportTypes[kNumTests] = {OpReportType::CurrentOpReportRecipientRole,
+                                               OpReportType::CumulativeReport,
+                                               OpReportType::CurrentOpReportRecipientRole,
                                                OpReportType::CumulativeReport};
+    std::string histogramTag[kNumTests] = {kOplogApplierApplyBatchLatencyMillis,
+                                           kOplogApplierApplyBatchLatencyMillis,
+                                           kCollClonerFillBatchForInsertLatencyMillis,
+                                           kCollClonerFillBatchForInsertLatencyMillis};

     auto testLatencyHistogram = [&](std::vector<int64_t> latencies,
                                     OpReportType reportType,
-                                    std::vector<int64_t> expectedLatencies) {
+                                    std::vector<int64_t> expectedLatencies,
+                                    std::string histogramTag) {
+        LOGV2(57700,
+              "TestHistogramMetricsForRecipient test case",
+              "reportType"_attr = reportType,
+              "histogramTag"_attr = histogramTag);
+
         startOperation(ReshardingMetrics::Role::kRecipient);
-        getMetrics()->setRecipientState(RecipientStateEnum::kApplying);
+        RecipientStateEnum state = (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0
+                                        ? RecipientStateEnum::kApplying
+                                        : RecipientStateEnum::kCloning);
+        getMetrics()->setRecipientState(state);
+
         for (size_t i = 0; i < latencies.size(); i++) {
-            getMetrics()->onApplyOplogBatch(Milliseconds(latencies[i]));
+            if (histogramTag.compare(kOplogApplierApplyBatchLatencyMillis) == 0) {
+                getMetrics()->onOplogApplierApplyBatch(Milliseconds(latencies[i]));
+            } else if (histogramTag.compare(kCollClonerFillBatchForInsertLatencyMillis) == 0) {
+                getMetrics()->onCollClonerFillBatchForInsert(Milliseconds(latencies[i]));
+            } else {
+                MONGO_UNREACHABLE;
+            }
         }

         const auto report = getReport(reportType);
-        const auto buckets = report[kOplogBatchApplyLatencyMillis];
+        const auto buckets = report[histogramTag];

         BSONObjBuilder expectedBuilder;
-        appendExpectedHistogramResult(expectedBuilder, expectedLatencies);
-        const auto expectedHist = expectedBuilder.obj();
-        const auto expectedBuckets = expectedHist[kOplogBatchApplyLatencyMillis];
+        appendExpectedHistogramResult(&expectedBuilder, histogramTag, expectedLatencies);
+        const auto expectedHist = expectedBuilder.done();
+        const auto expectedBuckets = expectedHist[histogramTag];

         ASSERT_EQ(buckets.woCompare(expectedBuckets), 0);

@@ -645,7 +686,8 @@ TEST_F(ReshardingMetricsTest, TestOplogBatchLatencyHistogramForRecipient) {
     };

     for (size_t i = 0; i < kNumTests; i++) {
-        testLatencyHistogram(testLatencies[i], testReportTypes[i], expectedLatencies[i]);
+        testLatencyHistogram(
+            testLatencies[i], testReportTypes[i], expectedLatencies[i], histogramTag[i]);
     }
 }
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 06ce65384c8..ad2f2e399ad 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -116,7 +116,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
             return status;
         })
         .onCompletion([this, latencyTimer](Status status) {
-            _env->metrics()->onApplyOplogBatch(duration_cast<Milliseconds>(latencyTimer.elapsed()));
+            _env->metrics()->onOplogApplierApplyBatch(
+                duration_cast<Milliseconds>(latencyTimer.elapsed()));
             return status;
         })
         .semi();
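A closing note on the accounting the new test depends on: with `reshardingCollectionClonerBatchSizeInBytes` forced to 1, every `fillBatchForInsert` call picks up exactly one document, plus one final empty call that signals the clone is done. A worked sketch of the cumulative expectation, using assumed illustrative counts rather than numbers from a real run:

```cpp
#include <iostream>

// Worked example of the invariant checked at the end of the new test,
// using assumed illustrative counts (not taken from an actual test run).
int main() {
    const int recipients = 2;            // recipientShardNames.length
    const int reshardOperations = 2;     // the test reshards two collections
    const int totalDocumentsCopied = 8;  // assumed sum of documentsCopied

    // One fillBatchForInsert call per copied document, plus one trailing empty
    // batch per recipient per resharding operation -- the same formula as
    // expectedCumulativeBatchInserts in resharding_histogram_metrics.js.
    const int expectedCumulativeBatchInserts =
        totalDocumentsCopied + recipients * reshardOperations;

    std::cout << expectedCumulativeBatchInserts << "\n";  // prints 12
    return 0;
}
```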