diff options
author | Sviatlana Zuiko <sviatlana.zuiko@mongodb.com> | 2023-03-20 21:47:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-20 23:11:21 +0000 |
commit | 598ebfee8e441253efed2ee4118ec8a045f75479 (patch) | |
tree | 9e31e0a7e838dab79490990498e278a858bdfcef /src/mongo/db/pipeline/plan_executor_pipeline.cpp | |
parent | 5531212069bee2a38fff2e9c926b2748fdb6b523 (diff) | |
download | mongo-598ebfee8e441253efed2ee4118ec8a045f75479.tar.gz |
Revert "SERVER-67699 Add change stream large event metrics"
This reverts commit cfef97a4f1a377613ba264eb648bb175ca5a2216.
Diffstat (limited to 'src/mongo/db/pipeline/plan_executor_pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/plan_executor_pipeline.cpp | 29 |
1 files changed, 8 insertions, 21 deletions
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 75c8753446d..4921e2bcf4a 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -40,9 +40,6 @@ #include "mongo/util/duration.h" namespace mongo { -namespace { -CounterMetric changeStreamsLargeEventsFailedCounter("changeStreams.largeEventsFailed"); -} PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, @@ -82,7 +79,9 @@ PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* Document docOut; auto execState = getNextDocument(&docOut, nullptr); if (execState == PlanExecutor::ADVANCED) { - *objOut = _trySerializeToBson(docOut); + // Include metadata if the output will be consumed by a merging node. + *objOut = _expCtx->needsMerge || _expCtx->forPerShardCursor ? docOut.toBsonWithMetaData() + : docOut.toBson(); } return execState; } @@ -142,19 +141,6 @@ boost::optional<Document> PlanExecutorPipeline::_tryGetNext() try { return Document::fromBsonWithMetaData(extraInfo->getStartAfterInvalidateEvent()); } -BSONObj PlanExecutorPipeline::_trySerializeToBson(const Document& doc) try { - // Include metadata if the output will be consumed by a merging node. - return _expCtx->needsMerge || _expCtx->forPerShardCursor ? doc.toBsonWithMetaData() - : doc.toBson(); -} catch (const ExceptionFor<ErrorCodes::BSONObjectTooLarge>&) { - // If in a change stream pipeline, increment change stream large event failed error - // count metric. - if (ResumableScanType::kChangeStream == _resumableScanType) { - changeStreamsLargeEventsFailedCounter.increment(); - } - throw; -} - void PlanExecutorPipeline::_updateResumableScanState(const boost::optional<Document>& document) { switch (_resumableScanType) { case ResumableScanType::kChangeStream: @@ -200,8 +186,9 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& event) const { // Confirm that the document _id field matches the original resume token in the sort key field. + auto eventBSON = event.toBson(); auto resumeToken = event.metadata().getSortKey(); - auto idField = event.getField("_id"); + auto idField = eventBSON.getObjectField("_id"); invariant(!resumeToken.missing()); uassert(ErrorCodes::ChangeStreamFatalError, str::stream() << "Encountered an event whose _id field, which contains the resume " @@ -210,9 +197,9 @@ void PlanExecutorPipeline::_validateChangeStreamsResumeToken(const Document& eve "transformations that retain the unmodified _id field are allowed. " "Expected: " << BSON("_id" << resumeToken) << " but found: " - << (idField.missing() ? BSONObj() : BSON("_id" << idField)), - resumeToken.getType() == BSONType::Object && - ValueComparator::kInstance.evaluate(idField == resumeToken)); + << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), + (resumeToken.getType() == BSONType::Object) && + idField.binaryEqual(resumeToken.getDocument().toBson())); } void PlanExecutorPipeline::_performResumableOplogScanAccounting() { |