author     Matthew Russotto <matthew.russotto@10gen.com>    2017-08-30 15:57:55 -0400
committer  Matthew Russotto <matthew.russotto@10gen.com>    2017-08-30 16:21:45 -0400
commit     49489604c381bd805a4f4193fa656e7d41ba770f (patch)
tree       e7c4c00bcd94f604ae584cdb314ac898e0c198d6
parent     34621e09900e3888b8b4101c9a806cf21c9eed19 (diff)
SERVER-30438 Turn on OPLOG_REPLAY for the change notification query.
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp | 65
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h   | 20
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                     | 24
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h                       |  2
-rw-r--r--  src/mongo/db/query/get_executor.cpp                      |  9
-rw-r--r--  src/mongo/db/query/get_executor.h                        |  3
6 files changed, 74 insertions(+), 49 deletions(-)
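
In short, the commit detects when a pipeline begins with the internal $changeStream oplog-match
stage and, if so, marks the underlying find as an oplog-replay query. A condensed sketch of the
added control flow, assembled only from names that appear in the hunks below (an illustration of
the change, not a buildable unit):

    // PipelineD::prepareCursorSource(): note whether the leading $match is the
    // internal change-stream oplog match before absorbing it into the cursor.
    bool oplogReplay = false;
    if (auto matchStage = dynamic_cast<DocumentSourceMatch*>(sources.front().get())) {
        oplogReplay = dynamic_cast<DocumentSourceOplogMatch*>(matchStage) != nullptr;
        sources.pop_front();  // The $match is pulled into the cursor, so drop it here.
    }

    // attemptToGetExecutor(): propagate the flag onto the QueryRequest so that
    // getExecutorFind() can take its oplog-replay path for the oplog scan.
    qr->setOplogReplay(oplogReplay);

To make the dynamic_cast possible, DocumentSourceOplogMatch moves from an anonymous helper in
document_source_change_stream.cpp into the header, as the first two file diffs below show.
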
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index acd55b63326..cc9d0fed9c9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
-#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
@@ -82,45 +81,40 @@ namespace {
static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
-/**
- * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an
- * alias) and requires itself to be the first stage in the pipeline.
- */
-class DocumentSourceOplogMatch final : public DocumentSourceMatch {
-public:
- static intrusive_ptr<DocumentSourceOplogMatch> create(
- BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceOplogMatch(std::move(filter), expCtx);
- }
+} // namespace
- const char* getSourceName() const final {
- // This is used in error reporting, particularly if we find this stage in a position other
- // than first, so report the name as $changeStream.
- return DocumentSourceChangeStream::kStageName.rawData();
- }
+intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create(
+ BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceOplogMatch(std::move(filter), expCtx);
+}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
- }
+const char* DocumentSourceOplogMatch::getSourceName() const {
+ // This is used in error reporting, particularly if we find this stage in a position other
+ // than first, so report the name as $changeStream.
+ return DocumentSourceChangeStream::kStageName.rawData();
+}
- /**
- * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can
- * properly alias.
- */
- Value serialize(optional<ExplainOptions::Verbosity> explain) const final {
- if (explain) {
- return Value(Document{{kOplogMatchExplainName, Document{}}});
- }
- return Value();
+DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const {
+ StageConstraints constraints;
+ constraints.requiredPosition = PositionRequirement::kFirst;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
+}
+
+/**
+ * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can
+ * properly alias.
+ */
+Value DocumentSourceOplogMatch::serialize(optional<ExplainOptions::Verbosity> explain) const {
+ if (explain) {
+ return Value(Document{{kOplogMatchExplainName, Document{}}});
}
+ return Value();
+}
-private:
- DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceMatch(std::move(filter), expCtx) {}
-};
+DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter,
+ const intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMatch(std::move(filter), expCtx) {}
void checkValueType(const Value v, const StringData filedName, BSONType expectedType) {
uassert(40532,
@@ -131,6 +125,7 @@ void checkValueType(const Value v, const StringData filedName, BSONType expected
(v.getType() == expectedType));
}
+namespace {
/**
* This stage is used internally for change notifications to close cursor after returning
* "invalidate" entries.
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index c831ec16c50..1b637abbd6c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_sources_gen.h"
@@ -150,4 +151,23 @@ private:
DocumentSourceChangeStream() = default;
};
+/**
+ * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an
+ * alias) and requires itself to be the first stage in the pipeline.
+ */
+class DocumentSourceOplogMatch final : public DocumentSourceMatch {
+public:
+ static boost::intrusive_ptr<DocumentSourceOplogMatch> create(
+ BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ const char* getSourceName() const final;
+
+ StageConstraints constraints() const final;
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
+
+private:
+ DocumentSourceOplogMatch(BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 2425978a9a0..42f9ea6615c 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -52,6 +52,7 @@
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
@@ -392,6 +393,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
Collection* collection,
const NamespaceString& nss,
const intrusive_ptr<ExpressionContext>& pExpCtx,
+ bool oplogReplay,
BSONObj queryObj,
BSONObj projectionObj,
BSONObj sortObj,
@@ -406,6 +408,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
qr->setAwaitData(true);
break;
}
+ qr->setOplogReplay(oplogReplay);
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
@@ -442,8 +445,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return {cq.getStatus()};
}
- return getExecutor(
- opCtx, collection, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
+ return getExecutorFind(
+ opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
}
} // namespace
@@ -500,9 +503,12 @@ void PipelineD::prepareCursorSource(Collection* collection,
// Look for an initial match. This works whether we got an initial query or not. If not, it
// results in a "{}" query, which will be what we want in that case.
+ bool oplogReplay = false;
const BSONObj queryObj = pipeline->getInitialQuery();
if (!queryObj.isEmpty()) {
- if (dynamic_cast<DocumentSourceMatch*>(sources.front().get())) {
+ auto matchStage = dynamic_cast<DocumentSourceMatch*>(sources.front().get());
+ if (matchStage) {
+ oplogReplay = dynamic_cast<DocumentSourceOplogMatch*>(matchStage) != nullptr;
// If a $match query is pulled into the cursor, the $match is redundant, and can be
// removed from the pipeline.
sources.pop_front();
@@ -543,6 +549,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
nss,
pipeline,
expCtx,
+ oplogReplay,
sortStage,
deps,
queryObj,
@@ -571,6 +578,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
const NamespaceString& nss,
Pipeline* pipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
+ bool oplogReplay,
const intrusive_ptr<DocumentSourceSort>& sortStage,
const DepsTracker& deps,
const BSONObj& queryObj,
@@ -597,12 +605,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// order so we can then apply other optimizations there are tickets for, such as SERVER-4507.
size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT;
- // If we are connecting directly to the shard rather than through a mongos, don't filter out
- // orphaned documents.
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
- plannerOpts |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
- }
-
if (deps.hasNoRequirements()) {
// If we don't need any fields from the input document, performing a count is faster, and
// will output empty documents, which is okay.
@@ -623,6 +625,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
+ oplogReplay,
queryObj,
emptyProjection,
*sortObj,
@@ -635,6 +638,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
+ oplogReplay,
queryObj,
*projectionObj,
*sortObj,
@@ -684,6 +688,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
+ oplogReplay,
queryObj,
*projectionObj,
*sortObj,
@@ -706,6 +711,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
+ oplogReplay,
queryObj,
*projectionObj,
*sortObj,
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 3248459fbf4..81b095ca5b1 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -39,6 +39,7 @@
namespace mongo {
class Collection;
class DocumentSourceCursor;
+class DocumentSourceMatch;
class DocumentSourceSort;
class ExpressionContext;
class OperationContext;
@@ -102,6 +103,7 @@ private:
const NamespaceString& nss,
Pipeline* pipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ bool oplogReplay,
const boost::intrusive_ptr<DocumentSourceSort>& sortStage,
const DepsTracker& deps,
const BSONObj& queryObj,
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 60d3b166d04..8cdeb17c9b0 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -119,6 +119,7 @@ void filterAllowedIndexEntries(const AllowedIndicesFilter& allowedIndicesFilter,
namespace {
// The body is below in the "count hack" section but getExecutor calls it.
bool turnIxscanIntoCount(QuerySolution* soln);
+
} // namespace
@@ -624,17 +625,17 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
Collection* collection,
const NamespaceString& nss,
unique_ptr<CanonicalQuery> canonicalQuery,
- PlanExecutor::YieldPolicy yieldPolicy) {
+ PlanExecutor::YieldPolicy yieldPolicy,
+ size_t plannerOptions) {
if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) {
return getOplogStartHack(opCtx, collection, std::move(canonicalQuery));
}
- size_t options = QueryPlannerParams::DEFAULT;
if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
- options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
+ plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
return getExecutor(
- opCtx, collection, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, options);
+ opCtx, collection, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, plannerOptions);
}
namespace {
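
With the widened signature, getExecutorFind() now accepts the caller's planner options (defaulting
to QueryPlannerParams::DEFAULT), and the shard-filter decision that pipeline_d.cpp previously made
for itself is made here for all callers. A minimal sketch of the new aggregation call site, lifted
from the pipeline_d.cpp hunk above:

    // attemptToGetExecutor() in pipeline_d.cpp: pass the aggregation planner
    // options straight through instead of calling getExecutor() directly.
    size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT;
    return getExecutorFind(
        opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
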
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index daf079bb56f..34857084bf9 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -91,7 +91,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
Collection* collection,
const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery,
- PlanExecutor::YieldPolicy yieldPolicy);
+ PlanExecutor::YieldPolicy yieldPolicy,
+ size_t plannerOptions = QueryPlannerParams::DEFAULT);
/**
* If possible, turn the provided QuerySolution into a QuerySolution that uses a DistinctNode