summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2021-06-17 20:11:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-09 18:07:25 +0000
commit28bd2d76e81aca35b300efb48010460e7d441162 (patch)
tree892795e5173292672526de5a6885e67afac048e0 /src/mongo/db/s
parent95b18771c85283900f7a992b7eda350b8a3d6067 (diff)
downloadmongo-28bd2d76e81aca35b300efb48010460e7d441162.tar.gz
SERVER-57760 Introduce resharding dedicated OpCounters and use them in
ReshardingOplogApplier
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp78
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp19
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.h6
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp11
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp6
8 files changed, 121 insertions, 15 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index fe3fba53fb3..a18067a00fe 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
+#include "mongo/util/with_alignment.h"
namespace mongo {
@@ -62,6 +63,7 @@ constexpr auto kDonorState = "donorState";
constexpr auto kRecipientState = "recipientState";
constexpr auto kOpStatus = "opStatus";
constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance";
+constexpr auto kOpCounters = "opcounters";
using MetricsPtr = std::unique_ptr<ReshardingMetrics>;
@@ -97,6 +99,50 @@ static StringData serializeState(boost::optional<CoordinatorStateEnum> e) {
return CoordinatorState_serializer(*e);
}
+// Enables resharding to distinguish the ops it does to keep up with client writes from the
+// client workload tracked in the globalOpCounters. Keeps one atomic counter per op kind.
+class ReshardingOpCounters {
+public:
+ void gotInsert() noexcept {  // Counts one insert.
+ _checkWrap(&ReshardingOpCounters::_insert);
+ }
+ void gotUpdate() noexcept {  // Counts one update.
+ _checkWrap(&ReshardingOpCounters::_update);
+ }
+ void gotDelete() noexcept {  // Counts one delete.
+ _checkWrap(&ReshardingOpCounters::_delete);
+ }
+
+ BSONObj getObj() const noexcept {  // Returns {insert, update, delete}; each counter is read separately.
+ BSONObjBuilder b;
+ b.append("insert", _insert.loadRelaxed());
+ b.append("update", _update.loadRelaxed());
+ b.append("delete", _delete.loadRelaxed());
+ return b.obj();
+ }
+
+private:
+ // Adds 1 to member `counter`; if its previous value was >= 2^60, logs and zeroes all three.
+ void _checkWrap(CacheAligned<AtomicWord<long long>> ReshardingOpCounters::*counter) {
+ static constexpr auto maxCount = 1LL << 60;  // Reset threshold (2^60).
+ auto oldValue = (this->*counter).fetchAndAddRelaxed(1);  // Value before this increment.
+ if (oldValue > maxCount - 1) {  // Same as oldValue >= 2^60.
+ LOGV2(5776000,
+ "ReshardingOpCounters exceeded maximum value, resetting all to 0",
+ "insert"_attr = _insert.loadRelaxed(),
+ "update"_attr = _update.loadRelaxed(),
+ "delete"_attr = _delete.loadRelaxed());
+ _insert.store(0);  // Every counter is zeroed, not only the one that crossed the threshold.
+ _update.store(0);
+ _delete.store(0);
+ }
+ }
+
+ CacheAligned<AtomicWord<long long>> _insert;  // gotInsert() calls since the last reset.
+ CacheAligned<AtomicWord<long long>> _update;  // gotUpdate() calls since the last reset.
+ CacheAligned<AtomicWord<long long>> _delete;  // gotDelete() calls since the last reset.
+};
+
// Allows tracking elapsed time for the resharding operation and its sub operations (e.g.,
// applying oplog entries).
class TimeInterval {
@@ -136,6 +182,10 @@ public:
boost::optional<Milliseconds> remainingOperationTime(Date_t now) const;
+ void gotInsert() noexcept;
+ void gotUpdate() noexcept;
+ void gotDelete() noexcept;
+
TimeInterval runningOperation;
ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive;
@@ -154,11 +204,26 @@ public:
int64_t chunkImbalanceCount = 0;
+ // The ops done by resharding to keep up with the client writes.
+ ReshardingOpCounters opCounters;
+
boost::optional<DonorStateEnum> donorState;
boost::optional<RecipientStateEnum> recipientState;
boost::optional<CoordinatorStateEnum> coordinatorState;
};
+void ReshardingMetrics::OperationMetrics::gotInsert() noexcept {  // Counts one insert in opCounters.
+ opCounters.gotInsert();
+}
+
+void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept {  // Counts one update in opCounters.
+ opCounters.gotUpdate();
+}
+
+void ReshardingMetrics::OperationMetrics::gotDelete() noexcept {  // Counts one delete in opCounters.
+ opCounters.gotDelete();
+}
+
boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime(
Date_t now) const {
if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) {
@@ -446,6 +511,18 @@ void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t
_currentOp->bytesCopied += bytes;
}
+void ReshardingMetrics::gotInsert() noexcept {  // Forwards to _cumulativeOp; _mutex is not taken.
+ _cumulativeOp->gotInsert();  // NOTE(review): presumably lock-free because the counters are atomic - confirm.
+}
+
+void ReshardingMetrics::gotUpdate() noexcept {  // Forwards to _cumulativeOp; _mutex is not taken.
+ _cumulativeOp->gotUpdate();  // NOTE(review): presumably lock-free because the counters are atomic - confirm.
+}
+
+void ReshardingMetrics::gotDelete() noexcept {  // Forwards to _cumulativeOp; _mutex is not taken.
+ _cumulativeOp->gotDelete();  // NOTE(review): presumably lock-free because the counters are atomic - confirm.
+}
+
void ReshardingMetrics::startCopyingDocuments(Date_t start) {
stdx::lock_guard<Latch> lk(_mutex);
_currentOp->copyingDocuments.start(start);
@@ -602,6 +679,7 @@ void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const
bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection);
bob->append(kOplogsFetched, ops.oplogEntriesFetched);
bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount);
+ bob->append(kOpCounters, ops.opCounters.getObj());
}
Date_t ReshardingMetrics::_now() const {
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index b08a34c7468..aaf3a2f0c73 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -83,6 +83,11 @@ public:
// Allows updating metrics on "documents to copy".
void onDocumentsCopiedForCurrentOp(int64_t documents, int64_t bytes) noexcept;
+ // Allows updating metrics on "opcounters".
+ void gotInsert() noexcept;
+ void gotUpdate() noexcept;
+ void gotDelete() noexcept;
+
// Starts/ends the timers recording the times spent in the named sections.
void startCopyingDocuments(Date_t start);
void endCopyingDocuments(Date_t end);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index 25c14032d7e..104e79cdd37 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/repl/oplog_applier_utils.h"
+#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/counters.h"
@@ -109,13 +110,15 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(
std::vector<NamespaceString> allStashNss,
size_t myStashIdx,
ShardId donorShardId,
- ChunkManager sourceChunkMgr)
+ ChunkManager sourceChunkMgr,
+ ReshardingMetrics* metrics)
: _outputNss(std::move(outputNss)),
_allStashNss(std::move(allStashNss)),
_myStashIdx(myStashIdx),
_myStashNss(_allStashNss.at(_myStashIdx)),
_donorShardId(std::move(donorShardId)),
- _sourceChunkMgr(std::move(sourceChunkMgr)) {}
+ _sourceChunkMgr(std::move(sourceChunkMgr)),
+ _metrics(metrics) {}
Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx,
const repl::OplogEntry& op) const {
@@ -215,9 +218,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
* 4. If there exists a document with _id == [op _id] in the output collection and it is NOT
* owned by this donor shard, insert the contents of 'op' into the conflict stash collection.
*/
- // Writes are replicated, so use global op counters.
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotInsert();
+ _metrics->gotInsert();
BSONObj oField = op.getObject();
@@ -306,9 +307,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt
* 4. If there exists a document with _id == [op _id] in the output collection and it is owned
* by this donor shard, update the document from this collection.
*/
- // Writes are replicated, so use global op counters.
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotUpdate();
+ _metrics->gotUpdate();
BSONObj oField = op.getObject();
BSONObj o2Field;
@@ -393,9 +392,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
* _id == [op _id] arbitrarily from among all resharding conflict stash collections to delete
* from that resharding conflict stash collection and insert into the output collection.
*/
- // Writes are replicated, so use global op counters.
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotDelete();
+ _metrics->gotDelete();
BSONObj oField = op.getObject();
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index 114a3637367..5b84e92e4bd 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -48,6 +48,7 @@ class Collection;
class CollectionPtr;
class NamespaceString;
class OperationContext;
+class ReshardingMetrics;
/**
* Applies an operation from an oplog entry using special rules that apply to resharding.
@@ -58,7 +59,8 @@ public:
std::vector<NamespaceString> allStashNss,
size_t myStashIdx,
ShardId donorShardId,
- ChunkManager sourceChunkMgr);
+ ChunkManager sourceChunkMgr,
+ ReshardingMetrics* metrics);
/**
* Wraps the op application in a writeConflictRetry loop and is responsible for creating and
@@ -112,6 +114,8 @@ private:
// The chunk manager for the source namespace and original shard key.
const ChunkManager _sourceChunkMgr;
+
+ ReshardingMetrics* _metrics;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index f7444e754d8..8b4015bed1e 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -65,7 +65,8 @@ ReshardingOplogApplier::ReshardingOplogApplier(
std::move(allStashNss),
myStashIdx,
_sourceId.getShardId(),
- std::move(sourceChunkMgr)},
+ std::move(sourceChunkMgr),
+ _env->metrics()},
_sessionApplication{},
_batchApplier{_crudApplication, _sessionApplication},
_oplogIter(std::move(oplogIterator)) {}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index fe19dbd3218..504ef1eb442 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -256,6 +256,12 @@ public:
return kStashCollections;
}
+ BSONObj getMetricsOpCounters() {  // Test helper: the "opcounters" sub-document of the cumulative op metrics.
+ BSONObjBuilder bob;
+ _metrics->serializeCumulativeOpMetrics(&bob);  // Fills bob with the serialized cumulative metrics.
+ return bob.obj().getObjectField("opcounters").getOwned();  // presumably getOwned() copies so the result does not refer to the temporary - confirm.
+ }
+
long long metricsAppliedCount() const {
BSONObjBuilder bob;
_metrics->serializeCurrentOpMetrics(&bob, ReshardingMetrics::Role::kRecipient);
@@ -815,6 +821,11 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) {
auto future = applier.run(executor, executor, cancelToken, factory);
ASSERT_OK(future.getNoThrow());
+ auto opCountersObj = getMetricsOpCounters();
+ ASSERT_EQ(opCountersObj.getIntField("insert"), 2);
+ ASSERT_EQ(opCountersObj.getIntField("update"), 1);
+ ASSERT_EQ(opCountersObj.getIntField("delete"), 2);
+
// The in-memory metrics should show the 5 ops above + the final oplog entry, but on disk should
// not include the final entry in its count.
ASSERT_EQ(metricsAppliedCount(), 6);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 417139aada9..a54e6dde5fc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
+#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding/resharding_oplog_batch_applier.h"
#include "mongo/db/s/resharding/resharding_oplog_session_application.h"
@@ -103,12 +104,15 @@ public:
opCtx.get(), nss, CollectionOptions{});
}
+ _metrics = std::make_unique<ReshardingMetrics>(serviceContext);
+
_crudApplication = std::make_unique<ReshardingOplogApplicationRules>(
_outputNss,
std::vector<NamespaceString>{_myStashNss, _otherStashNss},
0U,
_myDonorId,
- makeChunkManagerForSourceCollection());
+ makeChunkManagerForSourceCollection(),
+ _metrics.get());
_sessionApplication = std::make_unique<ReshardingOplogSessionApplication>();
@@ -326,6 +330,8 @@ private:
const NamespaceString _otherStashNss =
getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
+ std::unique_ptr<ReshardingMetrics> _metrics;
+
std::unique_ptr<ReshardingOplogApplicationRules> _crudApplication;
std::unique_ptr<ReshardingOplogSessionApplication> _sessionApplication;
std::unique_ptr<ReshardingOplogBatchApplier> _batchApplier;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 9a8c9231677..2bc7b48771a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
+#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context_d_test_fixture.h"
@@ -95,12 +96,14 @@ public:
opCtx.get(), nss, CollectionOptions{});
}
+ _metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
_applier = std::make_unique<ReshardingOplogApplicationRules>(
_outputNss,
std::vector<NamespaceString>{_myStashNss, _otherStashNss},
0U,
_myDonorId,
- makeChunkManagerForSourceCollection());
+ makeChunkManagerForSourceCollection(),
+ _metrics.get());
}
}
@@ -290,6 +293,7 @@ private:
getLocalConflictStashNamespace(_sourceUUID, _otherDonorId);
std::unique_ptr<ReshardingOplogApplicationRules> _applier;
+ std::unique_ptr<ReshardingMetrics> _metrics;
};
TEST_F(ReshardingOplogCrudApplicationTest, InsertOpInsertsIntoOuputCollection) {