diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-14 17:15:52 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-17 08:52:57 -0400 |
commit | 3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch) | |
tree | 8f318b2b52852a1511ed6da6ede9ac62cbe67d4d /src/mongo/db/pipeline | |
parent | a1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff) | |
download | mongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz |
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries""
This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
Diffstat (limited to 'src/mongo/db/pipeline')
5 files changed, 33 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index faf5bcf97ac..b8d7027fc5f 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -94,12 +94,10 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr !expCtx->getCollator()); BSONObj matchObj = buildMatch(elem, expCtx->ns); - BSONObj sortObj = BSON("$sort" << BSON("ts" << -1)); auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); - auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx); auto transformSource = createTransformationStage(expCtx); - return {matchSource, sortSource, transformSource}; + return {matchSource, transformSource}; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index 0e2b53ea591..0fd14fd9a92 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -94,14 +94,15 @@ public: } }; -TEST_F(ChangeNotificationStageTest, Basic) { +TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { const auto spec = fromjson("{$changeNotification: {}}"); vector<intrusive_ptr<DocumentSource>> result = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); - ASSERT_EQUALS(result.size(), 3UL); + ASSERT_EQUALS(result.size(), 2UL); ASSERT_EQUALS(string(result[0]->getSourceName()), "$match"); + ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); // TODO: Check explain result. } diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 9704d826e53..f15bafd5b8b 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" @@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() { memUsageBytes += _currentBatch.back().getApproximateSize(); - if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + // As long as we're waiting for inserts, we shouldn't do any batching at this level + // we need the whole pipeline to see each document to see if we should stop waiting. + if (shouldWaitForInserts(pExpCtx->opCtx) || + memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; } } + // Special case for tailable cursor -- EOF doesn't preclude more results, so keep + // the PlanExecutor alive. + if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) { + _exec->saveState(); + return; + } } } diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index e17128ec8a6..d04b095aa4b 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -58,6 +58,8 @@ public: std::vector<BSONObj> pipeline; }; + enum class TailableMode { kNormal, kTailableAndAwaitData }; + /** * Constructs an ExpressionContext to be used for Pipeline parsing and evaluation. * 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces. @@ -101,6 +103,13 @@ public: return it->second; }; + /** + * Convenience call that returns true if the tailableMode indicate a tailable query. + */ + bool isTailable() const { + return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData; + } + // The explain verbosity requested by the user, or boost::none if no explain was requested. boost::optional<ExplainOptions::Verbosity> explain; @@ -121,6 +130,8 @@ public: Variables variables; VariablesParseState variablesParseState; + TailableMode tailableMode = TailableMode::kNormal; + protected: static const int kInterruptCheckPeriod = 128; ExpressionContext() : variablesParseState(variables.useIdGenerator()) {} diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index ec471f41df5..697c639ccf7 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -392,6 +392,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const AggregationRequest* aggRequest, const size_t plannerOpts) { auto qr = stdx::make_unique<QueryRequest>(nss); + switch (pExpCtx->tailableMode) { + case ExpressionContext::TailableMode::kNormal: + break; + case ExpressionContext::TailableMode::kTailableAndAwaitData: + qr->setTailable(true); + qr->setAwaitData(true); + break; + } qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); |