summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2022-04-05 21:11:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-18 16:56:18 +0000
commitbeb7e25a768721018f10693fdefe55656b307d4e (patch)
treec5d12fc51bad48c2d66360a633a9bb96bb9f5edf /src/mongo/db/s/resharding
parentc08573de6d48f18e3772b083a8e5e3f85cb3b47b (diff)
downloadmongo-beb7e25a768721018f10693fdefe55656b307d4e.tar.gz
SERVER-63623 Implemented Basic Global Index Metrics Restoration For CrudOp Metrics
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.cpp15
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.h9
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_new.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_new.h3
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_new_test.cpp15
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp20
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.h8
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp16
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.h12
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp86
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h64
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp154
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl18
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp35
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp15
19 files changed, 449 insertions, 46 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index a5918e3ea77..b6c0686872f 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -164,7 +164,7 @@ std::shared_ptr<executor::TaskExecutor> ReshardingDataReplication::_makeOplogFet
std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::_makeOplogAppliers(
OperationContext* opCtx,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew,
+ ReshardingApplierMetricsMap* applierMetricsMap,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
Timestamp cloneTimestamp,
@@ -175,18 +175,19 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
oplogAppliers.reserve(donorShards.size());
for (size_t i = 0; i < donorShards.size(); ++i) {
- auto sourceId =
- ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()};
+ const auto donorShardId = donorShards[i].getShardId();
+ auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardId};
auto minFetchTimestamp = *donorShards[i].getMinFetchTimestamp();
auto idToResumeFrom = getOplogApplierResumeId(opCtx, sourceId, minFetchTimestamp);
invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
const auto& oplogBufferNss =
- getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId());
+ getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardId);
+ auto applierMetrics = (*applierMetricsMap)[donorShardId].get();
oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
std::make_unique<ReshardingOplogApplier::Env>(
- opCtx->getServiceContext(), metrics, metricsNew),
+ opCtx->getServiceContext(), metrics, applierMetrics),
std::move(sourceId),
oplogBufferNss,
metadata.getTempReshardingNss(),
@@ -206,7 +207,7 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::make(
OperationContext* opCtx,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew,
+ ReshardingApplierMetricsMap* applierMetricsMap,
CommonReshardingMetadata metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
Timestamp cloneTimestamp,
@@ -228,7 +229,7 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards);
auto oplogAppliers = _makeOplogAppliers(opCtx,
metrics,
- metricsNew,
+ applierMetricsMap,
metadata,
donorShards,
cloneTimestamp,
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index a2705abd6f6..7fe8ee023eb 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -36,7 +36,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
-#include "mongo/db/s/resharding/resharding_metrics_new.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/resharding/common_types_gen.h"
#include "mongo/s/shard_id.h"
@@ -61,6 +61,9 @@ class TaskExecutor;
} // namespace executor
+using ReshardingApplierMetricsMap =
+ std::map<ShardId, std::unique_ptr<ReshardingOplogApplierMetrics>>;
+
/**
* Manages the full sequence of data replication in resharding on the recipient.
*
@@ -139,7 +142,7 @@ public:
static std::unique_ptr<ReshardingDataReplicationInterface> make(
OperationContext* opCtx,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew,
+ ReshardingApplierMetricsMap* applierMetricsMap,
CommonReshardingMetadata metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
Timestamp cloneTimestamp,
@@ -215,7 +218,7 @@ private:
static std::vector<std::unique_ptr<ReshardingOplogApplier>> _makeOplogAppliers(
OperationContext* opCtx,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew,
+ ReshardingApplierMetricsMap* applierMetricsMap,
const CommonReshardingMetadata& metadata,
const std::vector<DonorShardFetchTimestamp>& donorShards,
Timestamp cloneTimestamp,
diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.cpp b/src/mongo/db/s/resharding/resharding_metrics_new.cpp
index 1d2f883d46c..ff0cfc3312f 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_new.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_new.cpp
@@ -137,4 +137,12 @@ StringData ReshardingMetricsNew::getStateString() const noexcept {
_state.load());
}
+void ReshardingMetricsNew::accumulateFrom(const ReshardingOplogApplierProgress& progressDoc) {
+ invariant(_role == Role::kRecipient);
+
+ accumulateValues(progressDoc.getInsertsApplied(),
+ progressDoc.getUpdatesApplied(),
+ progressDoc.getDeletesApplied());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_metrics_new.h b/src/mongo/db/s/resharding/resharding_metrics_new.h
index dc1f1e20bda..a9243e996c9 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_new.h
+++ b/src/mongo/db/s/resharding/resharding_metrics_new.h
@@ -32,6 +32,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/resharding/resharding_metrics_helpers.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
#include "mongo/db/s/sharding_data_transform_instance_metrics.h"
#include "mongo/util/uuid.h"
@@ -88,6 +89,8 @@ public:
_state.store(state);
}
+ void accumulateFrom(const ReshardingOplogApplierProgress& progressDoc);
+
protected:
virtual StringData getStateString() const noexcept override;
diff --git a/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp
index fb055bb5e6b..6b3af444be6 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_new_test.cpp
@@ -156,5 +156,20 @@ TEST_F(ReshardingMetricsTest, RestoresFromCoordinatorStateDocument) {
CoordinatorState_serializer(state));
}
+TEST_F(ReshardingMetricsTest, RestoresFromReshardingApplierProgressDocument) {
+ ReshardingOplogApplierProgress progressDoc;
+ progressDoc.setInsertsApplied(123);
+ progressDoc.setUpdatesApplied(456);
+ progressDoc.setDeletesApplied(789);
+
+ auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
+ metrics->accumulateFrom(progressDoc);
+ auto report = metrics->reportForCurrentOp();
+
+ ASSERT_EQ(report.getIntField("insertsApplied"), 123);
+ ASSERT_EQ(report.getIntField("updatesApplied"), 456);
+ ASSERT_EQ(report.getIntField("deletesApplied"), 789);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index 08067f21131..21db06d883b 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -126,7 +126,7 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(
ShardId donorShardId,
ChunkManager sourceChunkMgr,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew)
+ ReshardingOplogApplierMetrics* applierMetrics)
: _outputNss(std::move(outputNss)),
_allStashNss(std::move(allStashNss)),
_myStashIdx(myStashIdx),
@@ -134,7 +134,7 @@ ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(
_donorShardId(std::move(donorShardId)),
_sourceChunkMgr(std::move(sourceChunkMgr)),
_metrics(metrics),
- _metricsNew(metricsNew) {}
+ _applierMetrics(applierMetrics) {}
Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx,
const repl::OplogEntry& op) const {
@@ -175,21 +175,21 @@ Status ReshardingOplogApplicationRules::applyOperation(OperationContext* opCtx,
_applyInsert_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onInsertApplied();
+ _applierMetrics->onInsertApplied();
}
break;
case repl::OpTypeEnum::kUpdate:
_applyUpdate_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onUpdateApplied();
+ _applierMetrics->onUpdateApplied();
}
break;
case repl::OpTypeEnum::kDelete:
_applyDelete_inlock(
opCtx, autoCollOutput.getDb(), *autoCollOutput, *autoCollStash, op);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onDeleteApplied();
+ _applierMetrics->onDeleteApplied();
}
break;
default:
@@ -272,7 +272,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
invariant(ur.numMatched != 0);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onWriteToStashedCollections();
+ _applierMetrics->onWriteToStashedCollections();
}
return;
@@ -317,7 +317,7 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock(OperationContext* opCt
opCtx, InsertStatement(oField), nullptr /* nullOpDebug */, false /* fromMigrate */));
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onWriteToStashedCollections();
+ _applierMetrics->onWriteToStashedCollections();
}
}
@@ -373,7 +373,7 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock(OperationContext* opCt
invariant(ur.numMatched != 0);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onWriteToStashedCollections();
+ _applierMetrics->onWriteToStashedCollections();
}
return;
@@ -451,7 +451,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
invariant(nDeleted != 0);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onWriteToStashedCollections();
+ _applierMetrics->onWriteToStashedCollections();
}
return;
@@ -539,7 +539,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
auto state = exec->getNext(&res, nullptr);
if (ShardingDataTransformMetrics::isEnabled()) {
- _metricsNew->onWriteToStashedCollections();
+ _applierMetrics->onWriteToStashedCollections();
}
if (PlanExecutor::ADVANCED == state) {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
index 2f5c03a8592..5052627c81b 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -41,7 +41,8 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/resharding/resharding_metrics_new.h"
+#include "mongo/db/s/resharding/resharding_metrics.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
#include "mongo/s/chunk_manager.h"
namespace mongo {
@@ -49,7 +50,6 @@ class Collection;
class CollectionPtr;
class NamespaceString;
class OperationContext;
-class ReshardingMetrics;
/**
* Applies an operation from an oplog entry using special rules that apply to resharding.
@@ -62,7 +62,7 @@ public:
ShardId donorShardId,
ChunkManager sourceChunkMgr,
ReshardingMetrics* metrics,
- ReshardingMetricsNew* metricsNew);
+ ReshardingOplogApplierMetrics* applierMetrics);
const NamespaceString& getOutputNss() const {
return _outputNss;
@@ -122,7 +122,7 @@ private:
const ChunkManager _sourceChunkMgr;
ReshardingMetrics* _metrics;
- ReshardingMetricsNew* _metricsNew;
+ ReshardingOplogApplierMetrics* _applierMetrics;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 766ac67f266..3341bdcd9cc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -69,7 +69,7 @@ ReshardingOplogApplier::ReshardingOplogApplier(
_sourceId.getShardId(),
std::move(sourceChunkMgr),
_env->metrics(),
- _env->metricsNew()},
+ _env->applierMetrics()},
_sessionApplication{std::move(oplogBufferNss)},
_batchApplier{_crudApplication, _sessionApplication},
_oplogIter(std::move(oplogIterator)) {}
@@ -233,13 +233,25 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext*
BSON(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
<< static_cast<long long>(_currentBatchToApply.size())));
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kInsertsAppliedFieldName
+ << _env->applierMetrics()->getInsertsApplied()));
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kUpdatesAppliedFieldName
+ << _env->applierMetrics()->getUpdatesApplied()));
+ builder.append("$set",
+ BSON(ReshardingOplogApplierProgress::kDeletesAppliedFieldName
+ << _env->applierMetrics()->getDeletesApplied()));
+ }
+
store.upsert(
opCtx,
BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << _sourceId.toBSON()),
builder.obj());
_env->metrics()->onOplogEntriesApplied(_currentBatchToApply.size());
if (ShardingDataTransformMetrics::isEnabled()) {
- _env->metricsNew()->onOplogEntriesApplied(_currentBatchToApply.size());
+ _env->applierMetrics()->onOplogEntriesApplied(_currentBatchToApply.size());
}
_currentBatchToApply.clear();
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index 46875c47099..cee7ce3af0a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -67,8 +67,10 @@ class ReshardingOplogApplier {
public:
class Env {
public:
- Env(ServiceContext* service, ReshardingMetrics* metrics, ReshardingMetricsNew* metricsNew)
- : _service(service), _metrics(metrics), _metricsNew(metricsNew) {}
+ Env(ServiceContext* service,
+ ReshardingMetrics* metrics,
+ ReshardingOplogApplierMetrics* applierMetrics)
+ : _service(service), _metrics(metrics), _applierMetrics(applierMetrics) {}
ServiceContext* service() const {
return _service;
@@ -77,14 +79,14 @@ public:
return _metrics;
}
- ReshardingMetricsNew* metricsNew() const {
- return _metricsNew;
+ ReshardingOplogApplierMetrics* applierMetrics() {
+ return _applierMetrics;
}
private:
ServiceContext* _service;
ReshardingMetrics* _metrics;
- ReshardingMetricsNew* _metricsNew;
+ ReshardingOplogApplierMetrics* _applierMetrics;
};
ReshardingOplogApplier(std::unique_ptr<Env> env,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp
new file mode 100644
index 00000000000..e930dc5b0a9
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.cpp
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
+
+namespace mongo {
+
+ReshardingOplogApplierMetrics::ReshardingOplogApplierMetrics(
+ ReshardingMetricsNew* metricsNew, boost::optional<ReshardingOplogApplierProgress> progressDoc)
+ : _metricsNew(metricsNew) {
+ if (progressDoc) {
+ _insertsApplied = progressDoc->getInsertsApplied();
+ _updatesApplied = progressDoc->getUpdatesApplied();
+ _deletesApplied = progressDoc->getDeletesApplied();
+ }
+}
+
+void ReshardingOplogApplierMetrics::onInsertApplied() {
+ _insertsApplied++;
+ _metricsNew->onInsertApplied();
+}
+
+void ReshardingOplogApplierMetrics::onUpdateApplied() {
+ _updatesApplied++;
+ _metricsNew->onUpdateApplied();
+}
+
+void ReshardingOplogApplierMetrics::onDeleteApplied() {
+ _deletesApplied++;
+ _metricsNew->onDeleteApplied();
+}
+
+void ReshardingOplogApplierMetrics::onOplogEntriesApplied(int64_t numEntries) {
+ _oplogEntriesApplied += numEntries;
+ _metricsNew->onOplogEntriesApplied(numEntries);
+}
+
+void ReshardingOplogApplierMetrics::onWriteToStashedCollections() {
+ _metricsNew->onWriteToStashedCollections();
+}
+
+int64_t ReshardingOplogApplierMetrics::getInsertsApplied() const {
+ return _insertsApplied;
+}
+
+int64_t ReshardingOplogApplierMetrics::getUpdatesApplied() const {
+ return _updatesApplied;
+}
+
+int64_t ReshardingOplogApplierMetrics::getDeletesApplied() const {
+ return _deletesApplied;
+}
+
+int64_t ReshardingOplogApplierMetrics::getOplogEntriesApplied() const {
+ return _oplogEntriesApplied;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h
new file mode 100644
index 00000000000..eb70b12a9f9
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics.h
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/resharding/resharding_metrics_new.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
+
+namespace mongo {
+
+/**
+ * Not thread safe and should only be called on a single threaded context.
+ */
+class ReshardingOplogApplierMetrics {
+public:
+ ReshardingOplogApplierMetrics(ReshardingMetricsNew* metricsNew,
+ boost::optional<ReshardingOplogApplierProgress> progressDoc);
+
+ void onInsertApplied();
+ void onUpdateApplied();
+ void onDeleteApplied();
+ void onOplogEntriesApplied(int64_t numEntries);
+ void onWriteToStashedCollections();
+
+ int64_t getInsertsApplied() const;
+ int64_t getUpdatesApplied() const;
+ int64_t getDeletesApplied() const;
+ int64_t getOplogEntriesApplied() const;
+
+private:
+ ReshardingMetricsNew* _metricsNew;
+ int64_t _insertsApplied{0};
+ int64_t _updatesApplied{0};
+ int64_t _deletesApplied{0};
+ int64_t _oplogEntriesApplied{0};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp
new file mode 100644
index 00000000000..99844f02862
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_metrics_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
+#include "mongo/db/s/sharding_data_transform_metrics_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class ReshardingOplogApplierMetricsTest : public ShardingDataTransformMetricsTestFixture {
+public:
+ std::unique_ptr<ReshardingMetricsNew> createInstanceMetrics() {
+ return std::make_unique<ReshardingMetricsNew>(UUID::gen(),
+ kTestCommand,
+ kTestNamespace,
+ ReshardingMetricsNew::Role::kRecipient,
+ getClockSource()->now(),
+ getClockSource(),
+ &_cumulativeMetrics);
+ }
+};
+
+TEST_F(ReshardingOplogApplierMetricsTest,
+ IncrementInsertOnApplierMetricsShouldAlsoIncrementInstance) {
+ auto metrics = createInstanceMetrics();
+
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("insertsApplied"), 0);
+
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none);
+ applierMetrics.onInsertApplied();
+
+ ASSERT_EQ(applierMetrics.getInsertsApplied(), 1);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("insertsApplied"), 1);
+}
+
+TEST_F(ReshardingOplogApplierMetricsTest,
+ IncrementUpdateOnApplierMetricsShouldAlsoIncrementInstance) {
+ auto metrics = createInstanceMetrics();
+
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("updatesApplied"), 0);
+
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none);
+ applierMetrics.onUpdateApplied();
+
+ ASSERT_EQ(applierMetrics.getUpdatesApplied(), 1);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("updatesApplied"), 1);
+}
+
+TEST_F(ReshardingOplogApplierMetricsTest,
+ IncrementDeleteOnApplierMetricsShouldAlsoIncrementInstance) {
+ auto metrics = createInstanceMetrics();
+
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("deletesApplied"), 0);
+
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), boost::none);
+ applierMetrics.onDeleteApplied();
+
+ ASSERT_EQ(applierMetrics.getDeletesApplied(), 1);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("deletesApplied"), 1);
+}
+
+TEST_F(ReshardingOplogApplierMetricsTest, ApplierInsertProgressIncrementsIdependentlyFromInstance) {
+ auto metrics = createInstanceMetrics();
+
+ ReshardingOplogApplierProgress progressDoc;
+ progressDoc.setInsertsApplied(12);
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc);
+
+ ASSERT_EQ(applierMetrics.getInsertsApplied(), 12);
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("insertsApplied"), 0);
+
+ applierMetrics.onInsertApplied();
+
+ ASSERT_EQ(applierMetrics.getInsertsApplied(), 13);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("insertsApplied"), 1);
+}
+
+TEST_F(ReshardingOplogApplierMetricsTest, ApplierUpdateProgressIncrementsIdependentlyFromInstance) {
+ auto metrics = createInstanceMetrics();
+
+ ReshardingOplogApplierProgress progressDoc;
+ progressDoc.setUpdatesApplied(34);
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc);
+
+ ASSERT_EQ(applierMetrics.getUpdatesApplied(), 34);
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("updatesApplied"), 0);
+
+ applierMetrics.onUpdateApplied();
+
+ ASSERT_EQ(applierMetrics.getUpdatesApplied(), 35);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("updatesApplied"), 1);
+}
+
+TEST_F(ReshardingOplogApplierMetricsTest, ApplierDeleteProgressIncrementsIdependentlyFromInstance) {
+ auto metrics = createInstanceMetrics();
+
+ ReshardingOplogApplierProgress progressDoc;
+ progressDoc.setDeletesApplied(56);
+ ReshardingOplogApplierMetrics applierMetrics(metrics.get(), progressDoc);
+
+ ASSERT_EQ(applierMetrics.getDeletesApplied(), 56);
+ auto report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("deletesApplied"), 0);
+
+ applierMetrics.onDeleteApplied();
+
+ ASSERT_EQ(applierMetrics.getDeletesApplied(), 57);
+ report = metrics->reportForCurrentOp();
+ ASSERT_EQ(report.getIntField("deletesApplied"), 1);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
index 084e85046f6..4872757ec43 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_progress.idl
@@ -59,3 +59,21 @@ structs:
description: >-
The total number of oplog entries that have been applied by this
ReshardingOplogApplier.
+ insertsApplied:
+ type: safeInt64
+ default: 0
+ description: >-
+ The total number of insert type oplog entries that have been applied by this
+ ReshardingOplogApplier.
+ updatesApplied:
+ type: safeInt64
+ default: 0
+ description: >-
+ The total number of update type oplog entries that have been applied by this
+ ReshardingOplogApplier.
+ deletesApplied:
+ type: safeInt64
+ default: 0
+ description: >-
+ The total number of delete type oplog entries that have been applied by this
+ ReshardingOplogApplier.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index dd709560ab5..540143d9a44 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -162,6 +162,8 @@ public:
ReshardingMetricsNew::Role::kRecipient,
getServiceContext()->getFastClockSource()->now(),
getServiceContext());
+ _applierMetrics =
+ std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none);
_metrics->onStart(ReshardingMetrics::Role::kRecipient,
getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kApplying);
@@ -309,7 +311,7 @@ public:
protected:
auto makeApplierEnv() {
return std::make_unique<ReshardingOplogApplier::Env>(
- getServiceContext(), _metrics.get(), _metricsNew.get());
+ getServiceContext(), _metrics.get(), _applierMetrics.get());
}
std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForApplier() {
@@ -367,6 +369,7 @@ protected:
const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId};
std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics;
std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
std::shared_ptr<ThreadPool> _cancelableOpCtxExecutor;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 3dd3f19d0e7..c1d471f0837 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -116,6 +116,8 @@ public:
ShardingDataTransformMetrics::Role::kRecipient,
serviceContext->getFastClockSource()->now(),
serviceContext);
+ _applierMetrics =
+ std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none);
_crudApplication = std::make_unique<ReshardingOplogApplicationRules>(
_outputNss,
std::vector<NamespaceString>{_myStashNss, _otherStashNss},
@@ -123,7 +125,7 @@ public:
_myDonorId,
makeChunkManagerForSourceCollection(),
_metrics.get(),
- _metricsNew.get());
+ _applierMetrics.get());
_sessionApplication =
std::make_unique<ReshardingOplogSessionApplication>(_myOplogBufferNss);
@@ -361,6 +363,7 @@ private:
std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::unique_ptr<ReshardingOplogApplierMetrics> _applierMetrics;
std::unique_ptr<ReshardingOplogApplicationRules> _crudApplication;
std::unique_ptr<ReshardingOplogSessionApplication> _sessionApplication;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 2b233cb57f0..4f5bf2fe69f 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -118,6 +118,8 @@ public:
ShardingDataTransformMetrics::Role::kRecipient,
serviceContext->getFastClockSource()->now(),
serviceContext);
+ _oplogApplierMetrics =
+ std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none);
_applier = std::make_unique<ReshardingOplogApplicationRules>(
_outputNss,
std::vector<NamespaceString>{_myStashNss, _otherStashNss},
@@ -125,7 +127,7 @@ public:
_myDonorId,
makeChunkManagerForSourceCollection(),
_metrics.get(),
- _metricsNew.get());
+ _oplogApplierMetrics.get());
}
}
@@ -341,6 +343,7 @@ private:
std::unique_ptr<ReshardingOplogApplicationRules> _applier;
std::unique_ptr<ReshardingMetrics> _metrics;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ std::unique_ptr<ReshardingOplogApplierMetrics> _oplogApplierMetrics;
};
TEST_F(ReshardingOplogCrudApplicationTest, InsertOpInsertsIntoOuputCollection) {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index e5206a1e972..6908554e6f1 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -547,9 +547,18 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
auto sourceChunkMgr =
_externalState->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss());
+ // The metrics map can already be pre-populated if it was recovered from disk.
+ if (_applierMetricsMap.empty()) {
+ for (const auto& donor : _donorShards) {
+ _applierMetricsMap.emplace(
+ donor.getShardId(),
+ std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), boost::none));
+ }
+ }
+
return _dataReplicationFactory(opCtx,
_metrics(),
- _metricsNew.get(),
+ &_applierMetricsMap,
_metadata,
_donorShards,
*_cloneTimestamp,
@@ -1096,6 +1105,7 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
reshardingOpCtxKilledWhileRestoringMetrics.execute(
[&opCtx](const BSONObj& data) { opCtx->markKilled(); });
+ std::vector<std::pair<ShardId, ReshardingOplogApplierProgress>> progressDocList;
for (const auto& donor : _donorShards) {
{
AutoGetCollection oplogBufferColl(
@@ -1121,14 +1131,31 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
result);
if (!result.isEmpty()) {
- oplogEntriesApplied +=
- result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName)
- .Long();
+ auto progressDoc = ReshardingOplogApplierProgress::parse(
+ IDLParserErrorContext("resharding-recipient-service-progress-doc"), result);
+ oplogEntriesApplied += progressDoc.getNumEntriesApplied();
+
+ if (ShardingDataTransformMetrics::isEnabled()) {
+ progressDocList.emplace_back(donor.getShardId(), progressDoc);
+ }
}
}
}
}
+ // Restore stats here where interrupts will never occur, this is to ensure we will only update
+ // the metrics only once.
+ for (const auto& shardIdDocPair : progressDocList) {
+ const auto& shardId = shardIdDocPair.first;
+ const auto& progressDoc = shardIdDocPair.second;
+
+ _metricsNew->accumulateFrom(progressDoc);
+
+ auto applierMetrics =
+ std::make_unique<ReshardingOplogApplierMetrics>(_metricsNew.get(), progressDoc);
+ _applierMetricsMap.emplace(shardId, std::move(applierMetrics));
+ }
+
_metrics()->restoreForCurrentOp(
documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e1fe504a970..19dd962ace9 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -34,6 +34,7 @@
#include "mongo/db/s/resharding/resharding_data_replication.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding/resharding_metrics_new.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_metrics.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/s/resharding/type_collection_fields_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -277,6 +278,7 @@ private:
const ReshardingRecipientService* const _recipientService;
std::unique_ptr<ReshardingMetricsNew> _metricsNew;
+ ReshardingApplierMetricsMap _applierMetricsMap;
// The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.recipient.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 43ccab32bec..518df23e17b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -775,19 +775,18 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
unsigned int i = 0;
for (const auto& donor : donorShards) {
// Setup oplogBuffer collection.
+ ReshardingDonorOplogId donorOplogId{{20, i}, {19, 0}};
insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
- InsertStatement{
- BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())});
+ InsertStatement{BSON("_id" << donorOplogId.toBSON())});
++i;
// Setup reshardingApplierProgress collection.
- auto progressDoc = BSON(
- ReshardingOplogApplierProgress::kOplogSourceIdFieldName
- << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON()
- << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
- << oplogEntriesAppliedOnEachDonor);
+ ReshardingOplogApplierProgress progressDoc(
+ {doc.getReshardingUUID(), donor.getShardId()},
+ donorOplogId,
+ oplogEntriesAppliedOnEachDonor);
insertFn(NamespaceString::kReshardingApplierProgressNamespace,
- InsertStatement{progressDoc});
+ InsertStatement{progressDoc.toBSON()});
}
}