diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/collection_scan.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/exec/collection_scan_common.h | 8 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stats.h | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 1 | ||||
-rw-r--r-- | src/mongo/db/query/explain.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.h | 9 | ||||
-rw-r--r-- | src/mongo/db/query/planner_access.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/query/query_solution.h | 14 | ||||
-rw-r--r-- | src/mongo/db/query/stage_builder.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_history_iterator.cpp | 1 | ||||
-rw-r--r-- | src/mongo/dbtests/querytests.cpp | 212 | ||||
-rw-r--r-- | src/mongo/shell/query.js | 21 |
16 files changed, 277 insertions, 266 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index e06493b39eb..dcd07553bc8 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -42,6 +42,7 @@ #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/storage/oplog_hack.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -66,12 +67,20 @@ CollectionScan::CollectionScan(OperationContext* opCtx, _params(params) { // Explain reports the direction of the collection scan. _specificStats.direction = params.direction; + _specificStats.minTs = params.minTs; _specificStats.maxTs = params.maxTs; _specificStats.tailable = params.tailable; + if (params.minTs || params.maxTs) { + // The 'minTs' and 'maxTs' parameters are used for a special optimization that + // applies only to forwards scans of the oplog. + invariant(params.direction == CollectionScanParams::FORWARD); + invariant(collection->ns().isOplog()); + } invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog()); + // Set early stop condition. if (params.maxTs) { - _endConditionBSON = BSON("$gte" << *(params.maxTs)); + _endConditionBSON = BSON("$gte"_sd << *(params.maxTs)); _endCondition = std::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName, _endConditionBSON.firstElement()); } @@ -129,9 +138,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } - if (_lastSeenId.isNull() && !_params.start.isNull()) { - record = _cursor->seekExact(_params.start); - } else { + if (_lastSeenId.isNull() && _params.minTs) { + // See if the RecordStore supports the oplogStartHack. + StatusWith<RecordId> goal = oploghack::keyForOptime(*_params.minTs); + if (goal.isOK()) { + boost::optional<RecordId> startLoc = + collection()->getRecordStore()->oplogStartHack(getOpCtx(), goal.getValue()); + if (startLoc && !startLoc->isNull()) { + LOG(3) << "Using direct oplog seek"; + record = _cursor->seekExact(*startLoc); + } + } + } + + if (!record) { record = _cursor->next(); } } catch (const WriteConflictException&) { @@ -190,7 +210,6 @@ PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member, WorkingSetID memberID, WorkingSetID* out) { ++_specificStats.docsTested; - if (Filter::passes(member, _filter)) { if (_params.stopApplyingFilterAfterFirstMatch) { _filter = nullptr; diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h index b655872f6fa..55307ba70c8 100644 --- a/src/mongo/db/exec/collection_scan_common.h +++ b/src/mongo/db/exec/collection_scan_common.h @@ -42,11 +42,13 @@ struct CollectionScanParams { BACKWARD = -1, }; - // The RecordId to which we should seek to as the first document of the scan. - RecordId start; + // If present, the collection scan will seek directly to the RecordId of an oplog entry as + // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans. + boost::optional<Timestamp> minTs; // If present, the collection scan will stop and return EOF the first time it sees a document - // that does not pass the filter and has 'ts' greater than 'maxTs'. + // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward + // oplog scans. boost::optional<Timestamp> maxTs; Direction direction = FORWARD; diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index 08d623ffc44..938a633e342 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -200,9 +200,12 @@ struct CollectionScanStats : public SpecificStats { bool tailable{false}; - // If present, indicates that the collection scan will stop and return EOF the first time it - // sees a document that does not pass the filter and has a "ts" Timestamp field greater than - // 'maxTs'. + // The start location of the scan. Must only be set on forward oplog scans. + boost::optional<Timestamp> minTs; + + // Indicates that the collection scan will stop and return EOF the first time it sees a + // document that does not pass the filter and has a "ts" Timestamp field greater than 'maxTs'. + // Must only be set on forward oplog scans. boost::optional<Timestamp> maxTs; }; diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 5f9ca9ef2ea..2da744ae88e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -381,6 +381,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString()); deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString()); deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString()); + deps->fields.insert(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kSessionIdFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kTermFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString()); return DepsTracker::State::EXHAUSTIVE_ALL; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index d6e3557bcd9..f133a67c9bc 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -168,7 +168,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe Collection* collection, const NamespaceString& nss, const intrusive_ptr<ExpressionContext>& pExpCtx, - bool oplogReplay, BSONObj queryObj, BSONObj projectionObj, BSONObj sortObj, @@ -178,7 +177,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) { auto qr = std::make_unique<QueryRequest>(nss); qr->setTailableMode(pExpCtx->tailableMode); - qr->setOplogReplay(oplogReplay); qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); @@ -442,12 +440,10 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection, // Look for an initial match. This works whether we got an initial query or not. If not, it // results in a "{}" query, which will be what we want in that case. - bool oplogReplay = false; const BSONObj queryObj = pipeline->getInitialQuery(); if (!queryObj.isEmpty()) { auto matchStage = dynamic_cast<DocumentSourceMatch*>(sources.front().get()); if (matchStage) { - oplogReplay = dynamic_cast<DocumentSourceOplogMatch*>(matchStage) != nullptr; // If a $match query is pulled into the cursor, the $match is redundant, and can be // removed from the pipeline. sources.pop_front(); @@ -487,7 +483,6 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection, nss, pipeline, expCtx, - oplogReplay, sortStage, std::move(rewrittenGroupStage), deps, @@ -558,7 +553,6 @@ PipelineD::buildInnerQueryExecutorGeoNear(Collection* collection, nss, pipeline, expCtx, - false, /* oplogReplay */ nullptr, /* sortStage */ nullptr, /* rewrittenGroupStage */ deps, @@ -600,7 +594,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep const NamespaceString& nss, Pipeline* pipeline, const intrusive_ptr<ExpressionContext>& expCtx, - bool oplogReplay, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, std::unique_ptr<GroupFromFirstDocumentTransformation> rewrittenGroupStage, const DepsTracker& deps, @@ -652,7 +645,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, - oplogReplay, queryObj, *projectionObj, sortObj ? *sortObj : emptySort, @@ -708,7 +700,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, - oplogReplay, queryObj, expCtx->needsMerge ? metaSortProjection : emptyProjection, *sortObj, @@ -724,7 +715,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, - oplogReplay, queryObj, *projectionObj, *sortObj, @@ -787,7 +777,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, - oplogReplay, queryObj, *projectionObj, *sortObj, @@ -812,7 +801,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep collection, nss, expCtx, - oplogReplay, queryObj, *projectionObj, *sortObj, diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index a4db94f74b4..9b8f618c12f 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -166,7 +166,6 @@ private: const NamespaceString& nss, Pipeline* pipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - bool oplogReplay, const boost::intrusive_ptr<DocumentSourceSort>& sortStage, std::unique_ptr<GroupFromFirstDocumentTransformation> rewrittenGroupStage, const DepsTracker& deps, diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index eb109928be4..be8a1c95cdd 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -361,6 +361,9 @@ void Explain::statsToBSON(const PlanStageStats& stats, } else if (STAGE_COLLSCAN == stats.stageType) { CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get()); bob->append("direction", spec->direction > 0 ? "forward" : "backward"); + if (spec->minTs) { + bob->append("minTs", *(spec->minTs)); + } if (spec->maxTs) { bob->append("maxTs", *(spec->maxTs)); } diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 82ceb978a29..73df3c9876d 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -81,7 +81,6 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" -#include "mongo/db/storage/oplog_hack.h" #include "mongo/db/storage/storage_options.h" #include "mongo/scripting/engine.h" #include "mongo/util/log.h" @@ -622,133 +621,12 @@ bool isOplogTsLowerBoundPred(const mongo::MatchExpression* me) { return me->path() == repl::OpTime::kTimestampFieldName; } -/** - * Extracts the lower and upper bounds on the "ts" field from 'me'. This only examines comparisons - * of "ts" against a Timestamp at the top level or inside a top-level $and. - */ -std::pair<boost::optional<Timestamp>, boost::optional<Timestamp>> extractTsRange( - const MatchExpression* me, bool topLevel = true) { - boost::optional<Timestamp> min; - boost::optional<Timestamp> max; - - if (me->matchType() == MatchExpression::AND && topLevel) { - for (size_t i = 0; i < me->numChildren(); ++i) { - boost::optional<Timestamp> childMin; - boost::optional<Timestamp> childMax; - std::tie(childMin, childMax) = extractTsRange(me->getChild(i), false); - if (childMin && (!min || childMin.get() > min.get())) { - min = childMin; - } - if (childMax && (!max || childMax.get() < max.get())) { - max = childMax; - } - } - return {min, max}; - } - - if (!ComparisonMatchExpression::isComparisonMatchExpression(me) || - me->path() != repl::OpTime::kTimestampFieldName) { - return {min, max}; - } - - auto rawElem = static_cast<const ComparisonMatchExpression*>(me)->getData(); - if (rawElem.type() != BSONType::bsonTimestamp) { - return {min, max}; - } - - switch (me->matchType()) { - case MatchExpression::EQ: - min = rawElem.timestamp(); - max = rawElem.timestamp(); - return {min, max}; - case MatchExpression::GT: - case MatchExpression::GTE: - min = rawElem.timestamp(); - return {min, max}; - case MatchExpression::LT: - case MatchExpression::LTE: - max = rawElem.timestamp(); - return {min, max}; - default: - MONGO_UNREACHABLE; - } -} - -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( - OperationContext* opCtx, - Collection* collection, - unique_ptr<CanonicalQuery> cq, - size_t plannerOptions, - PlanExecutor::YieldPolicy yieldPolicy) { - invariant(collection); - invariant(cq.get()); - - if (!collection->isCapped()) { - return Status(ErrorCodes::BadValue, - "OplogReplay cursor requested on non-capped collection"); - } - - // If the canonical query does not have a user-specified collation, set it from the collection - // default. - if (cq->getQueryRequest().getCollation().isEmpty() && collection->getDefaultCollator()) { - cq->setCollator(collection->getDefaultCollator()->clone()); - } - - boost::optional<Timestamp> minTs, maxTs; - std::tie(minTs, maxTs) = extractTsRange(cq->root()); - - if (!minTs) { - return Status(ErrorCodes::OplogOperationUnsupported, - "OplogReplay query does not contain top-level " - "$eq, $gt, or $gte over the 'ts' field."); - } - - boost::optional<RecordId> startLoc = boost::none; - - // See if the RecordStore supports the oplogStartHack. - StatusWith<RecordId> goal = oploghack::keyForOptime(*minTs); - if (goal.isOK()) { - startLoc = collection->getRecordStore()->oplogStartHack(opCtx, goal.getValue()); - } - - // Build our collection scan. - CollectionScanParams params; - if (startLoc) { - LOG(3) << "Using direct oplog seek"; - params.start = *startLoc; - } - params.maxTs = maxTs; - params.direction = CollectionScanParams::FORWARD; - params.tailable = cq->getQueryRequest().isTailable(); - params.shouldTrackLatestOplogTimestamp = - plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS; - params.shouldWaitForOplogVisibility = - shouldWaitForOplogVisibility(opCtx, collection, params.tailable); - - // If the query is just a lower bound on "ts", we know that every document in the collection - // after the first matching one must also match. To avoid wasting time running the match - // expression on every document to be returned, we tell the CollectionScan stage to stop - // applying the filter once it finds the first match. - if (isOplogTsLowerBoundPred(cq->root())) { - params.stopApplyingFilterAfterFirstMatch = true; - } - - auto ws = std::make_unique<WorkingSet>(); - auto cs = std::make_unique<CollectionScan>(opCtx, collection, params, ws.get(), cq->root()); - return PlanExecutor::make( - opCtx, std::move(ws), std::move(cs), std::move(cq), collection, yieldPolicy); -} - StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( OperationContext* opCtx, Collection* collection, unique_ptr<CanonicalQuery> canonicalQuery, PlanExecutor::YieldPolicy yieldPolicy, size_t plannerOptions) { - if (nullptr != collection && canonicalQuery->getQueryRequest().isOplogReplay()) { - return getOplogStartHack( - opCtx, collection, std::move(canonicalQuery), plannerOptions, yieldPolicy); - } if (OperationShardingState::isOperationVersioned(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 2fa5359bd48..594c23ada38 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -51,8 +51,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection StringData ns, Collection* collection, PlanExecutor::YieldPolicy yieldPolicy, - const Direction direction, - const RecordId startLoc) { + const Direction direction) { std::unique_ptr<WorkingSet> ws = std::make_unique<WorkingSet>(); if (nullptr == collection) { @@ -66,7 +65,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection invariant(ns == collection->ns().ns()); - auto cs = _collectionScan(opCtx, ws.get(), collection, direction, startLoc); + auto cs = _collectionScan(opCtx, ws.get(), collection, direction); // Takes ownership of 'ws' and 'cs'. auto statusWithPlanExecutor = @@ -80,12 +79,11 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith Collection* collection, std::unique_ptr<DeleteStageParams> params, PlanExecutor::YieldPolicy yieldPolicy, - Direction direction, - const RecordId& startLoc) { + Direction direction) { invariant(collection); auto ws = std::make_unique<WorkingSet>(); - auto root = _collectionScan(opCtx, ws.get(), collection, direction, startLoc); + auto root = _collectionScan(opCtx, ws.get(), collection, direction); root = std::make_unique<DeleteStage>( opCtx, std::move(params), ws.get(), collection, root.release()); @@ -180,12 +178,10 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWith std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(OperationContext* opCtx, WorkingSet* ws, const Collection* collection, - Direction direction, - const RecordId& startLoc) { + Direction direction) { invariant(collection); CollectionScanParams params; - params.start = startLoc; params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, false); if (FORWARD == direction) { diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index cdc91987c82..a846a55b60b 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -73,8 +73,7 @@ public: StringData ns, Collection* collection, PlanExecutor::YieldPolicy yieldPolicy, - const Direction direction = FORWARD, - const RecordId startLoc = RecordId()); + const Direction direction = FORWARD); /** * Returns a FETCH => DELETE plan. @@ -84,8 +83,7 @@ public: Collection* collection, std::unique_ptr<DeleteStageParams> params, PlanExecutor::YieldPolicy yieldPolicy, - Direction direction = FORWARD, - const RecordId& startLoc = RecordId()); + Direction direction = FORWARD); /** * Returns an index scan. Caller owns returned pointer. @@ -135,8 +133,7 @@ private: static std::unique_ptr<PlanStage> _collectionScan(OperationContext* opCtx, WorkingSet* ws, const Collection* collection, - Direction direction, - const RecordId& startLoc); + Direction direction); /** * Returns a plan stage that is either an index scan or an index scan with a fetch stage. diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index ef4ef25bcac..eb837ff2121 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -143,6 +143,72 @@ namespace mongo { using std::unique_ptr; using std::vector; +namespace { +/** + * Extracts the lower and upper bounds on the "ts" field from 'me'. This only examines comparisons + * of "ts" against a Timestamp at the top level or inside a top-level $and. + */ +std::pair<boost::optional<Timestamp>, boost::optional<Timestamp>> extractTsRange( + const MatchExpression* me, bool topLevel = true) { + boost::optional<Timestamp> min; + boost::optional<Timestamp> max; + + if (me->matchType() == MatchExpression::AND && topLevel) { + for (size_t i = 0; i < me->numChildren(); ++i) { + boost::optional<Timestamp> childMin; + boost::optional<Timestamp> childMax; + std::tie(childMin, childMax) = extractTsRange(me->getChild(i), false); + if (childMin && (!min || childMin.get() > min.get())) { + min = childMin; + } + if (childMax && (!max || childMax.get() < max.get())) { + max = childMax; + } + } + return {min, max}; + } + + if (!ComparisonMatchExpression::isComparisonMatchExpression(me) || + me->path() != repl::OpTime::kTimestampFieldName) { + return {min, max}; + } + + auto rawElem = static_cast<const ComparisonMatchExpression*>(me)->getData(); + if (rawElem.type() != BSONType::bsonTimestamp) { + return {min, max}; + } + + switch (me->matchType()) { + case MatchExpression::EQ: + min = rawElem.timestamp(); + max = rawElem.timestamp(); + return {min, max}; + case MatchExpression::GT: + case MatchExpression::GTE: + min = rawElem.timestamp(); + return {min, max}; + case MatchExpression::LT: + case MatchExpression::LTE: + max = rawElem.timestamp(); + return {min, max}; + default: + MONGO_UNREACHABLE; + } +} + +/** + * Returns true if 'me' is a GTE or GE predicate over the "ts" field. + */ +bool isOplogTsLowerBoundPred(const mongo::MatchExpression* me) { + if (mongo::MatchExpression::GT != me->matchType() && + mongo::MatchExpression::GTE != me->matchType()) { + return false; + } + + return me->path() == repl::OpTime::kTimestampFieldName; +} +} // namespace + std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( const CanonicalQuery& query, bool tailable, const QueryPlannerParams& params) { // Make the (only) node, a collection scan. @@ -174,6 +240,20 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( } } + if (query.nss().isOplog() && csn->direction == 1) { + // Optimizes the start and end location parameters for a collection scan for an oplog + // collection. + std::tie(csn->minTs, csn->maxTs) = extractTsRange(query.root()); + + // If the query is just a lower bound on "ts" on a forward scan, every document in the + // collection after the first matching one must also match. To avoid wasting time + // running the match expression on every document to be returned, we tell the + // CollectionScan stage to stop applying the filter once it finds the first match. + if (isOplogTsLowerBoundPred(query.root())) { + csn->stopApplyingFilterAfterFirstMatch = true; + } + } + return std::move(csn); } diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 16098b0146b..cba57fd1263 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -319,6 +319,15 @@ struct CollectionScanNode : public QuerySolutionNode { // Name of the namespace. std::string name; + // If present, the collection scan will seek directly to the RecordId of an oplog entry as + // close to 'minTs' as possible without going higher. Should only be set on forward oplog scans. + boost::optional<Timestamp> minTs; + + // If present the collection scan will stop and return EOF the first time it sees a document + // that does not pass the filter and has 'ts' greater than 'maxTs'. Should only be set on + // forward oplog scans. + boost::optional<Timestamp> maxTs; + // Should we make a tailable cursor? bool tailable; @@ -327,10 +336,13 @@ struct CollectionScanNode : public QuerySolutionNode { // across a sharded cluster. bool shouldTrackLatestOplogTimestamp = false; - int direction; + int direction{1}; // Whether or not to wait for oplog visibility on oplog collection scans. bool shouldWaitForOplogVisibility = false; + + // Once the first matching document is found, assume that all documents after it must match. + bool stopApplyingFilterAfterFirstMatch = false; }; struct AndHashNode : public QuerySolutionNode { diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 8ab1ca42cfb..2e73e2509ef 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -60,6 +60,7 @@ #include "mongo/db/index/fts_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/storage/oplog_hack.h" #include "mongo/util/log.h" namespace mongo { @@ -81,6 +82,9 @@ PlanStage* buildStages(OperationContext* opCtx, params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD : CollectionScanParams::BACKWARD; params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility; + params.minTs = csn->minTs; + params.maxTs = csn->maxTs; + params.stopApplyingFilterAfterFirstMatch = csn->stopApplyingFilterAfterFirstMatch; return new CollectionScan(opCtx, collection, params, ws, csn->filter.get()); } case STAGE_IXSCAN: { diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp index cbef1bafc4d..81c8095eeac 100644 --- a/src/mongo/db/transaction_history_iterator.cpp +++ b/src/mongo/db/transaction_history_iterator.cpp @@ -58,7 +58,6 @@ BSONObj findOneOplogEntry(OperationContext* opCtx, auto qr = std::make_unique<QueryRequest>(NamespaceString::kRsOplogNamespace); qr->setFilter(opTime.asQuery()); - qr->setOplogReplay(true); // QueryOption_OplogReplay if (prevOpOnly) { qr->setProj( diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 2dca15f78b7..0114a3563d5 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -689,7 +689,7 @@ public: class OplogReplayMode : public ClientBase { public: ~OplogReplayMode() { - _client.dropCollection("unittests.querytests.OplogReplayMode"); + _client.dropCollection(ns); } void run() { // Skip the test if the storage engine doesn't support capped collections. @@ -697,11 +697,22 @@ public: return; } - const char* ns = "unittests.querytests.OplogReplayMode"; - // Create a capped collection of size 10. _client.dropCollection(ns); _client.createCollection(ns, 10, true); + // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for + // testing, so the oplog may already exist on the test node; in this case, trying to create + // the oplog once again would fail. + // + // To ensure we are working with a clean oplog (an oplog without entries), we resort + // to truncating the oplog instead. + if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + BSONObj info; + _client.runCommand("local", + BSON("emptycapped" + << "oplog.querytests.OplogReplayMode"), + info); + } insert(ns, BSON("ts" << Timestamp(1000, 0))); insert(ns, BSON("ts" << Timestamp(1000, 1))); @@ -711,8 +722,7 @@ public: QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)), 0, 0, - nullptr, - QueryOption_OplogReplay); + nullptr); ASSERT(c->more()); ASSERT_EQUALS(2u, c->next()["ts"].timestamp().getInc()); ASSERT(!c->more()); @@ -722,18 +732,20 @@ public: QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)), 0, 0, - nullptr, - QueryOption_OplogReplay); + nullptr); ASSERT(c->more()); ASSERT_EQUALS(2u, c->next()["ts"].timestamp().getInc()); ASSERT(c->more()); } + +private: + const char* ns = "local.oplog.querytests.OplogReplayMode"; }; class OplogReplayExplain : public ClientBase { public: ~OplogReplayExplain() { - _client.dropCollection("unittests.querytests.OplogReplayExplain"); + _client.dropCollection(string(ns)); } void run() { // Skip the test if the storage engine doesn't support capped collections. @@ -741,11 +753,22 @@ public: return; } - const char* ns = "unittests.querytests.OplogReplayExplain"; - // Create a capped collection of size 10. _client.dropCollection(ns); _client.createCollection(ns, 10, true); + // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for + // testing, so the oplog may already exist on the test node; in this case, trying to create + // the oplog once again would fail. + // + // To ensure we are working with a clean oplog (an oplog without entries), we resort + // to truncating the oplog instead. + if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + BSONObj info; + _client.runCommand("local", + BSON("emptycapped" + << "oplog.querytests.OplogReplayExplain"), + info); + } insert(ns, BSON("ts" << Timestamp(1000, 0))); insert(ns, BSON("ts" << Timestamp(1000, 1))); @@ -755,8 +778,7 @@ public: QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)).explain(), 0, 0, - nullptr, - QueryOption_OplogReplay); + nullptr); ASSERT(c->more()); // Check number of results and filterSet flag in explain. @@ -768,6 +790,9 @@ public: ASSERT(!c->more()); } + +private: + const char* ns = "local.oplog.querytests.OplogReplayExplain"; }; class BasicCount : public ClientBase { @@ -1498,7 +1523,7 @@ class FindingStart : public CollectionBase { public: FindingStart() : CollectionBase("findingstart") {} static const char* ns() { - return "local.querytests.findingstart"; + return "local.oplog.querytests.findingstart"; } void run() { @@ -1509,16 +1534,28 @@ public: BSONObj info; // Must use local db so that the collection is not replicated, to allow autoIndexId:false. - ASSERT(_client.runCommand("local", - BSON("create" - << "querytests.findingstart" - << "capped" - << true - << "size" - << 4096 - << "autoIndexId" - << false), - info)); + _client.runCommand("local", + BSON("create" + << "oplog.querytests.findingstart" + << "capped" + << true + << "size" + << 4096 + << "autoIndexId" + << false), + info); + // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for + // testing, so the oplog may already exist on the test node; in this case, trying to create + // the oplog once again would fail. + // + // To ensure we are working with a clean oplog (an oplog without entries), we resort + // to truncating the oplog instead. + if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + _client.runCommand("local", + BSON("emptycapped" + << "oplog.querytests.findingstart"), + info); + } unsigned i = 0; int max = 1; @@ -1542,20 +1579,15 @@ public: .timestamp() .getInc(); for (unsigned j = -1; j < i; ++j) { - unique_ptr<DBClientCursor> c = - _client.query(NamespaceString(ns()), - QUERY("ts" << GTE << Timestamp(1000, j)), - 0, - 0, - nullptr, - QueryOption_OplogReplay); + unique_ptr<DBClientCursor> c = _client.query( + NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, j)), 0, 0, nullptr); ASSERT(c->more()); BSONObj next = c->next(); ASSERT(!next["ts"].eoo()); ASSERT_EQUALS((j > min ? j : min), next["ts"].timestamp().getInc()); } } - ASSERT(_client.dropCollection(ns())); + _client.dropCollection(ns()); } }; @@ -1563,7 +1595,7 @@ class FindingStartPartiallyFull : public CollectionBase { public: FindingStartPartiallyFull() : CollectionBase("findingstart") {} static const char* ns() { - return "local.querytests.findingstart"; + return "local.oplog.querytests.findingstart"; } void run() { @@ -1576,16 +1608,28 @@ public: BSONObj info; // Must use local db so that the collection is not replicated, to allow autoIndexId:false. - ASSERT(_client.runCommand("local", - BSON("create" - << "querytests.findingstart" - << "capped" - << true - << "size" - << 4096 - << "autoIndexId" - << false), - info)); + _client.runCommand("local", + BSON("create" + << "oplog.querytests.findingstart" + << "capped" + << true + << "size" + << 4096 + << "autoIndexId" + << false), + info); + // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for + // testing, so the oplog may already exist on the test node; in this case, trying to create + // the oplog once again would fail. + // + // To ensure we are working with a clean oplog (an oplog without entries), we resort + // to truncating the oplog instead. + if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + _client.runCommand("local", + BSON("emptycapped" + << "oplog.querytests.findingstart"), + info); + } unsigned i = 0; for (; i < 150; _client.insert(ns(), BSON("ts" << Timestamp(1000, i++)))) @@ -1599,13 +1643,8 @@ public: .timestamp() .getInc(); for (unsigned j = -1; j < i; ++j) { - unique_ptr<DBClientCursor> c = - _client.query(NamespaceString(ns()), - QUERY("ts" << GTE << Timestamp(1000, j)), - 0, - 0, - nullptr, - QueryOption_OplogReplay); + unique_ptr<DBClientCursor> c = _client.query( + NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, j)), 0, 0, nullptr); ASSERT(c->more()); BSONObj next = c->next(); ASSERT(!next["ts"].eoo()); @@ -1614,19 +1653,19 @@ public: } ASSERT_EQUALS(startNumCursors, numCursorsOpen()); - ASSERT(_client.dropCollection(ns())); + _client.dropCollection(ns()); } }; /** - * Check OplogReplay mode where query timestamp is earlier than the earliest + * Check oplog replay mode where query timestamp is earlier than the earliest * entry in the collection. */ class FindingStartStale : public CollectionBase { public: FindingStartStale() : CollectionBase("findingstart") {} static const char* ns() { - return "local.querytests.findingstart"; + return "local.oplog.querytests.findingstart"; } void run() { @@ -1637,53 +1676,57 @@ public: size_t startNumCursors = numCursorsOpen(); - // Check OplogReplay mode with missing collection. - unique_ptr<DBClientCursor> c0 = _client.query(NamespaceString(ns()), - QUERY("ts" << GTE << Timestamp(1000, 50)), - 0, - 0, - nullptr, - QueryOption_OplogReplay); + // Check oplog replay mode with missing collection. + unique_ptr<DBClientCursor> c0 = + _client.query(NamespaceString("local.oplog.querytests.missing"), + QUERY("ts" << GTE << Timestamp(1000, 50)), + 0, + 0, + nullptr); ASSERT(!c0->more()); BSONObj info; // Must use local db so that the collection is not replicated, to allow autoIndexId:false. - ASSERT(_client.runCommand("local", - BSON("create" - << "querytests.findingstart" - << "capped" - << true - << "size" - << 4096 - << "autoIndexId" - << false), - info)); + _client.runCommand("local", + BSON("create" + << "oplog.querytests.findingstart" + << "capped" + << true + << "size" + << 4096 + << "autoIndexId" + << false), + info); + // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for + // testing, so the oplog may already exist on the test node; in this case, trying to create + // the oplog once again would fail. + // + // To ensure we are working with a clean oplog (an oplog without entries), we resort + // to truncating the oplog instead. + if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + _client.runCommand("local", + BSON("emptycapped" + << "oplog.querytests.findingstart"), + info); + } - // Check OplogReplay mode with empty collection. - unique_ptr<DBClientCursor> c = _client.query(NamespaceString(ns()), - QUERY("ts" << GTE << Timestamp(1000, 50)), - 0, - 0, - nullptr, - QueryOption_OplogReplay); + // Check oplog replay mode with empty collection. + unique_ptr<DBClientCursor> c = _client.query( + NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, 50)), 0, 0, nullptr); ASSERT(!c->more()); // Check with some docs in the collection. for (int i = 100; i < 150; _client.insert(ns(), BSON("ts" << Timestamp(1000, i++)))) ; - c = _client.query(NamespaceString(ns()), - QUERY("ts" << GTE << Timestamp(1000, 50)), - 0, - 0, - nullptr, - QueryOption_OplogReplay); + c = _client.query( + NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, 50)), 0, 0, nullptr); ASSERT(c->more()); ASSERT_EQUALS(100u, c->next()["ts"].timestamp().getInc()); // Check that no persistent cursors outlast our queries above. ASSERT_EQUALS(startNumCursors, numCursorsOpen()); - ASSERT(_client.dropCollection(ns())); + _client.dropCollection(ns()); } }; @@ -1761,8 +1804,7 @@ public: 0, 0, nullptr, - QueryOption_OplogReplay | QueryOption_CursorTailable | - QueryOption_Exhaust, + QueryOption_CursorTailable | QueryOption_Exhaust, message); DbMessage dbMessage(message); QueryMessage queryMessage(dbMessage); diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js index def25f8f65a..7bd167b5437 100644 --- a/src/mongo/shell/query.js +++ b/src/mongo/shell/query.js @@ -238,10 +238,6 @@ DBQuery.prototype._convertToCommand = function(canAttachReadPref) { cmd["tailable"] = true; } - if ((this._options & DBQuery.Option.oplogReplay) != 0) { - cmd["oplogReplay"] = true; - } - if ((this._options & DBQuery.Option.noTimeout) != 0) { cmd["noCursorTimeout"] = true; } @@ -598,19 +594,6 @@ DBQuery.prototype.noCursorTimeout = function() { }; /** -* Internal replication use only - driver should not set -* -* @method -* @see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-query -* @return {DBQuery} -*/ -DBQuery.prototype.oplogReplay = function() { - this._checkModify(); - this.addOption(DBQuery.Option.oplogReplay); - return this; -}; - -/** * Limits the fields to return for all matching documents. * * @method @@ -694,7 +677,9 @@ DBQuery.shellBatchSize = 20; DBQuery.Option = { tailable: 0x2, slaveOk: 0x4, - oplogReplay: 0x8, + // 0x8 is reserved for oplogReplay, but not explicitly defined. This is because the flag no + // longer has any meaning to the server, and will be ignored, so there is no reason for it to + // be set by clients. noTimeout: 0x10, awaitData: 0x20, exhaust: 0x40, |