path: root/src/mongo/db/pipeline/plan_executor_pipeline.cpp
author    Sviatlana Zuiko <sviatlana.zuiko@mongodb.com>  2023-03-20 21:47:48 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-03-20 23:11:21 +0000
commit    598ebfee8e441253efed2ee4118ec8a045f75479 (patch)
tree      9e31e0a7e838dab79490990498e278a858bdfcef /src/mongo/db/pipeline/plan_executor_pipeline.cpp
parent    5531212069bee2a38fff2e9c926b2748fdb6b523 (diff)
download  mongo-598ebfee8e441253efed2ee4118ec8a045f75479.tar.gz
Revert "SERVER-67699 Add change stream large event metrics"
This reverts commit cfef97a4f1a377613ba264eb648bb175ca5a2216.
Diffstat (limited to 'src/mongo/db/pipeline/plan_executor_pipeline.cpp')
-rw-r--r--  src/mongo/db/pipeline/plan_executor_pipeline.cpp  29
1 file changed, 8 insertions(+), 21 deletions(-)
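
Note: this revert removes the changeStreams.largeEventsFailed counter and the
_trySerializeToBson helper that fed it, folding serialization back inline into
getNext(). The helper's job was to catch events that fail to serialize because
they exceed the BSON size limit, bump the metric (only for change stream
pipelines), and rethrow. The following is a minimal self-contained sketch of
that pattern; the counter, exception type, and serialize/trySerialize helpers
are hypothetical stand-ins for the server's CounterMetric,
ExceptionFor<ErrorCodes::BSONObjectTooLarge>, and the removed
_trySerializeToBson, not MongoDB's actual API.

#include <atomic>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for mongo::CounterMetric("changeStreams.largeEventsFailed").
std::atomic<long long> changeStreamsLargeEventsFailedCounter{0};

// Hypothetical stand-in for ExceptionFor<ErrorCodes::BSONObjectTooLarge>.
struct ObjectTooLarge : std::runtime_error {
    using std::runtime_error::runtime_error;
};

std::string serialize(const std::string& doc) {
    // Stand-in for Document::toBson(): fail when the event is oversized.
    if (doc.size() > 16)
        throw ObjectTooLarge("event exceeds the BSON size limit");
    return doc;
}

// Mirrors the shape of the reverted _trySerializeToBson: a function-try-block
// that bumps the metric for change stream pipelines and rethrows unchanged.
std::string trySerialize(const std::string& doc, bool isChangeStream) try {
    return serialize(doc);
} catch (const ObjectTooLarge&) {
    if (isChangeStream)
        changeStreamsLargeEventsFailedCounter.fetch_add(1);
    throw;  // the caller still sees the original error
}

int main() {
    try {
        trySerialize("this event is far too large", /*isChangeStream=*/true);
    } catch (const ObjectTooLarge&) {
        std::cout << "largeEventsFailed: "
                  << changeStreamsLargeEventsFailedCounter << '\n';  // prints 1
    }
}

The function-try-block lets the metric update live next to the serialization
call while the exception still propagates to the caller untouched.
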
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 75c8753446d..4921e2bcf4a 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -40,9 +40,6 @@
#include "mongo/util/duration.h"
namespace mongo {
-namespace {
-CounterMetric changeStreamsLargeEventsFailedCounter("changeStreams.largeEventsFailed");
-}
PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
@@ -82,7 +79,9 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId*
Document docOut;
auto execState = getNextDocument(&docOut, nullptr);
if (execState == PlanExecutor::ADVANCED) {
- *objOut = _trySerializeToBson(docOut);
+ // Include metadata if the output will be consumed by a merging node.
+ *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData()
+ : docOut.toBson();
}
return execState;
}
@@ -142,19 +141,6 @@ boost::optional<Document> PlanExecutorPipeline::_tryGetNext() try {
return Document::fromBsonWithMetaData(extraInfo->getStartAfterInvalidateEvent());
}
-BSONObj PlanExecutorPipeline::_trySerializeToBson(const Document& doc) try {
- // Include metadata if the output will be consumed by a merging node.
- return _expCtx->needsMerge || _expCtx->forPerShardCursor ? doc.toBsonWithMetaData()
- : doc.toBson();
-} catch (const ExceptionFor<ErrorCodes::BSONObjectTooLarge>&) {
- // If in a change stream pipeline, increment change stream large event failed error
- // count metric.
- if (ResumableScanType::kChangeStream == _resumableScanType) {
- changeStreamsLargeEventsFailedCounter.increment();
- }
- throw;
-}
-
void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) {
switch (_resumableScanType) {
case ResumableScanType::kChangeStream:
@@ -200,8 +186,9 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const {
// Confirm that the document _id field matches the original resume token in the sort key field.
+ auto eventBSON = event.toBson();
auto resumeToken = event.metadata().getSortKey();
- auto idField = event.getField("_id");
+ auto idField = eventBSON.getObjectField("_id");
invariant(!resumeToken.missing());
uassert(ErrorCodes::ChangeStreamFatalError,
str::stream() << "Encountered an event whose _id field, which contains the resume "
@@ -210,9 +197,9 @@ void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& eve
"transformations that retain the unmodified _id field are allowed. "
"Expected: "
<< BSON("_id" << resumeToken) << " but found: "
- << (idField.missing() ? BSONObj() : BSON("_id" << idField)),
- resumeToken.getType() == BSONType::Object &&
- ValueComparator::kInstance.evaluate(idField == resumeToken));
+ << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
+ (resumeToken.getType() == BSONType::Object) &&
+ idField.binaryEqual(resumeToken.getDocument().toBson()));
}
void PlanExecutorPipeline::_performResumableOplogScanAccounting() {
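
Note: the last hunk also tightens the resume token check. The reverted code
compared the event's _id to the sort-key resume token with
ValueComparator::kInstance (semantic equality over Values); the restored code
serializes the event and requires BSONObj::binaryEqual, which demands
byte-identical BSON, so type or field-order differences that value comparison
tolerates now fail the uassert. A minimal sketch of that distinction, using a
hypothetical Field/Doc stand-in for BSON elements rather than MongoDB's types:

#include <cstdint>
#include <iostream>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-in for a BSON element: a field name plus a typed value.
struct Field {
    std::string name;
    std::variant<int32_t, double> value;
};
using Doc = std::vector<Field>;

// Semantic equality in the spirit of ValueComparator: numeric values compare
// across types, so int32 1 equals double 1.0.
bool valueEqual(const Doc& a, const Doc& b) {
    if (a.size() != b.size())
        return false;
    for (size_t i = 0; i < a.size(); ++i) {
        if (a[i].name != b[i].name)
            return false;
        auto asDouble = [](const auto& v) {
            return std::visit([](auto x) { return static_cast<double>(x); }, v);
        };
        if (asDouble(a[i].value) != asDouble(b[i].value))
            return false;
    }
    return true;
}

// Strict equality in the spirit of BSONObj::binaryEqual: the type tag must
// match as well as the value (std::variant's == compares the active type first).
bool binaryEqual(const Doc& a, const Doc& b) {
    if (a.size() != b.size())
        return false;
    for (size_t i = 0; i < a.size(); ++i)
        if (a[i].name != b[i].name || a[i].value != b[i].value)
            return false;
    return true;
}

int main() {
    Doc tokenFromSortKey{{"_data", int32_t{1}}};
    Doc idFromEvent{{"_data", 1.0}};  // same value, different element type
    std::cout << valueEqual(tokenFromSortKey, idFromEvent) << '\n';   // 1
    std::cout << binaryEqual(tokenFromSortKey, idFromEvent) << '\n';  // 0
}
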