diff options
author | Randolph Tan <randolph@10gen.com> | 2022-04-05 21:11:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-18 16:56:18 +0000 |
commit | beb7e25a768721018f10693fdefe55656b307d4e (patch) | |
tree | c5d12fc51bad48c2d66360a633a9bb96bb9f5edf /src/mongo/db/s/resharding | |
parent | c08573de6d48f18e3772b083a8e5e3f85cb3b47b (diff) | |
download | mongo-beb7e25a768721018f10693fdefe55656b307d4e.tar.gz |
SERVER-63623 Implemented Basic Global Index Metrics Restoration For CrudOp Metrics
Diffstat (limited to 'src/mongo/db/s/resharding')
19 files changed, 449 insertions, 46 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index a5918e3ea77..b6c0686872f 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -164,7 +164,7 @@ std::shared_ptr<executor::TaskExecutor> ReshardingDataReplication::_makeOplogFet std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::_makeOplogAppliers( OperationContext* opCtx, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew, + ReshardingApplierMetricsMap* applierMetricsMap, const CommonReshardingMetadata& metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, Timestamp cloneTimestamp, @@ -175,18 +175,19 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: oplogAppliers.reserve(donorShards.size()); for (size_t i = 0; i < donorShards.size(); ++i) { - auto sourceId = - ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()}; + const auto donorShardId = donorShards[i].getShardId(); + auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardId}; auto minFetchTimestamp = *donorShards[i].getMinFetchTimestamp(); auto idToResumeFrom = getOplogApplierResumeId(opCtx, sourceId, minFetchTimestamp); invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); const auto& oplogBufferNss = - getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId()); + getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId); + auto applierMetrics = (*applierMetricsMap)[donorShardId].get(); oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>( std::make_unique<ReshardingOplogApplier::Env>( - opCtx->getServiceContext(), metrics, metricsNew), + opCtx->getServiceContext(), metrics, applierMetrics), std::move(sourceId), oplogBufferNss, metadata.getTempReshardingNss(), @@ -206,7 +207,7 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::make( OperationContext* opCtx, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew, + ReshardingApplierMetricsMap* applierMetricsMap, CommonReshardingMetadata metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, Timestamp cloneTimestamp, @@ -228,7 +229,7 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards); auto oplogAppliers = _makeOplogAppliers(opCtx, metrics, - metricsNew, + applierMetricsMap, metadata, donorShards, cloneTimestamp, diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index a2705abd6f6..7fe8ee023eb 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -36,7 +36,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/cancelable_operation_context.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" -#include "mongo/db/s/resharding/resharding_metrics_new.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/resharding/common_types_gen.h" #include "mongo/s/shard_id.h" @@ -61,6 +61,9 @@ class TaskExecutor; } // namespace executor +using ReshardingApplierMetricsMap = + std::map<ShardId, std::unique_ptr<ReshardingOplogApplierMetrics>>; + /** * Manages the full sequence of data replication in resharding on the recipient. * @@ -139,7 +142,7 @@ public: static std::unique_ptr<ReshardingDataReplicationInterface> make( OperationContext* opCtx, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew, + ReshardingApplierMetricsMap* applierMetricsMap, CommonReshardingMetadata metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, Timestamp cloneTimestamp, @@ -215,7 +218,7 @@ private: static std::vector<std::unique_ptr<ReshardingOplogApplier>> _makeOplogAppliers( OperationContext* opCtx, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew, + ReshardingApplierMetricsMap* applierMetricsMap, const CommonReshardingMetadata& metadata, const std::vector<DonorShardFetchTimestamp>& donorShards, Timestamp cloneTimestamp, diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.cpp b/src/mongo/db/s/resharding/resharding_metrics_new.cpp index 1d2f883d46c..ff0cfc3312f 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_new.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_new.cpp @@ -137,4 +137,12 @@ StringData ReshardingMetricsNew::getStateString() const noexcept { _state.load()); } +void ReshardingMetricsNew::accumulateFrom(const ReshardingOplogApplierProgress& progressDoc) { + invariant(_role == Role::kRecipient); + + accumulateValues(progressDoc.getInsertsApplied(), + progressDoc.getUpdatesApplied(), + progressDoc.getDeletesApplied()); +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.h b/src/mongo/db/s/resharding/resharding_metrics_new.h index dc1f1e20bda..a9243e996c9 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_new.h +++ b/src/mongo/db/s/resharding/resharding_metrics_new.h @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/resharding/resharding_metrics_helpers.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" #include "mongo/db/s/sharding_data_transform_instance_metrics.h" #include "mongo/util/uuid.h" @@ -88,6 +89,8 @@ public: _state.store(state); } + void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc); + protected: virtual StringData getStateString() const noexcept override; diff --git a/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp index fb055bb5e6b..6b3af444be6 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp @@ -156,5 +156,20 @@ TEST_F(ReshardingMetricsTest, RestoresFromCoordinatorStateDocument) { CoordinatorState_serializer(state)); } +TEST_F(ReshardingMetricsTest, RestoresFromReshardingApplierProgressDocument) { + ReshardingOplogApplierProgress progressDoc; + progressDoc.setInsertsApplied(123); + progressDoc.setUpdatesApplied(456); + progressDoc.setDeletesApplied(789); + + auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient); + metrics->accumulateFrom(progressDoc); + auto report = metrics->reportForCurrentOp(); + + ASSERT_EQ(report.getIntField("insertsApplied"), 123); + ASSERT_EQ(report.getIntField("updatesApplied"), 456); + ASSERT_EQ(report.getIntField("deletesApplied"), 789); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 08067f21131..21db06d883b 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -126,7 +126,7 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules( ShardId donorShardId, ChunkManager sourceChunkMgr, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew) + ReshardingOplogApplierMetrics* applierMetrics) : _outputNss(std::move(outputNss)), _allStashNss(std::move(allStashNss)), _myStashIdx(myStashIdx), @@ -134,7 +134,7 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules( _donorShardId(std::move(donorShardId)), _sourceChunkMgr(std::move(sourceChunkMgr)), _metrics(metrics), - _metricsNew(metricsNew) {} + _applierMetrics(applierMetrics) {} Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx, const repl::OplogEntry& op) const { @@ -175,21 +175,21 @@ Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx, _applyInsert_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onInsertApplied(); + _applierMetrics->onInsertApplied(); } break; case repl::OpTypeEnum::kUpdate: _applyUpdate_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onUpdateApplied(); + _applierMetrics->onUpdateApplied(); } break; case repl::OpTypeEnum::kDelete: _applyDelete_inlock( opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onDeleteApplied(); + _applierMetrics->onDeleteApplied(); } break; default: @@ -272,7 +272,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt invariant(ur.numMatched != 0); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onWriteToStashedCollections(); + _applierMetrics->onWriteToStashedCollections(); } return; @@ -317,7 +317,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt opCtx, InsertStatement(oField), nullptr /* nullOpDebug */, false /* fromMigrate */)); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onWriteToStashedCollections(); + _applierMetrics->onWriteToStashedCollections(); } } @@ -373,7 +373,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt invariant(ur.numMatched != 0); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onWriteToStashedCollections(); + _applierMetrics->onWriteToStashedCollections(); } return; @@ -451,7 +451,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt invariant(nDeleted != 0); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onWriteToStashedCollections(); + _applierMetrics->onWriteToStashedCollections(); } return; @@ -539,7 +539,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt auto state = exec->getNext(&res, nullptr); if (ShardingDataTransformMetrics::isEnabled()) { - _metricsNew->onWriteToStashedCollections(); + _applierMetrics->onWriteToStashedCollections(); } if (PlanExecutor::ADVANCED == state) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index 2f5c03a8592..5052627c81b 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -41,7 +41,8 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/resharding/resharding_metrics_new.h" +#include "mongo/db/s/resharding/resharding_metrics.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" #include "mongo/s/chunk_manager.h" namespace mongo { @@ -49,7 +50,6 @@ class Collection; class CollectionPtr; class NamespaceString; class OperationContext; -class ReshardingMetrics; /** * Applies an operation from an oplog entry using special rules that apply to resharding. @@ -62,7 +62,7 @@ public: ShardId donorShardId, ChunkManager sourceChunkMgr, ReshardingMetrics* metrics, - ReshardingMetricsNew* metricsNew); + ReshardingOplogApplierMetrics* applierMetrics); const NamespaceString& getOutputNss() const { return _outputNss; @@ -122,7 +122,7 @@ private: const ChunkManager _sourceChunkMgr; ReshardingMetrics* _metrics; - ReshardingMetricsNew* _metricsNew; + ReshardingOplogApplierMetrics* _applierMetrics; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 766ac67f266..3341bdcd9cc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -69,7 +69,7 @@ ReshardingOplogApplier::ReshardingOplogApplier( _sourceId.getShardId(), std::move(sourceChunkMgr), _env->metrics(), - _env->metricsNew()}, + _env->applierMetrics()}, _sessionApplication{std::move(oplogBufferNss)}, _batchApplier{_crudApplication, _sessionApplication}, _oplogIter(std::move(oplogIterator)) {} @@ -233,13 +233,25 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext* BSON(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName << static_cast<long long>(_currentBatchToApply.size()))); + if (ShardingDataTransformMetrics::isEnabled()) { + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName + << _env->applierMetrics()->getInsertsApplied())); + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName + << _env->applierMetrics()->getUpdatesApplied())); + builder.append("$set", + BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName + << _env->applierMetrics()->getDeletesApplied())); + } + store.upsert( opCtx, BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << _sourceId.toBSON()), builder.obj()); _env->metrics()->onOplogEntriesApplied(_currentBatchToApply.size()); if (ShardingDataTransformMetrics::isEnabled()) { - _env->metricsNew()->onOplogEntriesApplied(_currentBatchToApply.size()); + _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size()); } _currentBatchToApply.clear(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index 46875c47099..cee7ce3af0a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -67,8 +67,10 @@ class ReshardingOplogApplier { public: class Env { public: - Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew) - : _service(service), _metrics(metrics), _metricsNew(metricsNew) {} + Env(ServiceContext* service, + ReshardingMetrics* metrics, + ReshardingOplogApplierMetrics* applierMetrics) + : _service(service), _metrics(metrics), _applierMetrics(applierMetrics) {} ServiceContext* service() const { return _service; @@ -77,14 +79,14 @@ public: return _metrics; } - ReshardingMetricsNew* metricsNew() const { - return _metricsNew; + ReshardingOplogApplierMetrics* applierMetrics() { + return _applierMetrics; } private: ServiceContext* _service; ReshardingMetrics* _metrics; - ReshardingMetricsNew* _metricsNew; + ReshardingOplogApplierMetrics* _applierMetrics; }; ReshardingOplogApplier(std::unique_ptr<Env> env, diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp new file mode 100644 index 00000000000..e930dc5b0a9 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" + +namespace mongo { + +ReshardingOplogApplierMetrics::ReshardingOplogApplierMetrics( + ReshardingMetricsNew* metricsNew, boost::optional<ReshardingOplogApplierProgress> progressDoc) + : _metricsNew(metricsNew) { + if (progressDoc) { + _insertsApplied = progressDoc->getInsertsApplied(); + _updatesApplied = progressDoc->getUpdatesApplied(); + _deletesApplied = progressDoc->getDeletesApplied(); + } +} + +void ReshardingOplogApplierMetrics::onInsertApplied() { + _insertsApplied++; + _metricsNew->onInsertApplied(); +} + +void ReshardingOplogApplierMetrics::onUpdateApplied() { + _updatesApplied++; + _metricsNew->onUpdateApplied(); +} + +void ReshardingOplogApplierMetrics::onDeleteApplied() { + _deletesApplied++; + _metricsNew->onDeleteApplied(); +} + +void ReshardingOplogApplierMetrics::onOplogEntriesApplied(int64_t numEntries) { + _oplogEntriesApplied += numEntries; + _metricsNew->onOplogEntriesApplied(numEntries); +} + +void ReshardingOplogApplierMetrics::onWriteToStashedCollections() { + _metricsNew->onWriteToStashedCollections(); +} + +int64_t ReshardingOplogApplierMetrics::getInsertsApplied() const { + return _insertsApplied; +} + +int64_t ReshardingOplogApplierMetrics::getUpdatesApplied() const { + return _updatesApplied; +} + +int64_t ReshardingOplogApplierMetrics::getDeletesApplied() const { + return _deletesApplied; +} + +int64_t ReshardingOplogApplierMetrics::getOplogEntriesApplied() const { + return _oplogEntriesApplied; +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h new file mode 100644 index 00000000000..eb70b12a9f9 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/resharding/resharding_metrics_new.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" + +namespace mongo { + +/** + * Not thread safe and should only be called on a single threaded context. + */ +class ReshardingOplogApplierMetrics { +public: + ReshardingOplogApplierMetrics(ReshardingMetricsNew* metricsNew, + boost::optional<ReshardingOplogApplierProgress> progressDoc); + + void onInsertApplied(); + void onUpdateApplied(); + void onDeleteApplied(); + void onOplogEntriesApplied(int64_t numEntries); + void onWriteToStashedCollections(); + + int64_t getInsertsApplied() const; + int64_t getUpdatesApplied() const; + int64_t getDeletesApplied() const; + int64_t getOplogEntriesApplied() const; + +private: + ReshardingMetricsNew* _metricsNew; + int64_t _insertsApplied{0}; + int64_t _updatesApplied{0}; + int64_t _deletesApplied{0}; + int64_t _oplogEntriesApplied{0}; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp new file mode 100644 index 00000000000..99844f02862 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" +#include "mongo/db/s/sharding_data_transform_metrics_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class ReshardingOplogApplierMetricsTest : public ShardingDataTransformMetricsTestFixture { +public: + std::unique_ptr<ReshardingMetricsNew> createInstanceMetrics() { + return std::make_unique<ReshardingMetricsNew>(UUID::gen(), + kTestCommand, + kTestNamespace, + ReshardingMetricsNew::Role::kRecipient, + getClockSource()->now(), + getClockSource(), + &_cumulativeMetrics); + } +}; + +TEST_F(ReshardingOplogApplierMetricsTest, + IncrementInsertOnApplierMetricsShouldAlsoIncrementInstance) { + auto metrics = createInstanceMetrics(); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("insertsApplied"), 0); + + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none); + applierMetrics.onInsertApplied(); + + ASSERT_EQ(applierMetrics.getInsertsApplied(), 1); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("insertsApplied"), 1); +} + +TEST_F(ReshardingOplogApplierMetricsTest, + IncrementUpdateOnApplierMetricsShouldAlsoIncrementInstance) { + auto metrics = createInstanceMetrics(); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("updatesApplied"), 0); + + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none); + applierMetrics.onUpdateApplied(); + + ASSERT_EQ(applierMetrics.getUpdatesApplied(), 1); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("updatesApplied"), 1); +} + +TEST_F(ReshardingOplogApplierMetricsTest, + IncrementDeleteOnApplierMetricsShouldAlsoIncrementInstance) { + auto metrics = createInstanceMetrics(); + + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("deletesApplied"), 0); + + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none); + applierMetrics.onDeleteApplied(); + + ASSERT_EQ(applierMetrics.getDeletesApplied(), 1); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("deletesApplied"), 1); +} + +TEST_F(ReshardingOplogApplierMetricsTest, ApplierInsertProgressIncrementsIdependentlyFromInstance) { + auto metrics = createInstanceMetrics(); + + ReshardingOplogApplierProgress progressDoc; + progressDoc.setInsertsApplied(12); + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc); + + ASSERT_EQ(applierMetrics.getInsertsApplied(), 12); + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("insertsApplied"), 0); + + applierMetrics.onInsertApplied(); + + ASSERT_EQ(applierMetrics.getInsertsApplied(), 13); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("insertsApplied"), 1); +} + +TEST_F(ReshardingOplogApplierMetricsTest, ApplierUpdateProgressIncrementsIdependentlyFromInstance) { + auto metrics = createInstanceMetrics(); + + ReshardingOplogApplierProgress progressDoc; + progressDoc.setUpdatesApplied(34); + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc); + + ASSERT_EQ(applierMetrics.getUpdatesApplied(), 34); + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("updatesApplied"), 0); + + applierMetrics.onUpdateApplied(); + + ASSERT_EQ(applierMetrics.getUpdatesApplied(), 35); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("updatesApplied"), 1); +} + +TEST_F(ReshardingOplogApplierMetricsTest, ApplierDeleteProgressIncrementsIdependentlyFromInstance) { + auto metrics = createInstanceMetrics(); + + ReshardingOplogApplierProgress progressDoc; + progressDoc.setDeletesApplied(56); + ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc); + + ASSERT_EQ(applierMetrics.getDeletesApplied(), 56); + auto report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("deletesApplied"), 0); + + applierMetrics.onDeleteApplied(); + + ASSERT_EQ(applierMetrics.getDeletesApplied(), 57); + report = metrics->reportForCurrentOp(); + ASSERT_EQ(report.getIntField("deletesApplied"), 1); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl index 084e85046f6..4872757ec43 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl @@ -59,3 +59,21 @@ structs: description: >- The total number of oplog entries that have been applied by this ReshardingOplogApplier. + insertsApplied: + type: safeInt64 + default: 0 + description: >- + The total number of insert type oplog entries that have been applied by this + ReshardingOplogApplier. + updatesApplied: + type: safeInt64 + default: 0 + description: >- + The total number of update type oplog entries that have been applied by this + ReshardingOplogApplier. + deletesApplied: + type: safeInt64 + default: 0 + description: >- + The total number of delete type oplog entries that have been applied by this + ReshardingOplogApplier. diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index dd709560ab5..540143d9a44 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -162,6 +162,8 @@ public: ReshardingMetricsNew::Role::kRecipient, getServiceContext()->getFastClockSource()->now(), getServiceContext()); + _applierMetrics = + std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none); _metrics->onStart(ReshardingMetrics::Role::kRecipient, getServiceContext()->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kApplying); @@ -309,7 +311,7 @@ public: protected: auto makeApplierEnv() { return std::make_unique<ReshardingOplogApplier::Env>( - getServiceContext(), _metrics.get(), _metricsNew.get()); + getServiceContext(), _metrics.get(), _applierMetrics.get()); } std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForApplier() { @@ -367,6 +369,7 @@ protected: const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId}; std::unique_ptr<ReshardingMetrics> _metrics; std::unique_ptr<ReshardingMetricsNew> _metricsNew; + std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics; std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; std::shared_ptr<ThreadPool> _cancelableOpCtxExecutor; diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 3dd3f19d0e7..c1d471f0837 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -116,6 +116,8 @@ public: ShardingDataTransformMetrics::Role::kRecipient, serviceContext->getFastClockSource()->now(), serviceContext); + _applierMetrics = + std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none); _crudApplication = std::make_unique<ReshardingOplogApplicationRules>( _outputNss, std::vector<NamespaceString>{_myStashNss, _otherStashNss}, @@ -123,7 +125,7 @@ public: _myDonorId, makeChunkManagerForSourceCollection(), _metrics.get(), - _metricsNew.get()); + _applierMetrics.get()); _sessionApplication = std::make_unique<ReshardingOplogSessionApplication>(_myOplogBufferNss); @@ -361,6 +363,7 @@ private: std::unique_ptr<ReshardingMetrics> _metrics; std::unique_ptr<ReshardingMetricsNew> _metricsNew; + std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics; std::unique_ptr<ReshardingOplogApplicationRules> _crudApplication; std::unique_ptr<ReshardingOplogSessionApplication> _sessionApplication; diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 2b233cb57f0..4f5bf2fe69f 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -118,6 +118,8 @@ public: ShardingDataTransformMetrics::Role::kRecipient, serviceContext->getFastClockSource()->now(), serviceContext); + _oplogApplierMetrics = + std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none); _applier = std::make_unique<ReshardingOplogApplicationRules>( _outputNss, std::vector<NamespaceString>{_myStashNss, _otherStashNss}, @@ -125,7 +127,7 @@ public: _myDonorId, makeChunkManagerForSourceCollection(), _metrics.get(), - _metricsNew.get()); + _oplogApplierMetrics.get()); } } @@ -341,6 +343,7 @@ private: std::unique_ptr<ReshardingOplogApplicationRules> _applier; std::unique_ptr<ReshardingMetrics> _metrics; std::unique_ptr<ReshardingMetricsNew> _metricsNew; + std::unique_ptr<ReshardingOplogApplierMetrics> _oplogApplierMetrics; }; TEST_F(ReshardingOplogCrudApplicationTest, InsertOpInsertsIntoOuputCollection) { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index e5206a1e972..6908554e6f1 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -547,9 +547,18 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio auto sourceChunkMgr = _externalState->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss()); + // The metrics map can already be pre-populated if it was recovered from disk. + if (_applierMetricsMap.empty()) { + for (const auto& donor : _donorShards) { + _applierMetricsMap.emplace( + donor.getShardId(), + std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none)); + } + } + return _dataReplicationFactory(opCtx, _metrics(), - _metricsNew.get(), + &_applierMetricsMap, _metadata, _donorShards, *_cloneTimestamp, @@ -1096,6 +1105,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( reshardingOpCtxKilledWhileRestoringMetrics.execute( [&opCtx](const BSONObj& data) { opCtx->markKilled(); }); + std::vector<std::pair<ShardId, ReshardingOplogApplierProgress>> progressDocList; for (const auto& donor : _donorShards) { { AutoGetCollection oplogBufferColl( @@ -1121,14 +1131,31 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics( result); if (!result.isEmpty()) { - oplogEntriesApplied += - result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName) - .Long(); + auto progressDoc = ReshardingOplogApplierProgress::parse( + IDLParserErrorContext("resharding-recipient-service-progress-doc"), result); + oplogEntriesApplied += progressDoc.getNumEntriesApplied(); + + if (ShardingDataTransformMetrics::isEnabled()) { + progressDocList.emplace_back(donor.getShardId(), progressDoc); + } } } } } + // Restore stats here where interrupts will never occur, this is to ensure we will only update + // the metrics only once. + for (const auto& shardIdDocPair : progressDocList) { + const auto& shardId = shardIdDocPair.first; + const auto& progressDoc = shardIdDocPair.second; + + _metricsNew->accumulateFrom(progressDoc); + + auto applierMetrics = + std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), progressDoc); + _applierMetricsMap.emplace(shardId, std::move(applierMetrics)); + } + _metrics()->restoreForCurrentOp( documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e1fe504a970..19dd962ace9 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -34,6 +34,7 @@ #include "mongo/db/s/resharding/resharding_data_replication.h" #include "mongo/db/s/resharding/resharding_future_util.h" #include "mongo/db/s/resharding/resharding_metrics_new.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/s/resharding/type_collection_fields_gen.h" #include "mongo/util/concurrency/thread_pool.h" @@ -277,6 +278,7 @@ private: const ReshardingRecipientService* const _recipientService; std::unique_ptr<ReshardingMetricsNew> _metricsNew; + ReshardingApplierMetricsMap _applierMetricsMap; // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.recipient. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 43ccab32bec..518df23e17b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -775,19 +775,18 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { unsigned int i = 0; for (const auto& donor : donorShards) { // Setup oplogBuffer collection. + ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}}; insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), - InsertStatement{ - BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())}); + InsertStatement{BSON("_id" << donorOplogId.toBSON())}); ++i; // Setup reshardingApplierProgress collection. - auto progressDoc = BSON( - ReshardingOplogApplierProgress::kOplogSourceIdFieldName - << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON() - << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName - << oplogEntriesAppliedOnEachDonor); + ReshardingOplogApplierProgress progressDoc( + {doc.getReshardingUUID(), donor.getShardId()}, + donorOplogId, + oplogEntriesAppliedOnEachDonor); insertFn(NamespaceString::kReshardingApplierProgressNamespace, - InsertStatement{progressDoc}); + InsertStatement{progressDoc.toBSON()}); } } |