summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/collection_scan.cpp29
-rw-r--r--src/mongo/db/exec/collection_scan_common.h8
-rw-r--r--src/mongo/db/exec/plan_stats.h9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp12
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h1
-rw-r--r--src/mongo/db/query/explain.cpp3
-rw-r--r--src/mongo/db/query/get_executor.cpp122
-rw-r--r--src/mongo/db/query/internal_plans.cpp14
-rw-r--r--src/mongo/db/query/internal_plans.h9
-rw-r--r--src/mongo/db/query/planner_access.cpp80
-rw-r--r--src/mongo/db/query/query_solution.h14
-rw-r--r--src/mongo/db/query/stage_builder.cpp4
-rw-r--r--src/mongo/db/transaction_history_iterator.cpp1
-rw-r--r--src/mongo/dbtests/querytests.cpp212
-rw-r--r--src/mongo/shell/query.js21
16 files changed, 277 insertions, 266 deletions
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index e06493b39eb..dcd07553bc8 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/storage/oplog_hack.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -66,12 +67,20 @@ CollectionScan::CollectionScan(OperationContext* opCtx,
_params(params) {
// Explain reports the direction of the collection scan.
_specificStats.direction = params.direction;
+ _specificStats.minTs = params.minTs;
_specificStats.maxTs = params.maxTs;
_specificStats.tailable = params.tailable;
+ if (params.minTs || params.maxTs) {
+ // The 'minTs' and 'maxTs' parameters are used for a special optimization that
+ // applies only to forwards scans of the oplog.
+ invariant(params.direction == CollectionScanParams::FORWARD);
+ invariant(collection->ns().isOplog());
+ }
invariant(!_params.shouldTrackLatestOplogTimestamp || collection->ns().isOplog());
+ // Set early stop condition.
if (params.maxTs) {
- _endConditionBSON = BSON("$gte" << *(params.maxTs));
+ _endConditionBSON = BSON("$gte"_sd << *(params.maxTs));
_endCondition = std::make_unique<GTEMatchExpression>(repl::OpTime::kTimestampFieldName,
_endConditionBSON.firstElement());
}
@@ -129,9 +138,20 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
- if (_lastSeenId.isNull() && !_params.start.isNull()) {
- record = _cursor->seekExact(_params.start);
- } else {
+ if (_lastSeenId.isNull() && _params.minTs) {
+ // See if the RecordStore supports the oplogStartHack.
+ StatusWith<RecordId> goal = oploghack::keyForOptime(*_params.minTs);
+ if (goal.isOK()) {
+ boost::optional<RecordId> startLoc =
+ collection()->getRecordStore()->oplogStartHack(getOpCtx(), goal.getValue());
+ if (startLoc && !startLoc->isNull()) {
+ LOG(3) << "Using direct oplog seek";
+ record = _cursor->seekExact(*startLoc);
+ }
+ }
+ }
+
+ if (!record) {
record = _cursor->next();
}
} catch (const WriteConflictException&) {
@@ -190,7 +210,6 @@ PlanStage::StageState CollectionScan::returnIfMatches(WorkingSetMember* member,
WorkingSetID memberID,
WorkingSetID* out) {
++_specificStats.docsTested;
-
if (Filter::passes(member, _filter)) {
if (_params.stopApplyingFilterAfterFirstMatch) {
_filter = nullptr;
diff --git a/src/mongo/db/exec/collection_scan_common.h b/src/mongo/db/exec/collection_scan_common.h
index b655872f6fa..55307ba70c8 100644
--- a/src/mongo/db/exec/collection_scan_common.h
+++ b/src/mongo/db/exec/collection_scan_common.h
@@ -42,11 +42,13 @@ struct CollectionScanParams {
BACKWARD = -1,
};
- // The RecordId to which we should seek to as the first document of the scan.
- RecordId start;
+ // If present, the collection scan will seek directly to the RecordId of an oplog entry as
+ // close to 'minTs' as possible without going higher. Must only be set on forward oplog scans.
+ boost::optional<Timestamp> minTs;
// If present, the collection scan will stop and return EOF the first time it sees a document
- // that does not pass the filter and has 'ts' greater than 'maxTs'.
+ // that does not pass the filter and has 'ts' greater than 'maxTs'. Must only be set on forward
+ // oplog scans.
boost::optional<Timestamp> maxTs;
Direction direction = FORWARD;
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index 08d623ffc44..938a633e342 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -200,9 +200,12 @@ struct CollectionScanStats : public SpecificStats {
bool tailable{false};
- // If present, indicates that the collection scan will stop and return EOF the first time it
- // sees a document that does not pass the filter and has a "ts" Timestamp field greater than
- // 'maxTs'.
+ // The start location of the scan. Must only be set on forward oplog scans.
+ boost::optional<Timestamp> minTs;
+
+ // Indicates that the collection scan will stop and return EOF the first time it sees a
+ // document that does not pass the filter and has a "ts" Timestamp field greater than 'maxTs'.
+ // Must only be set on forward oplog scans.
boost::optional<Timestamp> maxTs;
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 5f9ca9ef2ea..2da744ae88e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -381,6 +381,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac
deps->fields.insert(repl::OplogEntry::kUuidFieldName.toString());
deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString());
deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kSessionIdFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kTermFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString());
return DepsTracker::State::EXHAUSTIVE_ALL;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index d6e3557bcd9..f133a67c9bc 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -168,7 +168,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
Collection* collection,
const NamespaceString& nss,
const intrusive_ptr<ExpressionContext>& pExpCtx,
- bool oplogReplay,
BSONObj queryObj,
BSONObj projectionObj,
BSONObj sortObj,
@@ -178,7 +177,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures) {
auto qr = std::make_unique<QueryRequest>(nss);
qr->setTailableMode(pExpCtx->tailableMode);
- qr->setOplogReplay(oplogReplay);
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
@@ -442,12 +440,10 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection,
// Look for an initial match. This works whether we got an initial query or not. If not, it
// results in a "{}" query, which will be what we want in that case.
- bool oplogReplay = false;
const BSONObj queryObj = pipeline->getInitialQuery();
if (!queryObj.isEmpty()) {
auto matchStage = dynamic_cast<DocumentSourceMatch*>(sources.front().get());
if (matchStage) {
- oplogReplay = dynamic_cast<DocumentSourceOplogMatch*>(matchStage) != nullptr;
// If a $match query is pulled into the cursor, the $match is redundant, and can be
// removed from the pipeline.
sources.pop_front();
@@ -487,7 +483,6 @@ PipelineD::buildInnerQueryExecutorGeneric(Collection* collection,
nss,
pipeline,
expCtx,
- oplogReplay,
sortStage,
std::move(rewrittenGroupStage),
deps,
@@ -558,7 +553,6 @@ PipelineD::buildInnerQueryExecutorGeoNear(Collection* collection,
nss,
pipeline,
expCtx,
- false, /* oplogReplay */
nullptr, /* sortStage */
nullptr, /* rewrittenGroupStage */
deps,
@@ -600,7 +594,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
const NamespaceString& nss,
Pipeline* pipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
- bool oplogReplay,
const boost::intrusive_ptr<DocumentSourceSort>& sortStage,
std::unique_ptr<GroupFromFirstDocumentTransformation> rewrittenGroupStage,
const DepsTracker& deps,
@@ -652,7 +645,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
- oplogReplay,
queryObj,
*projectionObj,
sortObj ? *sortObj : emptySort,
@@ -708,7 +700,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
- oplogReplay,
queryObj,
expCtx->needsMerge ? metaSortProjection : emptyProjection,
*sortObj,
@@ -724,7 +715,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
- oplogReplay,
queryObj,
*projectionObj,
*sortObj,
@@ -787,7 +777,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
- oplogReplay,
queryObj,
*projectionObj,
*sortObj,
@@ -812,7 +801,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
collection,
nss,
expCtx,
- oplogReplay,
queryObj,
*projectionObj,
*sortObj,
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index a4db94f74b4..9b8f618c12f 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -166,7 +166,6 @@ private:
const NamespaceString& nss,
Pipeline* pipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- bool oplogReplay,
const boost::intrusive_ptr<DocumentSourceSort>& sortStage,
std::unique_ptr<GroupFromFirstDocumentTransformation> rewrittenGroupStage,
const DepsTracker& deps,
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index eb109928be4..be8a1c95cdd 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -361,6 +361,9 @@ void Explain::statsToBSON(const PlanStageStats& stats,
} else if (STAGE_COLLSCAN == stats.stageType) {
CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get());
bob->append("direction", spec->direction > 0 ? "forward" : "backward");
+ if (spec->minTs) {
+ bob->append("minTs", *(spec->minTs));
+ }
if (spec->maxTs) {
bob->append("maxTs", *(spec->maxTs));
}
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 82ceb978a29..73df3c9876d 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -81,7 +81,6 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/scripting/engine.h"
#include "mongo/util/log.h"
@@ -622,133 +621,12 @@ bool isOplogTsLowerBoundPred(const mongo::MatchExpression* me) {
return me->path() == repl::OpTime::kTimestampFieldName;
}
-/**
- * Extracts the lower and upper bounds on the "ts" field from 'me'. This only examines comparisons
- * of "ts" against a Timestamp at the top level or inside a top-level $and.
- */
-std::pair<boost::optional<Timestamp>, boost::optional<Timestamp>> extractTsRange(
- const MatchExpression* me, bool topLevel = true) {
- boost::optional<Timestamp> min;
- boost::optional<Timestamp> max;
-
- if (me->matchType() == MatchExpression::AND && topLevel) {
- for (size_t i = 0; i < me->numChildren(); ++i) {
- boost::optional<Timestamp> childMin;
- boost::optional<Timestamp> childMax;
- std::tie(childMin, childMax) = extractTsRange(me->getChild(i), false);
- if (childMin && (!min || childMin.get() > min.get())) {
- min = childMin;
- }
- if (childMax && (!max || childMax.get() < max.get())) {
- max = childMax;
- }
- }
- return {min, max};
- }
-
- if (!ComparisonMatchExpression::isComparisonMatchExpression(me) ||
- me->path() != repl::OpTime::kTimestampFieldName) {
- return {min, max};
- }
-
- auto rawElem = static_cast<const ComparisonMatchExpression*>(me)->getData();
- if (rawElem.type() != BSONType::bsonTimestamp) {
- return {min, max};
- }
-
- switch (me->matchType()) {
- case MatchExpression::EQ:
- min = rawElem.timestamp();
- max = rawElem.timestamp();
- return {min, max};
- case MatchExpression::GT:
- case MatchExpression::GTE:
- min = rawElem.timestamp();
- return {min, max};
- case MatchExpression::LT:
- case MatchExpression::LTE:
- max = rawElem.timestamp();
- return {min, max};
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
- OperationContext* opCtx,
- Collection* collection,
- unique_ptr<CanonicalQuery> cq,
- size_t plannerOptions,
- PlanExecutor::YieldPolicy yieldPolicy) {
- invariant(collection);
- invariant(cq.get());
-
- if (!collection->isCapped()) {
- return Status(ErrorCodes::BadValue,
- "OplogReplay cursor requested on non-capped collection");
- }
-
- // If the canonical query does not have a user-specified collation, set it from the collection
- // default.
- if (cq->getQueryRequest().getCollation().isEmpty() && collection->getDefaultCollator()) {
- cq->setCollator(collection->getDefaultCollator()->clone());
- }
-
- boost::optional<Timestamp> minTs, maxTs;
- std::tie(minTs, maxTs) = extractTsRange(cq->root());
-
- if (!minTs) {
- return Status(ErrorCodes::OplogOperationUnsupported,
- "OplogReplay query does not contain top-level "
- "$eq, $gt, or $gte over the 'ts' field.");
- }
-
- boost::optional<RecordId> startLoc = boost::none;
-
- // See if the RecordStore supports the oplogStartHack.
- StatusWith<RecordId> goal = oploghack::keyForOptime(*minTs);
- if (goal.isOK()) {
- startLoc = collection->getRecordStore()->oplogStartHack(opCtx, goal.getValue());
- }
-
- // Build our collection scan.
- CollectionScanParams params;
- if (startLoc) {
- LOG(3) << "Using direct oplog seek";
- params.start = *startLoc;
- }
- params.maxTs = maxTs;
- params.direction = CollectionScanParams::FORWARD;
- params.tailable = cq->getQueryRequest().isTailable();
- params.shouldTrackLatestOplogTimestamp =
- plannerOptions & QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
- params.shouldWaitForOplogVisibility =
- shouldWaitForOplogVisibility(opCtx, collection, params.tailable);
-
- // If the query is just a lower bound on "ts", we know that every document in the collection
- // after the first matching one must also match. To avoid wasting time running the match
- // expression on every document to be returned, we tell the CollectionScan stage to stop
- // applying the filter once it finds the first match.
- if (isOplogTsLowerBoundPred(cq->root())) {
- params.stopApplyingFilterAfterFirstMatch = true;
- }
-
- auto ws = std::make_unique<WorkingSet>();
- auto cs = std::make_unique<CollectionScan>(opCtx, collection, params, ws.get(), cq->root());
- return PlanExecutor::make(
- opCtx, std::move(ws), std::move(cs), std::move(cq), collection, yieldPolicy);
-}
-
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
OperationContext* opCtx,
Collection* collection,
unique_ptr<CanonicalQuery> canonicalQuery,
PlanExecutor::YieldPolicy yieldPolicy,
size_t plannerOptions) {
- if (nullptr != collection && canonicalQuery->getQueryRequest().isOplogReplay()) {
- return getOplogStartHack(
- opCtx, collection, std::move(canonicalQuery), plannerOptions, yieldPolicy);
- }
if (OperationShardingState::isOperationVersioned(opCtx)) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 2fa5359bd48..594c23ada38 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -51,8 +51,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
StringData ns,
Collection* collection,
PlanExecutor::YieldPolicy yieldPolicy,
- const Direction direction,
- const RecordId startLoc) {
+ const Direction direction) {
std::unique_ptr<WorkingSet> ws = std::make_unique<WorkingSet>();
if (nullptr == collection) {
@@ -66,7 +65,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
invariant(ns == collection->ns().ns());
- auto cs = _collectionScan(opCtx, ws.get(), collection, direction, startLoc);
+ auto cs = _collectionScan(opCtx, ws.get(), collection, direction);
// Takes ownership of 'ws' and 'cs'.
auto statusWithPlanExecutor =
@@ -80,12 +79,11 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
Collection* collection,
std::unique_ptr<DeleteStageParams> params,
PlanExecutor::YieldPolicy yieldPolicy,
- Direction direction,
- const RecordId& startLoc) {
+ Direction direction) {
invariant(collection);
auto ws = std::make_unique<WorkingSet>();
- auto root = _collectionScan(opCtx, ws.get(), collection, direction, startLoc);
+ auto root = _collectionScan(opCtx, ws.get(), collection, direction);
root = std::make_unique<DeleteStage>(
opCtx, std::move(params), ws.get(), collection, root.release());
@@ -180,12 +178,10 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWith
std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(OperationContext* opCtx,
WorkingSet* ws,
const Collection* collection,
- Direction direction,
- const RecordId& startLoc) {
+ Direction direction) {
invariant(collection);
CollectionScanParams params;
- params.start = startLoc;
params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(opCtx, collection, false);
if (FORWARD == direction) {
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index cdc91987c82..a846a55b60b 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -73,8 +73,7 @@ public:
StringData ns,
Collection* collection,
PlanExecutor::YieldPolicy yieldPolicy,
- const Direction direction = FORWARD,
- const RecordId startLoc = RecordId());
+ const Direction direction = FORWARD);
/**
* Returns a FETCH => DELETE plan.
@@ -84,8 +83,7 @@ public:
Collection* collection,
std::unique_ptr<DeleteStageParams> params,
PlanExecutor::YieldPolicy yieldPolicy,
- Direction direction = FORWARD,
- const RecordId& startLoc = RecordId());
+ Direction direction = FORWARD);
/**
* Returns an index scan. Caller owns returned pointer.
@@ -135,8 +133,7 @@ private:
static std::unique_ptr<PlanStage> _collectionScan(OperationContext* opCtx,
WorkingSet* ws,
const Collection* collection,
- Direction direction,
- const RecordId& startLoc);
+ Direction direction);
/**
* Returns a plan stage that is either an index scan or an index scan with a fetch stage.
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index ef4ef25bcac..eb837ff2121 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -143,6 +143,72 @@ namespace mongo {
using std::unique_ptr;
using std::vector;
+namespace {
+/**
+ * Extracts the lower and upper bounds on the "ts" field from 'me'. This only examines comparisons
+ * of "ts" against a Timestamp at the top level or inside a top-level $and.
+ */
+std::pair<boost::optional<Timestamp>, boost::optional<Timestamp>> extractTsRange(
+ const MatchExpression* me, bool topLevel = true) {
+ boost::optional<Timestamp> min;
+ boost::optional<Timestamp> max;
+
+ if (me->matchType() == MatchExpression::AND && topLevel) {
+ for (size_t i = 0; i < me->numChildren(); ++i) {
+ boost::optional<Timestamp> childMin;
+ boost::optional<Timestamp> childMax;
+ std::tie(childMin, childMax) = extractTsRange(me->getChild(i), false);
+ if (childMin && (!min || childMin.get() > min.get())) {
+ min = childMin;
+ }
+ if (childMax && (!max || childMax.get() < max.get())) {
+ max = childMax;
+ }
+ }
+ return {min, max};
+ }
+
+ if (!ComparisonMatchExpression::isComparisonMatchExpression(me) ||
+ me->path() != repl::OpTime::kTimestampFieldName) {
+ return {min, max};
+ }
+
+ auto rawElem = static_cast<const ComparisonMatchExpression*>(me)->getData();
+ if (rawElem.type() != BSONType::bsonTimestamp) {
+ return {min, max};
+ }
+
+ switch (me->matchType()) {
+ case MatchExpression::EQ:
+ min = rawElem.timestamp();
+ max = rawElem.timestamp();
+ return {min, max};
+ case MatchExpression::GT:
+ case MatchExpression::GTE:
+ min = rawElem.timestamp();
+ return {min, max};
+ case MatchExpression::LT:
+ case MatchExpression::LTE:
+ max = rawElem.timestamp();
+ return {min, max};
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+/**
+ * Returns true if 'me' is a GTE or GE predicate over the "ts" field.
+ */
+bool isOplogTsLowerBoundPred(const mongo::MatchExpression* me) {
+ if (mongo::MatchExpression::GT != me->matchType() &&
+ mongo::MatchExpression::GTE != me->matchType()) {
+ return false;
+ }
+
+ return me->path() == repl::OpTime::kTimestampFieldName;
+}
+} // namespace
+
std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
const CanonicalQuery& query, bool tailable, const QueryPlannerParams& params) {
// Make the (only) node, a collection scan.
@@ -174,6 +240,20 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
}
}
+ if (query.nss().isOplog() && csn->direction == 1) {
+ // Optimizes the start and end location parameters for a collection scan for an oplog
+ // collection.
+ std::tie(csn->minTs, csn->maxTs) = extractTsRange(query.root());
+
+ // If the query is just a lower bound on "ts" on a forward scan, every document in the
+ // collection after the first matching one must also match. To avoid wasting time
+ // running the match expression on every document to be returned, we tell the
+ // CollectionScan stage to stop applying the filter once it finds the first match.
+ if (isOplogTsLowerBoundPred(query.root())) {
+ csn->stopApplyingFilterAfterFirstMatch = true;
+ }
+ }
+
return std::move(csn);
}
diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h
index 16098b0146b..cba57fd1263 100644
--- a/src/mongo/db/query/query_solution.h
+++ b/src/mongo/db/query/query_solution.h
@@ -319,6 +319,15 @@ struct CollectionScanNode : public QuerySolutionNode {
// Name of the namespace.
std::string name;
+ // If present, the collection scan will seek directly to the RecordId of an oplog entry as
+ // close to 'minTs' as possible without going higher. Should only be set on forward oplog scans.
+ boost::optional<Timestamp> minTs;
+
+ // If present the collection scan will stop and return EOF the first time it sees a document
+ // that does not pass the filter and has 'ts' greater than 'maxTs'. Should only be set on
+ // forward oplog scans.
+ boost::optional<Timestamp> maxTs;
+
// Should we make a tailable cursor?
bool tailable;
@@ -327,10 +336,13 @@ struct CollectionScanNode : public QuerySolutionNode {
// across a sharded cluster.
bool shouldTrackLatestOplogTimestamp = false;
- int direction;
+ int direction{1};
// Whether or not to wait for oplog visibility on oplog collection scans.
bool shouldWaitForOplogVisibility = false;
+
+ // Once the first matching document is found, assume that all documents after it must match.
+ bool stopApplyingFilterAfterFirstMatch = false;
};
struct AndHashNode : public QuerySolutionNode {
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index 8ab1ca42cfb..2e73e2509ef 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/index/fts_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/storage/oplog_hack.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -81,6 +82,9 @@ PlanStage* buildStages(OperationContext* opCtx,
params.direction = (csn->direction == 1) ? CollectionScanParams::FORWARD
: CollectionScanParams::BACKWARD;
params.shouldWaitForOplogVisibility = csn->shouldWaitForOplogVisibility;
+ params.minTs = csn->minTs;
+ params.maxTs = csn->maxTs;
+ params.stopApplyingFilterAfterFirstMatch = csn->stopApplyingFilterAfterFirstMatch;
return new CollectionScan(opCtx, collection, params, ws, csn->filter.get());
}
case STAGE_IXSCAN: {
diff --git a/src/mongo/db/transaction_history_iterator.cpp b/src/mongo/db/transaction_history_iterator.cpp
index cbef1bafc4d..81c8095eeac 100644
--- a/src/mongo/db/transaction_history_iterator.cpp
+++ b/src/mongo/db/transaction_history_iterator.cpp
@@ -58,7 +58,6 @@ BSONObj findOneOplogEntry(OperationContext* opCtx,
auto qr = std::make_unique<QueryRequest>(NamespaceString::kRsOplogNamespace);
qr->setFilter(opTime.asQuery());
- qr->setOplogReplay(true); // QueryOption_OplogReplay
if (prevOpOnly) {
qr->setProj(
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 2dca15f78b7..0114a3563d5 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -689,7 +689,7 @@ public:
class OplogReplayMode : public ClientBase {
public:
~OplogReplayMode() {
- _client.dropCollection("unittests.querytests.OplogReplayMode");
+ _client.dropCollection(ns);
}
void run() {
// Skip the test if the storage engine doesn't support capped collections.
@@ -697,11 +697,22 @@ public:
return;
}
- const char* ns = "unittests.querytests.OplogReplayMode";
-
// Create a capped collection of size 10.
_client.dropCollection(ns);
_client.createCollection(ns, 10, true);
+ // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for
+ // testing, so the oplog may already exist on the test node; in this case, trying to create
+ // the oplog once again would fail.
+ //
+ // To ensure we are working with a clean oplog (an oplog without entries), we resort
+ // to truncating the oplog instead.
+ if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ BSONObj info;
+ _client.runCommand("local",
+ BSON("emptycapped"
+ << "oplog.querytests.OplogReplayMode"),
+ info);
+ }
insert(ns, BSON("ts" << Timestamp(1000, 0)));
insert(ns, BSON("ts" << Timestamp(1000, 1)));
@@ -711,8 +722,7 @@ public:
QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)),
0,
0,
- nullptr,
- QueryOption_OplogReplay);
+ nullptr);
ASSERT(c->more());
ASSERT_EQUALS(2u, c->next()["ts"].timestamp().getInc());
ASSERT(!c->more());
@@ -722,18 +732,20 @@ public:
QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)),
0,
0,
- nullptr,
- QueryOption_OplogReplay);
+ nullptr);
ASSERT(c->more());
ASSERT_EQUALS(2u, c->next()["ts"].timestamp().getInc());
ASSERT(c->more());
}
+
+private:
+ const char* ns = "local.oplog.querytests.OplogReplayMode";
};
class OplogReplayExplain : public ClientBase {
public:
~OplogReplayExplain() {
- _client.dropCollection("unittests.querytests.OplogReplayExplain");
+ _client.dropCollection(string(ns));
}
void run() {
// Skip the test if the storage engine doesn't support capped collections.
@@ -741,11 +753,22 @@ public:
return;
}
- const char* ns = "unittests.querytests.OplogReplayExplain";
-
// Create a capped collection of size 10.
_client.dropCollection(ns);
_client.createCollection(ns, 10, true);
+ // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for
+ // testing, so the oplog may already exist on the test node; in this case, trying to create
+ // the oplog once again would fail.
+ //
+ // To ensure we are working with a clean oplog (an oplog without entries), we resort
+ // to truncating the oplog instead.
+ if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ BSONObj info;
+ _client.runCommand("local",
+ BSON("emptycapped"
+ << "oplog.querytests.OplogReplayExplain"),
+ info);
+ }
insert(ns, BSON("ts" << Timestamp(1000, 0)));
insert(ns, BSON("ts" << Timestamp(1000, 1)));
@@ -755,8 +778,7 @@ public:
QUERY("ts" << GT << Timestamp(1000, 1)).hint(BSON("$natural" << 1)).explain(),
0,
0,
- nullptr,
- QueryOption_OplogReplay);
+ nullptr);
ASSERT(c->more());
// Check number of results and filterSet flag in explain.
@@ -768,6 +790,9 @@ public:
ASSERT(!c->more());
}
+
+private:
+ const char* ns = "local.oplog.querytests.OplogReplayExplain";
};
class BasicCount : public ClientBase {
@@ -1498,7 +1523,7 @@ class FindingStart : public CollectionBase {
public:
FindingStart() : CollectionBase("findingstart") {}
static const char* ns() {
- return "local.querytests.findingstart";
+ return "local.oplog.querytests.findingstart";
}
void run() {
@@ -1509,16 +1534,28 @@ public:
BSONObj info;
// Must use local db so that the collection is not replicated, to allow autoIndexId:false.
- ASSERT(_client.runCommand("local",
- BSON("create"
- << "querytests.findingstart"
- << "capped"
- << true
- << "size"
- << 4096
- << "autoIndexId"
- << false),
- info));
+ _client.runCommand("local",
+ BSON("create"
+ << "oplog.querytests.findingstart"
+ << "capped"
+ << true
+ << "size"
+ << 4096
+ << "autoIndexId"
+ << false),
+ info);
+ // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for
+ // testing, so the oplog may already exist on the test node; in this case, trying to create
+ // the oplog once again would fail.
+ //
+ // To ensure we are working with a clean oplog (an oplog without entries), we resort
+ // to truncating the oplog instead.
+ if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ _client.runCommand("local",
+ BSON("emptycapped"
+ << "oplog.querytests.findingstart"),
+ info);
+ }
unsigned i = 0;
int max = 1;
@@ -1542,20 +1579,15 @@ public:
.timestamp()
.getInc();
for (unsigned j = -1; j < i; ++j) {
- unique_ptr<DBClientCursor> c =
- _client.query(NamespaceString(ns()),
- QUERY("ts" << GTE << Timestamp(1000, j)),
- 0,
- 0,
- nullptr,
- QueryOption_OplogReplay);
+ unique_ptr<DBClientCursor> c = _client.query(
+ NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, j)), 0, 0, nullptr);
ASSERT(c->more());
BSONObj next = c->next();
ASSERT(!next["ts"].eoo());
ASSERT_EQUALS((j > min ? j : min), next["ts"].timestamp().getInc());
}
}
- ASSERT(_client.dropCollection(ns()));
+ _client.dropCollection(ns());
}
};
@@ -1563,7 +1595,7 @@ class FindingStartPartiallyFull : public CollectionBase {
public:
FindingStartPartiallyFull() : CollectionBase("findingstart") {}
static const char* ns() {
- return "local.querytests.findingstart";
+ return "local.oplog.querytests.findingstart";
}
void run() {
@@ -1576,16 +1608,28 @@ public:
BSONObj info;
// Must use local db so that the collection is not replicated, to allow autoIndexId:false.
- ASSERT(_client.runCommand("local",
- BSON("create"
- << "querytests.findingstart"
- << "capped"
- << true
- << "size"
- << 4096
- << "autoIndexId"
- << false),
- info));
+ _client.runCommand("local",
+ BSON("create"
+ << "oplog.querytests.findingstart"
+ << "capped"
+ << true
+ << "size"
+ << 4096
+ << "autoIndexId"
+ << false),
+ info);
+ // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for
+ // testing, so the oplog may already exist on the test node; in this case, trying to create
+ // the oplog once again would fail.
+ //
+ // To ensure we are working with a clean oplog (an oplog without entries), we resort
+ // to truncating the oplog instead.
+ if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ _client.runCommand("local",
+ BSON("emptycapped"
+ << "oplog.querytests.findingstart"),
+ info);
+ }
unsigned i = 0;
for (; i < 150; _client.insert(ns(), BSON("ts" << Timestamp(1000, i++))))
@@ -1599,13 +1643,8 @@ public:
.timestamp()
.getInc();
for (unsigned j = -1; j < i; ++j) {
- unique_ptr<DBClientCursor> c =
- _client.query(NamespaceString(ns()),
- QUERY("ts" << GTE << Timestamp(1000, j)),
- 0,
- 0,
- nullptr,
- QueryOption_OplogReplay);
+ unique_ptr<DBClientCursor> c = _client.query(
+ NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, j)), 0, 0, nullptr);
ASSERT(c->more());
BSONObj next = c->next();
ASSERT(!next["ts"].eoo());
@@ -1614,19 +1653,19 @@ public:
}
ASSERT_EQUALS(startNumCursors, numCursorsOpen());
- ASSERT(_client.dropCollection(ns()));
+ _client.dropCollection(ns());
}
};
/**
- * Check OplogReplay mode where query timestamp is earlier than the earliest
+ * Check oplog replay mode where query timestamp is earlier than the earliest
* entry in the collection.
*/
class FindingStartStale : public CollectionBase {
public:
FindingStartStale() : CollectionBase("findingstart") {}
static const char* ns() {
- return "local.querytests.findingstart";
+ return "local.oplog.querytests.findingstart";
}
void run() {
@@ -1637,53 +1676,57 @@ public:
size_t startNumCursors = numCursorsOpen();
- // Check OplogReplay mode with missing collection.
- unique_ptr<DBClientCursor> c0 = _client.query(NamespaceString(ns()),
- QUERY("ts" << GTE << Timestamp(1000, 50)),
- 0,
- 0,
- nullptr,
- QueryOption_OplogReplay);
+ // Check oplog replay mode with missing collection.
+ unique_ptr<DBClientCursor> c0 =
+ _client.query(NamespaceString("local.oplog.querytests.missing"),
+ QUERY("ts" << GTE << Timestamp(1000, 50)),
+ 0,
+ 0,
+ nullptr);
ASSERT(!c0->more());
BSONObj info;
// Must use local db so that the collection is not replicated, to allow autoIndexId:false.
- ASSERT(_client.runCommand("local",
- BSON("create"
- << "querytests.findingstart"
- << "capped"
- << true
- << "size"
- << 4096
- << "autoIndexId"
- << false),
- info));
+ _client.runCommand("local",
+ BSON("create"
+ << "oplog.querytests.findingstart"
+ << "capped"
+ << true
+ << "size"
+ << 4096
+ << "autoIndexId"
+ << false),
+ info);
+ // WiredTiger storage engines forbid dropping of the oplog. Evergreen reuses nodes for
+ // testing, so the oplog may already exist on the test node; in this case, trying to create
+ // the oplog once again would fail.
+ //
+ // To ensure we are working with a clean oplog (an oplog without entries), we resort
+ // to truncating the oplog instead.
+ if (getGlobalServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ _client.runCommand("local",
+ BSON("emptycapped"
+ << "oplog.querytests.findingstart"),
+ info);
+ }
- // Check OplogReplay mode with empty collection.
- unique_ptr<DBClientCursor> c = _client.query(NamespaceString(ns()),
- QUERY("ts" << GTE << Timestamp(1000, 50)),
- 0,
- 0,
- nullptr,
- QueryOption_OplogReplay);
+ // Check oplog replay mode with empty collection.
+ unique_ptr<DBClientCursor> c = _client.query(
+ NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, 50)), 0, 0, nullptr);
ASSERT(!c->more());
// Check with some docs in the collection.
for (int i = 100; i < 150; _client.insert(ns(), BSON("ts" << Timestamp(1000, i++))))
;
- c = _client.query(NamespaceString(ns()),
- QUERY("ts" << GTE << Timestamp(1000, 50)),
- 0,
- 0,
- nullptr,
- QueryOption_OplogReplay);
+ c = _client.query(
+ NamespaceString(ns()), QUERY("ts" << GTE << Timestamp(1000, 50)), 0, 0, nullptr);
ASSERT(c->more());
ASSERT_EQUALS(100u, c->next()["ts"].timestamp().getInc());
// Check that no persistent cursors outlast our queries above.
ASSERT_EQUALS(startNumCursors, numCursorsOpen());
- ASSERT(_client.dropCollection(ns()));
+ _client.dropCollection(ns());
}
};
@@ -1761,8 +1804,7 @@ public:
0,
0,
nullptr,
- QueryOption_OplogReplay | QueryOption_CursorTailable |
- QueryOption_Exhaust,
+ QueryOption_CursorTailable | QueryOption_Exhaust,
message);
DbMessage dbMessage(message);
QueryMessage queryMessage(dbMessage);
diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js
index def25f8f65a..7bd167b5437 100644
--- a/src/mongo/shell/query.js
+++ b/src/mongo/shell/query.js
@@ -238,10 +238,6 @@ DBQuery.prototype._convertToCommand = function(canAttachReadPref) {
cmd["tailable"] = true;
}
- if ((this._options & DBQuery.Option.oplogReplay) != 0) {
- cmd["oplogReplay"] = true;
- }
-
if ((this._options & DBQuery.Option.noTimeout) != 0) {
cmd["noCursorTimeout"] = true;
}
@@ -598,19 +594,6 @@ DBQuery.prototype.noCursorTimeout = function() {
};
/**
-* Internal replication use only - driver should not set
-*
-* @method
-* @see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-query
-* @return {DBQuery}
-*/
-DBQuery.prototype.oplogReplay = function() {
- this._checkModify();
- this.addOption(DBQuery.Option.oplogReplay);
- return this;
-};
-
-/**
* Limits the fields to return for all matching documents.
*
* @method
@@ -694,7 +677,9 @@ DBQuery.shellBatchSize = 20;
DBQuery.Option = {
tailable: 0x2,
slaveOk: 0x4,
- oplogReplay: 0x8,
+ // 0x8 is reserved for oplogReplay, but not explicitly defined. This is because the flag no
+ // longer has any meaning to the server, and will be ignored, so there is no reason for it to
+ // be set by clients.
noTimeout: 0x10,
awaitData: 0x20,
exhaust: 0x40,