summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-07-14 17:15:52 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-07-17 08:52:57 -0400
commit3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch)
tree8f318b2b52852a1511ed6da6ede9ac62cbe67d4d /src/mongo/db/pipeline
parenta1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff)
downloadmongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"" This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp12
-rw-r--r--src/mongo/db/pipeline/expression_context.h11
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp8
5 files changed, 33 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index faf5bcf97ac..b8d7027fc5f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -94,12 +94,10 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
!expCtx->getCollator());
BSONObj matchObj = buildMatch(elem, expCtx->ns);
- BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
- auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
auto transformSource = createTransformationStage(expCtx);
- return {matchSource, sortSource, transformSource};
+ return {matchSource, transformSource};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0e2b53ea591..0fd14fd9a92 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -94,14 +94,15 @@ public:
}
};
-TEST_F(ChangeNotificationStageTest, Basic) {
+TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
vector<intrusive_ptr<DocumentSource>> result =
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
- ASSERT_EQUALS(result.size(), 3UL);
+ ASSERT_EQUALS(result.size(), 2UL);
ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
+ ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9704d826e53..f15bafd5b8b 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
@@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ // As long as we're waiting for inserts, we shouldn't do any batching at this level
+ // we need the whole pipeline to see each document to see if we should stop waiting.
+ if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
}
}
+ // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
+ // the PlanExecutor alive.
+ if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+ _exec->saveState();
+ return;
+ }
}
}
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index e17128ec8a6..d04b095aa4b 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -58,6 +58,8 @@ public:
std::vector<BSONObj> pipeline;
};
+ enum class TailableMode { kNormal, kTailableAndAwaitData };
+
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -101,6 +103,13 @@ public:
return it->second;
};
+ /**
+ * Convenience call that returns true if the tailableMode indicate a tailable query.
+ */
+ bool isTailable() const {
+ return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
+ }
+
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
@@ -121,6 +130,8 @@ public:
Variables variables;
VariablesParseState variablesParseState;
+ TailableMode tailableMode = TailableMode::kNormal;
+
protected:
static const int kInterruptCheckPeriod = 128;
ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ec471f41df5..697c639ccf7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -392,6 +392,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique<QueryRequest>(nss);
+ switch (pExpCtx->tailableMode) {
+ case ExpressionContext::TailableMode::kNormal:
+ break;
+ case ExpressionContext::TailableMode::kTailableAndAwaitData:
+ qr->setTailable(true);
+ qr->setAwaitData(true);
+ break;
+ }
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);