diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-08-30 15:57:55 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-08-30 16:21:45 -0400 |
commit | 49489604c381bd805a4f4193fa656e7d41ba770f (patch) | |
tree | e7c4c00bcd94f604ae584cdb314ac898e0c198d6 /src | |
parent | 34621e09900e3888b8b4101c9a806cf21c9eed19 (diff) | |
download | mongo-49489604c381bd805a4f4193fa656e7d41ba770f.tar.gz |
SERVER-30438 Turn on OPLOG_REPLAY for the change notification query.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.h | 3 |
6 files changed, 74 insertions, 49 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index acd55b63326..cc9d0fed9c9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -37,7 +37,6 @@ #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" -#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" @@ -82,45 +81,40 @@ namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; -/** - * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an - * alias) and requires itself to be the first stage in the pipeline. - */ -class DocumentSourceOplogMatch final : public DocumentSourceMatch { -public: - static intrusive_ptr<DocumentSourceOplogMatch> create( - BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceOplogMatch(std::move(filter), expCtx); - } +} // namespace - const char* getSourceName() const final { - // This is used in error reporting, particularly if we find this stage in a position other - // than first, so report the name as $changeStream. - return DocumentSourceChangeStream::kStageName.rawData(); - } +intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create( + BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceOplogMatch(std::move(filter), expCtx); +} - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.isAllowedInsideFacetStage = false; - return constraints; - } +const char* DocumentSourceOplogMatch::getSourceName() const { + // This is used in error reporting, particularly if we find this stage in a position other + // than first, so report the name as $changeStream. + return DocumentSourceChangeStream::kStageName.rawData(); +} - /** - * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can - * properly alias. - */ - Value serialize(optional<ExplainOptions::Verbosity> explain) const final { - if (explain) { - return Value(Document{{kOplogMatchExplainName, Document{}}}); - } - return Value(); +DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const { + StageConstraints constraints; + constraints.requiredPosition = PositionRequirement::kFirst; + constraints.isAllowedInsideFacetStage = false; + return constraints; +} + +/** + * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can + * properly alias. + */ +Value DocumentSourceOplogMatch::serialize(optional<ExplainOptions::Verbosity> explain) const { + if (explain) { + return Value(Document{{kOplogMatchExplainName, Document{}}}); } + return Value(); +} -private: - DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceMatch(std::move(filter), expCtx) {} -}; +DocumentSourceOplogMatch::DocumentSourceOplogMatch(BSONObj filter, + const intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMatch(std::move(filter), expCtx) {} void checkValueType(const Value v, const StringData filedName, BSONType expectedType) { uassert(40532, @@ -131,6 +125,7 @@ void checkValueType(const Value v, const StringData filedName, BSONType expected (v.getType() == expectedType)); } +namespace { /** * This stage is used internally for change notifications to close cursor after returning * "invalidate" entries. diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index c831ec16c50..1b637abbd6c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_sources_gen.h" @@ -150,4 +151,23 @@ private: DocumentSourceChangeStream() = default; }; +/** + * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an + * alias) and requires itself to be the first stage in the pipeline. + */ +class DocumentSourceOplogMatch final : public DocumentSourceMatch { +public: + static boost::intrusive_ptr<DocumentSourceOplogMatch> create( + BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + const char* getSourceName() const final; + + StageConstraints constraints() const final; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; + +private: + DocumentSourceOplogMatch(BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx); +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 2425978a9a0..42f9ea6615c 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -52,6 +52,7 @@ #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" @@ -392,6 +393,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe Collection* collection, const NamespaceString& nss, const intrusive_ptr<ExpressionContext>& pExpCtx, + bool oplogReplay, BSONObj queryObj, BSONObj projectionObj, BSONObj sortObj, @@ -406,6 +408,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe qr->setAwaitData(true); break; } + qr->setOplogReplay(oplogReplay); qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); @@ -442,8 +445,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe return {cq.getStatus()}; } - return getExecutor( - opCtx, collection, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts); + return getExecutorFind( + opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts); } } // namespace @@ -500,9 +503,12 @@ void PipelineD::prepareCursorSource(Collection* collection, // Look for an initial match. This works whether we got an initial query or not. If not, it // results in a "{}" query, which will be what we want in that case. + bool oplogReplay = false; const BSONObj queryObj = pipeline->getInitialQuery(); if (!queryObj.isEmpty()) { - if (dynamic_cast<DocumentSourceMatch*>(sources.front().get())) { + auto matchStage = dynamic_cast<DocumentSourceMatch*>(sources.front().get()); + if (matchStage) { + oplogReplay = dynamic_cast<DocumentSourceOplogMatch*>(matchStage) != nullptr; // If a $match query is pulled into the cursor, the $match is redundant, and can be // removed from the pipeline. sources.pop_front(); @@ -543,6 +549,7 @@ void PipelineD::prepareCursorSource(Collection* collection, nss, pipeline, expCtx, + oplogReplay, sortStage, deps, queryObj, @@ -571,6 +578,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep const NamespaceString& nss, Pipeline* pipeline, const intrusive_ptr<ExpressionContext>& expCtx, + bool oplogReplay, const intrusive_ptr<DocumentSourceSort>& sortStage, const DepsTracker& deps, const BSONObj& queryObj, @@ -597,12 +605,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // order so we can then apply other optimizations there are tickets for, such as SERVER-4507. size_t plannerOpts = QueryPlannerParams::DEFAULT | QueryPlannerParams::NO_BLOCKING_SORT; - // If we are connecting directly to the shard rather than through a mongos, don't filter out - // orphaned documents. - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { - plannerOpts |= QueryPlannerParams::INCLUDE_SHARD_FILTER; - } - if (deps.hasNoRequirements()) { // If we don't need any fields from the input document, performing a count is faster, and // will output empty documents, which is okay. @@ -623,6 +625,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, + oplogReplay, queryObj, emptyProjection, *sortObj, @@ -635,6 +638,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, + oplogReplay, queryObj, *projectionObj, *sortObj, @@ -684,6 +688,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, + oplogReplay, queryObj, *projectionObj, *sortObj, @@ -706,6 +711,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, + oplogReplay, queryObj, *projectionObj, *sortObj, diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 3248459fbf4..81b095ca5b1 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -39,6 +39,7 @@ namespace mongo { class Collection; class DocumentSourceCursor; +class DocumentSourceMatch; class DocumentSourceSort; class ExpressionContext; class OperationContext; @@ -102,6 +103,7 @@ private: const NamespaceString& nss, Pipeline* pipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, + bool oplogReplay, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, const DepsTracker& deps, const BSONObj& queryObj, diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 60d3b166d04..8cdeb17c9b0 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -119,6 +119,7 @@ void filterAllowedIndexEntries(const AllowedIndicesFilter& allowedIndicesFilter, namespace { // The body is below in the "count hack" section but getExecutor calls it. bool turnIxscanIntoCount(QuerySolution* soln); + } // namespace @@ -624,17 +625,17 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( Collection* collection, const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, - PlanExecutor::YieldPolicy yieldPolicy) { + PlanExecutor::YieldPolicy yieldPolicy, + size_t plannerOptions) { if (NULL != collection && canonicalQuery->getQueryRequest().isOplogReplay()) { return getOplogStartHack(opCtx, collection, std::move(canonicalQuery)); } - size_t options = QueryPlannerParams::DEFAULT; if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { - options |= QueryPlannerParams::INCLUDE_SHARD_FILTER; + plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } return getExecutor( - opCtx, collection, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, options); + opCtx, collection, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, plannerOptions); } namespace { diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index daf079bb56f..34857084bf9 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -91,7 +91,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind Collection* collection, const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery, - PlanExecutor::YieldPolicy yieldPolicy); + PlanExecutor::YieldPolicy yieldPolicy, + size_t plannerOptions = QueryPlannerParams::DEFAULT); /** * If possible, turn the provided QuerySolution into a QuerySolution that uses a DistinctNode |