diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2021-02-04 15:04:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-04 18:20:31 +0000 |
commit | 62144f007125d311751b8221c2337dfa6a84f357 (patch) | |
tree | 86b3de034d36250c34f376f97b59688e48e921c9 | |
parent | 3b4c40b136da419512bf6501655473db552efb11 (diff) | |
download | mongo-62144f007125d311751b8221c2337dfa6a84f357.tar.gz |
SERVER-50978 Add currentOp reporting for ReshardingDonorService Instances
-rw-r--r-- | jstests/sharding/resharding_metrics.js | 158 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics.cpp | 142 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics.h | 24 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics_test.cpp | 45 |
6 files changed, 323 insertions, 65 deletions
diff --git a/jstests/sharding/resharding_metrics.js b/jstests/sharding/resharding_metrics.js index f8a0b1031a5..051a809b379 100644 --- a/jstests/sharding/resharding_metrics.js +++ b/jstests/sharding/resharding_metrics.js @@ -1,42 +1,152 @@ /** * Tests the basic functionality of the resharding metrics section in server status. * - * @tags: [requires_fcv_47] + * @tags: [ + * requires_fcv_49, + * uses_atclustertime, + * ] */ (function() { 'use strict'; -const kDbName = "resharding_metrics"; +load("jstests/libs/discover_topology.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); -function testMetricsArePresent(mongo) { - const stats = mongo.getDB(kDbName).serverStatus({}); - assert(stats.hasOwnProperty('shardingStatistics'), stats); - const shardingStats = stats.shardingStatistics; - assert(shardingStats.hasOwnProperty('resharding'), - `Missing resharding section in ${tojson(shardingStats)}`); +const kNamespace = "reshardingDb.coll"; - function verifyMetric(metrics, tag, expectedValue) { - assert(metrics.hasOwnProperty(tag), `Missing ${tag} in ${tojson(metrics)}`); - assert.eq(metrics[tag], +function verifyMetrics(metrics, expected) { + for (var key in expected) { + assert(metrics.hasOwnProperty(key), `Missing ${key} in ${tojson(metrics)}`); + const expectedValue = expected[key]; + // The contract for this method is to treat `undefined` as an indication for non-important + // or non-deterministic values. + if (expectedValue === undefined) + continue; + assert.eq(metrics[key], expectedValue, - `Expected the value for ${tag} to be ${expectedValue}: ${tojson(metrics)}`); + `Expected the value for ${key} to be ${expectedValue}: ${tojson(metrics)}`); + } +} + +function verifyServerStatusOutput(reshardingTest, inputCollection) { + function testMetricsArePresent(mongo) { + const stats = mongo.getDB('admin').serverStatus({}); + assert(stats.hasOwnProperty('shardingStatistics'), stats); + const shardingStats = stats.shardingStatistics; + assert(shardingStats.hasOwnProperty('resharding'), + `Missing resharding section in ${tojson(shardingStats)}`); + + const metrics = shardingStats.resharding; + verifyMetrics(metrics, { + "successfulOperations": 0, + "failedOperations": 0, + "canceledOperations": 0, + "documentsCopied": 0, + "bytesCopied": 0, + "oplogEntriesApplied": 0, + "countWritesDuringCriticalSection": 0, + }); + } + + const donorShardNames = reshardingTest.donorShardNames; + const recipientShardNames = reshardingTest.recipientShardNames; + + const mongos = inputCollection.getMongo(); + const topology = DiscoverTopology.findConnectedNodes(mongos); + + testMetricsArePresent(new Mongo(topology.shards[donorShardNames[0]].primary)); + testMetricsArePresent(new Mongo(topology.shards[donorShardNames[1]].primary)); + testMetricsArePresent(new Mongo(topology.shards[recipientShardNames[0]].primary)); + testMetricsArePresent(new Mongo(topology.shards[recipientShardNames[1]].primary)); + testMetricsArePresent(new Mongo(topology.configsvr.nodes[0])); +} + +// Tests the currentOp output for each donor, each recipient, and the coordinator. +function checkCurrentOp(mongo, clusterName, role, expected) { + function getCurrentOpReport(mongo, role) { + return mongo.getDB("admin").currentOp( + {ns: kNamespace, desc: {$regex: 'Resharding' + role + 'Service.*'}}); } - const metrics = shardingStats.resharding; - verifyMetric(metrics, "successfulOperations", 0); - verifyMetric(metrics, "failedOperations", 0); - verifyMetric(metrics, "canceledOperations", 0); - verifyMetric(metrics, "documentsCopied", 0); - verifyMetric(metrics, "bytesCopied", 0); - verifyMetric(metrics, "oplogEntriesApplied", 0); - verifyMetric(metrics, "countWritesDuringCriticalSection", 0); + jsTest.log(`Testing currentOp output for ${role}s on ${clusterName}`); + assert.soon(() => { + const report = getCurrentOpReport(mongo, role); + if (report.inprog.length === 1) + return true; + + jsTest.log(tojson(report)); + return false; + }, () => `: was unable to find resharding ${role} service in currentOp output`); + + verifyMetrics(getCurrentOpReport(mongo, role).inprog[0], expected); +} + +function verifyCurrentOpOutput(reshardingTest, inputCollection) { + // Wait for the resharding operation and the donor services to start. + const mongos = inputCollection.getMongo(); + assert.soon(() => { + const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ + nss: inputCollection.getFullName() + }); + return coordinatorDoc !== null && coordinatorDoc.fetchTimestamp !== undefined; + }); + + const topology = DiscoverTopology.findConnectedNodes(mongos); + + reshardingTest.donorShardNames.forEach(function(shardName) { + checkCurrentOp(new Mongo(topology.shards[shardName].primary), shardName, "Donor", { + "type": "op", + "op": "command", + "ns": kNamespace, + "originatingCommand": undefined, + "totalOperationTimeElapsed": undefined, + "remainingOperationTimeEstimated": undefined, + "countWritesDuringCriticalSection": 0, + "totalCriticalSectionTimeElapsed": undefined, + "donorState": undefined, + "opStatus": "actively running", + }); + }); + + // TODO SERVER-51021 verify currentOp output for recipients + // TODO SERVER-50976 verify currentOp output for the coordinator } -const st = new ShardingTest({mongos: 1, config: 1, shards: 1, rs: {nodes: 1}}); +const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true}); +reshardingTest.setup(); + +const donorShardNames = reshardingTest.donorShardNames; +const inputCollection = reshardingTest.createShardedCollection({ + ns: kNamespace, + shardKeyPattern: {oldKey: 1}, + chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, + {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}, + ], +}); + +verifyServerStatusOutput(reshardingTest, inputCollection); + +assert.commandWorked(inputCollection.insert([ + {_id: "stays on shard0", oldKey: -10, newKey: -10}, + {_id: "moves to shard0", oldKey: 10, newKey: -10}, + {_id: "moves to shard1", oldKey: -10, newKey: 10}, + {_id: "stays on shard1", oldKey: 10, newKey: 10}, +])); -testMetricsArePresent(st.rs0.getPrimary()); -testMetricsArePresent(st.configRS.getPrimary()); +const recipientShardNames = reshardingTest.recipientShardNames; +reshardingTest.withReshardingInBackground( // + { + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipientShardNames[0]}, + {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipientShardNames[1]}, + ], + }, + (tempNs) => { + verifyCurrentOpOutput(reshardingTest, inputCollection); + }); -st.stop(); +reshardingTest.teardown(); })(); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 3a078effef2..d1f825f0f54 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -43,6 +43,7 @@ #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/sharding_state.h" @@ -221,6 +222,17 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { } } +boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { + ReshardingMetrics::ReporterOptions options(ReshardingMetrics::ReporterOptions::Role::kDonor, + _id, + _donorDoc.getNss(), + _donorDoc.getReshardingKey().toBSON(), + false); + return ReshardingMetrics::get(cc().getServiceContext())->reportForCurrentOp(options); +} + void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( const TypeCollectionReshardingFields& reshardingFields) { auto coordinatorState = reshardingFields.getState(); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 827341eb5b7..8f3d401e7e3 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -86,14 +86,9 @@ public: return _completionPromise.getFuture(); } - /** - * TODO(SERVER-50978) Report ReshardingDonorService Instances in currentOp(). - */ boost::optional<BSONObj> reportForCurrentOp( MongoProcessInterface::CurrentOpConnectionsMode connMode, - MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override { - return boost::none; - } + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override; void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields); diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 9b56809832b..6e4833813d1 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -42,18 +42,18 @@ constexpr auto kNoOperationInProgress = "No operation is in progress"; constexpr auto kSuccessfulOps = "successfulOperations"; constexpr auto kFailedOps = "failedOperations"; constexpr auto kCanceledOps = "canceledOperations"; -constexpr auto kOpTimeElapsed = "totalOperationTimeElapsedMillis"; -constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimatedMillis"; +constexpr auto kOpTimeElapsed = "totalOperationTimeElapsed"; +constexpr auto kOpTimeRemaining = "remainingOperationTimeEstimated"; constexpr auto kDocumentsToCopy = "approxDocumentsToCopy"; constexpr auto kDocumentsCopied = "documentsCopied"; constexpr auto kBytesToCopy = "approxBytesToCopy"; constexpr auto kBytesCopied = "bytesCopied"; -constexpr auto kCopyTimeElapsed = "totalCopyTimeElapsedMillis"; +constexpr auto kCopyTimeElapsed = "totalCopyTimeElapsed"; constexpr auto kOplogsFetched = "oplogEntriesFetched"; constexpr auto kOplogsApplied = "oplogEntriesApplied"; -constexpr auto kApplyTimeElapsed = "totalApplyTimeElapsedMillis"; +constexpr auto kApplyTimeElapsed = "totalApplyTimeElapsed"; constexpr auto kWritesDuringCritialSection = "countWritesDuringCriticalSection"; -constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsedMillis"; +constexpr auto kCriticalSectionTimeElapsed = "totalCriticalSectionTimeElapsed"; constexpr auto kCoordinatorState = "coordinatorState"; constexpr auto kDonorState = "donorState"; constexpr auto kRecipientState = "recipientState"; @@ -218,9 +218,26 @@ Milliseconds ReshardingMetrics::OperationMetrics::TimeInterval::duration() const return duration_cast<Milliseconds>(_end.value() - _start.value()); } -void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob) const { - auto getElapsedTime = [](const TimeInterval& interval) -> int64_t { - return durationCount<Milliseconds>(interval.duration()); +std::string OperationStatus_serializer(const ReshardingMetrics::OperationStatus& status) noexcept { + switch (status) { + case ReshardingMetrics::OperationStatus::kUnknown: + return "actively running"; + case ReshardingMetrics::OperationStatus::kSucceeded: + return "success"; + case ReshardingMetrics::OperationStatus::kFailed: + return "failure"; + case ReshardingMetrics::OperationStatus::kCanceled: + return "canceled"; + } + MONGO_UNREACHABLE; +} + +void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob, Role role) const { + auto getElapsedTime = [role](const TimeInterval& interval) -> int64_t { + if (role == Role::kAll) + return durationCount<Milliseconds>(interval.duration()); + else + return durationCount<Seconds>(interval.duration()); }; auto estimateRemainingOperationTime = [&]() -> int64_t { @@ -242,43 +259,106 @@ void ReshardingMetrics::OperationMetrics::append(BSONObjBuilder* bob) const { } }; - bob->append(kOpTimeElapsed, getElapsedTime(runningOperation)); - bob->append(kOpTimeRemaining, estimateRemainingOperationTime()); - - bob->append(kDocumentsToCopy, documentsToCopy); - bob->append(kDocumentsCopied, documentsCopied); - bob->append(kBytesToCopy, bytesToCopy); - bob->append(kBytesCopied, bytesCopied); - bob->append(kCopyTimeElapsed, getElapsedTime(copyingDocuments)); + const std::string kIntervalSuffix = role == Role::kAll ? "Millis" : ""; + bob->append(kOpTimeElapsed + kIntervalSuffix, getElapsedTime(runningOperation)); + bob->append(kOpTimeRemaining + kIntervalSuffix, estimateRemainingOperationTime()); - bob->append(kOplogsFetched, oplogEntriesFetched); - bob->append(kOplogsApplied, oplogEntriesApplied); - bob->append(kApplyTimeElapsed, getElapsedTime(applyingOplogEntries)); + if (role == Role::kAll || role == Role::kRecipient) { + bob->append(kDocumentsToCopy, documentsToCopy); + bob->append(kDocumentsCopied, documentsCopied); + bob->append(kBytesToCopy, bytesToCopy); + bob->append(kBytesCopied, bytesCopied); + bob->append(kCopyTimeElapsed + kIntervalSuffix, getElapsedTime(copyingDocuments)); - bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); - bob->append(kCriticalSectionTimeElapsed, getElapsedTime(inCriticalSection)); + bob->append(kOplogsFetched, oplogEntriesFetched); + bob->append(kOplogsApplied, oplogEntriesApplied); + bob->append(kApplyTimeElapsed + kIntervalSuffix, getElapsedTime(applyingOplogEntries)); + } - bob->append(kDonorState, donorState); - bob->append(kRecipientState, recipientState); - bob->append(kCoordinatorState, coordinatorState); + if (role == Role::kAll || role == Role::kDonor) { + bob->append(kWritesDuringCritialSection, writesDuringCriticalSection); + bob->append(kCriticalSectionTimeElapsed + kIntervalSuffix, + getElapsedTime(inCriticalSection)); + } - bob->append(kCompletionStatus, completionStatus.value_or(OperationStatus::kUnknown)); + const auto operationStatus = completionStatus.value_or(OperationStatus::kUnknown); + switch (role) { + case Role::kDonor: + bob->append(kDonorState, DonorState_serializer(donorState)); + bob->append(kCompletionStatus, OperationStatus_serializer(operationStatus)); + break; + case Role::kRecipient: + // TODO SERVER-51021 + MONGO_UNREACHABLE; + break; + case Role::kCoordinator: + // TODO SERVER-50976 + MONGO_UNREACHABLE; + case Role::kAll: + bob->append(kDonorState, donorState); + bob->append(kRecipientState, recipientState); + bob->append(kCoordinatorState, coordinatorState); + bob->append(kCompletionStatus, operationStatus); + break; + default: + MONGO_UNREACHABLE; + } } -void ReshardingMetrics::serialize(BSONObjBuilder* bob) const { +void ReshardingMetrics::serialize(BSONObjBuilder* bob, ReporterOptions::Role role) const { stdx::lock_guard<Latch> lk(_mutex); - bob->append(kSuccessfulOps, _succeeded); - bob->append(kFailedOps, _failed); - bob->append(kCanceledOps, _canceled); + if (role == ReporterOptions::Role::kAll) { + bob->append(kSuccessfulOps, _succeeded); + bob->append(kFailedOps, _failed); + bob->append(kCanceledOps, _canceled); + } if (_currentOp) { - _currentOp->append(bob); + _currentOp->append(bob, role); } else { // There are no resharding operations in progress, so report the default metrics. OperationMetrics opMetrics(_svcCtx->getFastClockSource()); - opMetrics.append(bob); + opMetrics.append(bob, role); } } +BSONObj ReshardingMetrics::reportForCurrentOp(const ReporterOptions& options) const noexcept { + const auto role = [&options] { + switch (options.role) { + case ReporterOptions::Role::kDonor: + return "Donor"; + case ReporterOptions::Role::kRecipient: + return "Recipient"; + case ReporterOptions::Role::kCoordinator: + return "Coordinator"; + default: + MONGO_UNREACHABLE; + } + }(); + + const auto originatingCommand = + [&options] { + BSONObjBuilder bob; + bob.append("reshardCollection", options.nss.toString()); + bob.append("key", options.shardKey); + bob.append("unique", options.unique); + bob.append("collation", + BSON("locale" + << "simple")); + return bob.obj(); + }(); + + BSONObjBuilder bob; + bob.append("type", "op"); + bob.append("desc", fmt::format("Resharding{}Service {}", role, options.id.toString())); + bob.append("op", "command"); + bob.append("ns", options.nss.toString()); + bob.append("originatingCommand", originatingCommand); + + serialize(&bob, options.role); + + return bob.obj(); +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index 803bf2aacf8..2f0ce2fbc16 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -31,11 +31,13 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" #include "mongo/s/resharding/common_types_gen.h" #include "mongo/util/clock_source.h" #include "mongo/util/duration.h" +#include "mongo/util/uuid.h" namespace mongo { @@ -79,7 +81,24 @@ public: enum class OperationStatus { kUnknown = -1, kSucceeded = 0, kFailed = 1, kCanceled = 2 }; void onCompletion(OperationStatus) noexcept; - void serialize(BSONObjBuilder*) const; + struct ReporterOptions { + enum class Role { kAll, kDonor, kRecipient, kCoordinator }; + ReporterOptions(Role role, UUID id, NamespaceString nss, BSONObj shardKey, bool unique) + : role(role), + id(std::move(id)), + nss(std::move(nss)), + shardKey(std::move(shardKey)), + unique(unique) {} + + const Role role; + const UUID id; + const NamespaceString nss; + const BSONObj shardKey; + const bool unique; + }; + BSONObj reportForCurrentOp(const ReporterOptions& options) const noexcept; + + void serialize(BSONObjBuilder*, ReporterOptions::Role role = ReporterOptions::Role::kAll) const; private: ServiceContext* const _svcCtx; @@ -119,7 +138,8 @@ private: applyingOplogEntries(clockSource), inCriticalSection(clockSource) {} - void append(BSONObjBuilder*) const; + using Role = ReporterOptions::Role; + void append(BSONObjBuilder*, Role) const; bool isCompleted() const noexcept { return completionStatus.has_value(); diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index cc790e79081..eea7789e682 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -30,11 +30,13 @@ #include <fmt/format.h> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/json.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" +#include "mongo/util/uuid.h" namespace mongo { @@ -53,8 +55,8 @@ public: // Timer step in milliseconds static constexpr auto kTimerStep = 100; - void advanceTime() { - _clockSource->advance(Milliseconds(kTimerStep)); + void advanceTime(Milliseconds interval = Milliseconds(kTimerStep)) { + _clockSource->advance(interval); } auto getReport() { @@ -215,4 +217,43 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { checkMetrics(kTag, kTimerStep * (kOplogEntriesFetched / kOplogEntriesApplied - 1)); } +TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) { + const auto kDonorState = DonorStateEnum::kPreparingToMirror; + getMetrics()->onStart(); + advanceTime(Seconds(2)); + getMetrics()->setDonorState(kDonorState); + advanceTime(Seconds(3)); + + const ReshardingMetrics::ReporterOptions options( + ReshardingMetrics::ReporterOptions::Role::kDonor, + UUID::parse("12345678-1234-1234-1234-123456789abc").getValue(), + NamespaceString("db", "collection"), + BSON("id" << 1), + true); + + const auto expected = + fromjson(fmt::format("{{ type: \"op\"," + "desc: \"ReshardingDonorService {0}\"," + "op: \"command\"," + "ns: \"{1}\"," + "originatingCommand: {{ reshardCollection: \"{1}\"," + "key: {2}," + "unique: {3}," + "collation: {{ locale: \"simple\" }} }}," + "totalOperationTimeElapsed: 5," + "remainingOperationTimeEstimated: -1," + "countWritesDuringCriticalSection: 0," + "totalCriticalSectionTimeElapsed : 3," + "donorState: \"{4}\"," + "opStatus: \"actively running\" }}", + options.id.toString(), + options.nss.toString(), + options.shardKey.toString(), + options.unique ? "true" : "false", + DonorState_serializer(kDonorState))); + + const auto report = getMetrics()->reportForCurrentOp(options); + ASSERT_BSONOBJ_EQ(expected, report); +} + } // namespace mongo |