diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2021-06-17 20:11:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-09 18:07:25 +0000 |
commit | 28bd2d76e81aca35b300efb48010460e7d441162 (patch) | |
tree | 892795e5173292672526de5a6885e67afac048e0 | |
parent | 95b18771c85283900f7a992b7eda350b8a3d6067 (diff) | |
download | mongo-28bd2d76e81aca35b300efb48010460e7d441162.tar.gz |
SERVER-57760 Introduce resharding dedicated OpCounters and use them in
ReshardingOplogApplier
9 files changed, 142 insertions, 17 deletions
diff --git a/jstests/sharding/resharding_metrics_increment.js b/jstests/sharding/resharding_metrics_increment.js index 914ac1503bb..2291f647c00 100644 --- a/jstests/sharding/resharding_metrics_increment.js +++ b/jstests/sharding/resharding_metrics_increment.js @@ -94,8 +94,20 @@ const topology = DiscoverTopology.findConnectedNodes(mongos); // baseline of 2 fetches/applies on each recipient (one "no-op" for each donor). // Additionally, recipientShard[1] gets the 10 late inserts above, so expect 12 // oplogEntry applies for those late inserts. -[{shardName: recipientShardNames[0], documents: 2, fetched: 2, applied: 2}, - {shardName: recipientShardNames[1], documents: 2, fetched: 12, applied: 12}, +[{ + shardName: recipientShardNames[0], + documents: 2, + fetched: 2, + applied: 2, + opcounters: {insert: 0, update: 0, delete: 0} +}, + { + shardName: recipientShardNames[1], + documents: 2, + fetched: 12, + applied: 12, + opcounters: {insert: 10, update: 0, delete: 0} + }, ].forEach(e => { const mongo = new Mongo(topology.shards[e.shardName].primary); const doc = mongo.getDB('admin').serverStatus({}); @@ -113,6 +125,13 @@ const topology = DiscoverTopology.findConnectedNodes(mongos); "oplogEntriesFetched": e.fetched, "oplogEntriesApplied": e.applied, }); + + verifyDict(sub.opcounters, { + "insert": e.opcounters.insert, + "update": e.opcounters.update, + "delete": e.opcounters.delete, + }); + // bytesCopied is harder to pin down but it should be >0. assert.betweenIn(1, sub['bytesCopied'], 1024, 'bytesCopied'); }); diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index fe3fba53fb3..a18067a00fe 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/logv2/log.h" #include "mongo/platform/compiler.h" +#include "mongo/util/with_alignment.h" namespace mongo { @@ -62,6 +63,7 @@ constexpr auto kDonorState = "donorState"; constexpr auto kRecipientState = "recipientState"; constexpr auto kOpStatus = "opStatus"; constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance"; +constexpr auto kOpCounters = "opcounters"; using MetricsPtr = std::unique_ptr<ReshardingMetrics>; @@ -97,6 +99,50 @@ static StringData serializeState(boost::optional<CoordinatorStateEnum> e) { return CoordinatorState_serializer(*e); } +// Enables resharding to distinguish the ops it does to keep up with client writes from the +// client workload tracked in the globalOpCounters. +class ReshardingOpCounters { +public: + void gotInsert() noexcept { + _checkWrap(&ReshardingOpCounters::_insert); + } + void gotUpdate() noexcept { + _checkWrap(&ReshardingOpCounters::_update); + } + void gotDelete() noexcept { + _checkWrap(&ReshardingOpCounters::_delete); + } + + BSONObj getObj() const noexcept { + BSONObjBuilder b; + b.append("insert", _insert.loadRelaxed()); + b.append("update", _update.loadRelaxed()); + b.append("delete", _delete.loadRelaxed()); + return b.obj(); + } + +private: + // Increment member `counter` by 1, resetting all counters if it was > 2^60. + void _checkWrap(CacheAligned<AtomicWord<long long>> ReshardingOpCounters::*counter) { + static constexpr auto maxCount = 1LL << 60; + auto oldValue = (this->*counter).fetchAndAddRelaxed(1); + if (oldValue > maxCount - 1) { + LOGV2(5776000, + "ReshardingOpCounters exceeded maximum value, resetting all to 0", + "insert"_attr = _insert.loadRelaxed(), + "update"_attr = _update.loadRelaxed(), + "delete"_attr = _delete.loadRelaxed()); + _insert.store(0); + _update.store(0); + _delete.store(0); + } + } + + CacheAligned<AtomicWord<long long>> _insert; + CacheAligned<AtomicWord<long long>> _update; + CacheAligned<AtomicWord<long long>> _delete; +}; + // Allows tracking elapsed time for the resharding operation and its sub operations (e.g., // applying oplog entries). class TimeInterval { @@ -136,6 +182,10 @@ public: boost::optional<Milliseconds> remainingOperationTime(Date_t now) const; + void gotInsert() noexcept; + void gotUpdate() noexcept; + void gotDelete() noexcept; + TimeInterval runningOperation; ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive; @@ -154,11 +204,26 @@ public: int64_t chunkImbalanceCount = 0; + // The ops done by resharding to keep up with the client writes. + ReshardingOpCounters opCounters; + boost::optional<DonorStateEnum> donorState; boost::optional<RecipientStateEnum> recipientState; boost::optional<CoordinatorStateEnum> coordinatorState; }; +void ReshardingMetrics::OperationMetrics::gotInsert() noexcept { + opCounters.gotInsert(); +} + +void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept { + opCounters.gotUpdate(); +} + +void ReshardingMetrics::OperationMetrics::gotDelete() noexcept { + opCounters.gotDelete(); +} + boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime( Date_t now) const { if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) { @@ -446,6 +511,18 @@ void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t _currentOp->bytesCopied += bytes; } +void ReshardingMetrics::gotInsert() noexcept { + _cumulativeOp->gotInsert(); +} + +void ReshardingMetrics::gotUpdate() noexcept { + _cumulativeOp->gotUpdate(); +} + +void ReshardingMetrics::gotDelete() noexcept { + _cumulativeOp->gotDelete(); +} + void ReshardingMetrics::startCopyingDocuments(Date_t start) { stdx::lock_guard<Latch> lk(_mutex); _currentOp->copyingDocuments.start(start); @@ -602,6 +679,7 @@ void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection); bob->append(kOplogsFetched, ops.oplogEntriesFetched); bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount); + bob->append(kOpCounters, ops.opCounters.getObj()); } Date_t ReshardingMetrics::_now() const { diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h index b08a34c7468..aaf3a2f0c73 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.h +++ b/src/mongo/db/s/resharding/resharding_metrics.h @@ -83,6 +83,11 @@ public: // Allows updating metrics on "documents to copy". void onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept; + // Allows updating metrics on "opcounters"; + void gotInsert() noexcept; + void gotUpdate() noexcept; + void gotDelete() noexcept; + // Starts/ends the timers recording the times spend in the named sections. void startCopyingDocuments(Date_t start); void endCopyingDocuments(Date_t end); diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 25c14032d7e..104e79cdd37 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -45,6 +45,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/oplog_applier_utils.h" +#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/counters.h" @@ -109,13 +110,15 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules( std::vector<NamespaceString> allStashNss, size_t myStashIdx, ShardId donorShardId, - ChunkManager sourceChunkMgr) + ChunkManager sourceChunkMgr, + ReshardingMetrics* metrics) : _outputNss(std::move(outputNss)), _allStashNss(std::move(allStashNss)), _myStashIdx(myStashIdx), _myStashNss(_allStashNss.at(_myStashIdx)), _donorShardId(std::move(donorShardId)), - _sourceChunkMgr(std::move(sourceChunkMgr)) {} + _sourceChunkMgr(std::move(sourceChunkMgr)), + _metrics(metrics) {} Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx, const repl::OplogEntry& op) const { @@ -215,9 +218,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt * 4. If there exists a document with _id == [op _id] in the output collection and it is NOT * owned by this donor shard, insert the contents of 'op' into the conflict stash collection. */ - // Writes are replicated, so use global op counters. - OpCounters* opCounters = &globalOpCounters; - opCounters->gotInsert(); + _metrics->gotInsert(); BSONObj oField = op.getObject(); @@ -306,9 +307,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt * 4. If there exists a document with _id == [op _id] in the output collection and it is owned * by this donor shard, update the document from this collection. */ - // Writes are replicated, so use global op counters. - OpCounters* opCounters = &globalOpCounters; - opCounters->gotUpdate(); + _metrics->gotUpdate(); BSONObj oField = op.getObject(); BSONObj o2Field; @@ -393,9 +392,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt * _id == [op _id] arbitrarily from among all resharding conflict stash collections to delete * from that resharding conflict stash collection and insert into the output collection. */ - // Writes are replicated, so use global op counters. - OpCounters* opCounters = &globalOpCounters; - opCounters->gotDelete(); + _metrics->gotDelete(); BSONObj oField = op.getObject(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h index 114a3637367..5b84e92e4bd 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.h +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -48,6 +48,7 @@ class Collection; class CollectionPtr; class NamespaceString; class OperationContext; +class ReshardingMetrics; /** * Applies an operation from an oplog entry using special rules that apply to resharding. @@ -58,7 +59,8 @@ public: std::vector<NamespaceString> allStashNss, size_t myStashIdx, ShardId donorShardId, - ChunkManager sourceChunkMgr); + ChunkManager sourceChunkMgr, + ReshardingMetrics* metrics); /** * Wraps the op application in a writeConflictRetry loop and is responsible for creating and @@ -112,6 +114,8 @@ private: // The chunk manager for the source namespace and original shard key. const ChunkManager _sourceChunkMgr; + + ReshardingMetrics* _metrics; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index f7444e754d8..8b4015bed1e 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -65,7 +65,8 @@ ReshardingOplogApplier::ReshardingOplogApplier( std::move(allStashNss), myStashIdx, _sourceId.getShardId(), - std::move(sourceChunkMgr)}, + std::move(sourceChunkMgr), + _env->metrics()}, _sessionApplication{}, _batchApplier{_crudApplication, _sessionApplication}, _oplogIter(std::move(oplogIterator)) {} diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index fe19dbd3218..504ef1eb442 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -256,6 +256,12 @@ public: return kStashCollections; } + BSONObj getMetricsOpCounters() { + BSONObjBuilder bob; + _metrics->serializeCumulativeOpMetrics(&bob); + return bob.obj().getObjectField("opcounters").getOwned(); + } + long long metricsAppliedCount() const { BSONObjBuilder bob; _metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient); @@ -815,6 +821,11 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) { auto future = applier.run(executor, executor, cancelToken, factory); ASSERT_OK(future.getNoThrow()); + auto opCountersObj = getMetricsOpCounters(); + ASSERT_EQ(opCountersObj.getIntField("insert"), 2); + ASSERT_EQ(opCountersObj.getIntField("update"), 1); + ASSERT_EQ(opCountersObj.getIntField("delete"), 2); + // The in-memory metrics should show the 5 ops above + the final oplog entry, but on disk should // not include the final entry in its count. ASSERT_EQ(metricsAppliedCount(), 6); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 417139aada9..a54e6dde5fc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding/resharding_oplog_batch_applier.h" #include "mongo/db/s/resharding/resharding_oplog_session_application.h" @@ -103,12 +104,15 @@ public: opCtx.get(), nss, CollectionOptions{}); } + _metrics = std::make_unique<ReshardingMetrics>(serviceContext); + _crudApplication = std::make_unique<ReshardingOplogApplicationRules>( _outputNss, std::vector<NamespaceString>{_myStashNss, _otherStashNss}, 0U, _myDonorId, - makeChunkManagerForSourceCollection()); + makeChunkManagerForSourceCollection(), + _metrics.get()); _sessionApplication = std::make_unique<ReshardingOplogSessionApplication>(); @@ -326,6 +330,8 @@ private: const NamespaceString _otherStashNss = getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); + std::unique_ptr<ReshardingMetrics> _metrics; + std::unique_ptr<ReshardingOplogApplicationRules> _crudApplication; std::unique_ptr<ReshardingOplogSessionApplication> _sessionApplication; std::unique_ptr<ReshardingOplogBatchApplier> _batchApplier; diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 9a8c9231677..2bc7b48771a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -95,12 +96,14 @@ public: opCtx.get(), nss, CollectionOptions{}); } + _metrics = std::make_unique<ReshardingMetrics>(getServiceContext()); _applier = std::make_unique<ReshardingOplogApplicationRules>( _outputNss, std::vector<NamespaceString>{_myStashNss, _otherStashNss}, 0U, _myDonorId, - makeChunkManagerForSourceCollection()); + makeChunkManagerForSourceCollection(), + _metrics.get()); } } @@ -290,6 +293,7 @@ private: getLocalConflictStashNamespace(_sourceUUID, _otherDonorId); std::unique_ptr<ReshardingOplogApplicationRules> _applier; + std::unique_ptr<ReshardingMetrics> _metrics; }; TEST_F(ReshardingOplogCrudApplicationTest, InsertOpInsertsIntoOuputCollection) { |