summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-07-14 17:15:52 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-07-17 08:52:57 -0400
commit3d38a6ff86b47b71d735b77f39704adec3ef3da7 (patch)
tree8f318b2b52852a1511ed6da6ede9ac62cbe67d4d /src
parenta1c67941bf08c69cab04eba20bc9ce9a763e1c7f (diff)
downloadmongo-3d38a6ff86b47b71d735b77f39704adec3ef3da7.tar.gz
SERVER-29128 Fix performance regression on awaitData with lastKnownCommittedOpTime
Revert "Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"" This reverts commit d29e92cffcb4db3cdd77b1e53d5d005db6cc309d.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/clientcursor.h27
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp72
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp31
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp12
-rw-r--r--src/mongo/db/pipeline/expression_context.h11
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp8
-rw-r--r--src/mongo/db/query/SConscript1
-rw-r--r--src/mongo/db/query/find.cpp5
-rw-r--r--src/mongo/db/query/plan_executor.cpp71
-rw-r--r--src/mongo/db/query/plan_executor.h27
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp18
-rw-r--r--src/mongo/db/query/plan_yield_policy.h7
-rw-r--r--src/mongo/db/query/query_yield.cpp20
-rw-r--r--src/mongo/db/query/query_yield.h5
16 files changed, 232 insertions, 92 deletions
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 32fb018a8e1..f75d41e3089 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -71,6 +71,20 @@ struct ClientCursorParams {
}
}
+ void setTailable(bool tailable) {
+ if (tailable)
+ queryOptions |= QueryOption_CursorTailable;
+ else
+ queryOptions &= ~QueryOption_CursorTailable;
+ }
+
+ void setAwaitData(bool awaitData) {
+ if (awaitData)
+ queryOptions |= QueryOption_AwaitData;
+ else
+ queryOptions &= ~QueryOption_AwaitData;
+ }
+
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
const NamespaceString nss;
std::vector<UserName> authenticatedUsers;
@@ -127,10 +141,23 @@ public:
return _exec.get();
}
+ /**
+ * Returns the query options bitmask. If you'd like to know if the cursor is tailable or
+ * awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this
+ * method.
+ */
int queryOptions() const {
return _queryOptions;
}
+ bool isTailable() const {
+ return _queryOptions & QueryOption_CursorTailable;
+ }
+
+ bool isAwaitData() const {
+ return _queryOptions & QueryOption_AwaitData;
+ }
+
const BSONObj& getOriginatingCommandObj() const {
return _originatingCommand;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index e429c268dd6..d41867d16b9 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/query/find.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -269,11 +270,6 @@ public:
// Validation related to awaitData.
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
-
- if (CursorManager::isGloballyManagedCursor(request.cursorid)) {
- Status status(ErrorCodes::BadValue, "awaitData cannot be set on this cursor");
- return appendCommandStatus(result, status);
- }
}
if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) {
@@ -322,21 +318,6 @@ public:
}
}
- uint64_t notifierVersion = 0;
- std::shared_ptr<CappedInsertNotifier> notifier;
- if (isCursorAwaitData(cursor)) {
- invariant(readLock->getCollection()->isCapped());
- // Retrieve the notifier which we will wait on until new data arrives. We make sure
- // to do this in the lock because once we drop the lock it is possible for the
- // collection to become invalid. The notifier itself will outlive the collection if
- // the collection is dropped, as we keep a shared_ptr to it.
- notifier = readLock->getCollection()->getCappedInsertNotifier();
-
- // Must get the version before we call generateBatch in case a write comes in after
- // that call and before we call wait on the notifier.
- notifierVersion = notifier->getVersion();
- }
-
CursorId respondWithId = 0;
CursorResponseBuilder nextBatch(/*isInitialResponse*/ false, &result);
BSONObj obj;
@@ -352,46 +333,16 @@ public:
PlanSummaryStats preExecutionStats;
Explain::getSummaryStats(*exec, &preExecutionStats);
- Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
+ // Mark this as an AwaitData operation if appropriate.
+ if (isCursorAwaitData(cursor)) {
+ if (request.lastKnownCommittedOpTime)
+ clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
+ shouldWaitForInserts(opCtx) = true;
}
- // If this is an await data cursor, and we hit EOF without generating any results, then
- // we block waiting for new data to arrive.
- if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- // Return immediately if we need to update the commit time.
- if (!request.lastKnownCommittedOpTime ||
- (request.lastKnownCommittedOpTime == replCoord->getLastCommittedOpTime())) {
- // Retrieve the notifier which we will wait on until new data arrives. We make sure
- // to do this in the lock because once we drop the lock it is possible for the
- // collection to become invalid. The notifier itself will outlive the collection if
- // the collection is dropped, as we keep a shared_ptr to it.
- auto notifier = readLock->getCollection()->getCappedInsertNotifier();
-
- // Save the PlanExecutor and drop our locks.
- exec->saveState();
- readLock.reset();
-
- // Block waiting for data. Time spent blocking is not counted towards the total
- // operation latency.
- curOp->pauseTimer();
- const auto timeout = opCtx->getRemainingMaxTimeMicros();
- notifier->wait(notifierVersion, timeout);
- notifier.reset();
- curOp->resumeTimer();
-
- readLock.emplace(opCtx, request.nss);
- exec->restoreState();
-
- // We woke up because either the timed_wait expired, or there was more data. Either
- // way, attempt to generate another batch of results.
- batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
- if (!batchStatus.isOK()) {
- return appendCommandStatus(result, batchStatus);
- }
- }
+ Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
+ if (!batchStatus.isOK()) {
+ return appendCommandStatus(result, batchStatus);
}
PlanSummaryStats postExecutionStats;
@@ -472,7 +423,8 @@ public:
* Returns an OK status if the batch was successfully generated, and a non-OK status if the
* PlanExecutor encounters a failure.
*/
- Status generateBatch(ClientCursor* cursor,
+ Status generateBatch(OperationContext* opCtx,
+ ClientCursor* cursor,
const GetMoreRequest& request,
CursorResponseBuilder* nextBatch,
PlanExecutor::ExecState* state,
@@ -494,6 +446,8 @@ public:
break;
}
+ // As soon as we get a result, this operation no longer waits.
+ shouldWaitForInserts(opCtx) = false;
// Add result to output buffer.
nextBatch->append(obj);
(*numResults)++;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7b4d40f0bdb..1bee5cb5f07 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -99,8 +99,10 @@ bool handleCursorCommand(OperationContext* opCtx,
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
- // make it an obvious error to use cursor or executor after this point
- cursor = nullptr;
+ if (!cursor->isTailable()) {
+ // make it an obvious error to use cursor or executor after this point
+ cursor = nullptr;
+ }
break;
}
@@ -393,6 +395,10 @@ Status runAggregate(OperationContext* opCtx,
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request))));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+ if (liteParsedPipeline.startsWithChangeNotification()) {
+ expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ }
+
// Parse the pipeline.
auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
if (!statusWithPipeline.isOK()) {
@@ -451,13 +457,20 @@ Status runAggregate(OperationContext* opCtx,
// cursor manager. The global cursor manager does not deliver invalidations or kill
// notifications; the underlying PlanExecutor(s) used by the pipeline will be receiving
// invalidations and kill notifications themselves, not the cursor we create here.
- auto pin = CursorManager::getGlobalCursorManager()->registerCursor(
- opCtx,
- {std::move(exec),
- origNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
- cmdObj});
+ ClientCursorParams cursorParams(
+ std::move(exec),
+ origNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ cmdObj);
+ if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) {
+ cursorParams.setTailable(true);
+ cursorParams.setAwaitData(true);
+ }
+
+ auto pin =
+ CursorManager::getGlobalCursorManager()->registerCursor(opCtx, std::move(cursorParams));
+
ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, &pin);
// If both explain and cursor are specified, explain wins.
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index faf5bcf97ac..b8d7027fc5f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -94,12 +94,10 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
!expCtx->getCollator());
BSONObj matchObj = buildMatch(elem, expCtx->ns);
- BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
- auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
auto transformSource = createTransformationStage(expCtx);
- return {matchSource, sortSource, transformSource};
+ return {matchSource, transformSource};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0e2b53ea591..0fd14fd9a92 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -94,14 +94,15 @@ public:
}
};
-TEST_F(ChangeNotificationStageTest, Basic) {
+TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
vector<intrusive_ptr<DocumentSource>> result =
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
- ASSERT_EQUALS(result.size(), 3UL);
+ ASSERT_EQUALS(result.size(), 2UL);
ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
+ ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9704d826e53..f15bafd5b8b 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
@@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ // As long as we're waiting for inserts, we shouldn't do any batching at this level
+ // we need the whole pipeline to see each document to see if we should stop waiting.
+ if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
}
}
+ // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
+ // the PlanExecutor alive.
+ if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+ _exec->saveState();
+ return;
+ }
}
}
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index e17128ec8a6..d04b095aa4b 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -58,6 +58,8 @@ public:
std::vector<BSONObj> pipeline;
};
+ enum class TailableMode { kNormal, kTailableAndAwaitData };
+
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -101,6 +103,13 @@ public:
return it->second;
};
+ /**
+ * Convenience call that returns true if the tailableMode indicate a tailable query.
+ */
+ bool isTailable() const {
+ return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
+ }
+
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
@@ -121,6 +130,8 @@ public:
Variables variables;
VariablesParseState variablesParseState;
+ TailableMode tailableMode = TailableMode::kNormal;
+
protected:
static const int kInterruptCheckPeriod = 128;
ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ec471f41df5..697c639ccf7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -392,6 +392,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique<QueryRequest>(nss);
+ switch (pExpCtx->tailableMode) {
+ case ExpressionContext::TailableMode::kNormal:
+ break;
+ case ExpressionContext::TailableMode::kTailableAndAwaitData:
+ qr->setTailable(true);
+ qr->setAwaitData(true);
+ break;
+ }
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 854f3045506..60eab4d4f79 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -81,6 +81,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/index_catalog_entry',
"$BUILD_DIR/mongo/db/curop",
"$BUILD_DIR/mongo/db/exec/exec",
+ "$BUILD_DIR/mongo/db/repl/repl_coordinator_interface",
"$BUILD_DIR/mongo/db/s/sharding",
"$BUILD_DIR/mongo/db/storage/oplog_hack",
"$BUILD_DIR/mongo/util/elapsed_tracker",
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index da1d0d5c6ea..da2a71e2068 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -364,6 +364,11 @@ Message getMore(OperationContext* opCtx,
if (cc->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ uassert(40548,
+ "OP_GET_MORE operations are not supported on tailable aggregations. Only clients "
+ "which support the getMore command can be used on tailable aggregations.",
+ readLock || !isCursorAwaitData(cc));
+
// If the operation that spawned this cursor had a time limit set, apply leftover
// time to this getmore.
if (cc->getLeftoverMaxTimeMicros() < Microseconds::max()) {
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index 2789e660b76..17aa8d42f02 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -32,6 +32,8 @@
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
@@ -43,10 +45,12 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/stacktrace.h"
namespace mongo {
@@ -56,6 +60,11 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
+const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
+ OperationContext::declareDecoration<repl::OpTime>();
+
namespace {
namespace {
@@ -380,6 +389,50 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
return getNextImpl(objOut, dlOut);
}
+
+bool PlanExecutor::shouldWaitForInserts() {
+ // If this is an awaitData-respecting operation and we have time left and we're not interrupted,
+ // we should wait for inserts.
+ if (mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ // For operations with a last committed opTime, we should not wait if the replication
+ // coordinator's lastCommittedOpTime has changed.
+ if (!clientsLastKnownCommittedOpTime(_opCtx).isNull()) {
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
+ return clientsLastKnownCommittedOpTime(_opCtx) == replCoord->getLastCommittedOpTime();
+ }
+ return true;
+ }
+ return false;
+}
+
+bool PlanExecutor::waitForInserts() {
+ // If we cannot yield, we should retry immediately.
+ if (!_yieldPolicy->canReleaseLocksDuringExecution())
+ return true;
+
+ // We can only wait if we have a collection; otherwise retry immediately.
+ dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
+ auto db = dbHolder().get(_opCtx, _nss.db());
+ if (!db)
+ return true;
+ auto collection = db->getCollection(_opCtx, _nss);
+ if (!collection)
+ return true;
+
+ auto notifier = collection->getCappedInsertNotifier();
+ uint64_t notifierVersion = notifier->getVersion();
+ auto curOp = CurOp::get(_opCtx);
+ curOp->pauseTimer();
+ ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
+ auto opCtx = _opCtx;
+ bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifier->wait(notifierVersion, timeout);
+ });
+ return yieldResult;
+}
+
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
if (MONGO_FAIL_POINT(planExecutorAlwaysFails)) {
Status status(ErrorCodes::OperationFailed,
@@ -508,7 +561,23 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
} else if (PlanStage::NEED_TIME == code) {
// Fall through to yield check at end of large conditional.
} else if (PlanStage::IS_EOF == code) {
- return PlanExecutor::IS_EOF;
+ if (shouldWaitForInserts()) {
+ const bool locksReacquiredAfterYield = waitForInserts();
+ if (locksReacquiredAfterYield) {
+ // There may be more results, try to get more data.
+ continue;
+ }
+ invariant(isMarkedAsKilled());
+ if (objOut) {
+ Status status(ErrorCodes::OperationFailed,
+ str::stream() << "Operation aborted because: " << *_killReason);
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(status));
+ }
+ return PlanExecutor::DEAD;
+ } else {
+ return PlanExecutor::IS_EOF;
+ }
} else {
invariant(PlanStage::DEAD == code || PlanStage::FAILURE == code);
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 48fed0eab0b..ba565e6b49c 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -51,6 +51,23 @@ struct PlanStageStats;
class WorkingSet;
/**
+ * If true, when no results are available from a plan, then instead of returning immediately, the
+ * system should wait up to the length of the operation deadline for data to be inserted which
+ * causes results to become available.
+ */
+extern const OperationContext::Decoration<bool> shouldWaitForInserts;
+
+/**
+ * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop
+ * waiting for new data as soon as the committed op time changes.
+ *
+ * 'clientsLastKnownCommittedOpTime' represents the time passed to the getMore command.
+ * If the replication coordinator ever reports a higher committed op time, we should stop waiting
+ * for inserts and return immediately to speed up the propagation of commit level changes.
+ */
+extern const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime;
+
+/**
* A PlanExecutor is the abstraction that knows how to crank a tree of stages into execution.
* The executor is usually part of a larger abstraction that is interacting with the cache
* and/or the query optimizer.
@@ -425,6 +442,16 @@ public:
}
private:
+ // Returns true if the PlanExecutor should wait for data to be inserted, which is when a getMore
+ // is called on a tailable and awaitData cursor on a capped collection. Returns false if an EOF
+ // should be returned immediately.
+ bool shouldWaitForInserts();
+
+ // Yields locks and waits for inserts to the collection. Returns true if there may be new
+ // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield,
+ // returns true immediately.
+ bool waitForInserts();
+
ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
/**
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 52633ea0185..c5b54a90afb 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -71,7 +71,19 @@ void PlanYieldPolicy::resetTimer() {
_elapsedTracker.resetLastTime();
}
-bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
+bool PlanYieldPolicy::yield(RecordFetcher* recordFetcher) {
+ invariant(_planYielding);
+ if (recordFetcher) {
+ OperationContext* opCtx = _planYielding->getOpCtx();
+ return yield([recordFetcher, opCtx] { recordFetcher->setup(opCtx); },
+ [recordFetcher] { recordFetcher->fetch(); });
+ } else {
+ return yield(nullptr, nullptr);
+ }
+}
+
+bool PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
+ stdx::function<void()> whileYieldingFn) {
invariant(_planYielding);
invariant(canAutoYield());
@@ -107,7 +119,9 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
opCtx->recoveryUnit()->abandonSnapshot();
} else {
// Release and reacquire locks.
- QueryYield::yieldAllLocks(opCtx, fetcher, _planYielding->nss());
+ if (beforeYieldingFn)
+ beforeYieldingFn();
+ QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
}
return _planYielding->restoreStateWithoutRetrying();
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index 892fbafe350..cdf93f0219a 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -30,6 +30,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
@@ -74,6 +75,12 @@ public:
bool yield(RecordFetcher* fetcher = NULL);
/**
+ * More generic version of yield() above. This version calls 'beforeYieldingFn' immediately
+ * before locks are yielded (if they are), and 'whileYieldingFn' before locks are restored.
+ */
+ bool yield(stdx::function<void()> beforeYieldingFn, stdx::function<void()> whileYieldingFn);
+
+ /**
* All calls to shouldYield() will return true until the next call to yield.
*/
void forceYield() {
diff --git a/src/mongo/db/query/query_yield.cpp b/src/mongo/db/query/query_yield.cpp
index 6e1de753010..e731383a92d 100644
--- a/src/mongo/db/query/query_yield.cpp
+++ b/src/mongo/db/query/query_yield.cpp
@@ -25,7 +25,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#include "mongo/platform/basic.h"
#include "mongo/db/query/query_yield.h"
@@ -46,23 +45,18 @@ MONGO_FP_DECLARE(setYieldAllLocksWait);
// static
void QueryYield::yieldAllLocks(OperationContext* opCtx,
- RecordFetcher* fetcher,
+ stdx::function<void()> whileYieldingFn,
const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
- // 1) Tell the RecordFetcher to do any setup which needs to happen inside locks
- // 2) Release lock mgr locks
- // 3) Go to sleep
- // 4) Touch the record we're yielding on, if there is one (RecordFetcher::fetch)
- // 5) Reacquire lock mgr locks
+ // * Release lock mgr locks
+ // * Go to sleep
+ // * Call the whileYieldingFn
+ // * Reacquire lock mgr locks
Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
- if (fetcher) {
- fetcher->setup(opCtx);
- }
-
// Nothing was unlocked, just return, yielding is pointless.
if (!locker->saveLockStateAndUnlock(&snapshot)) {
return;
@@ -85,8 +79,8 @@ void QueryYield::yieldAllLocks(OperationContext* opCtx,
}
}
- if (fetcher) {
- fetcher->fetch();
+ if (whileYieldingFn) {
+ whileYieldingFn();
}
locker->restoreLockState(snapshot);
diff --git a/src/mongo/db/query/query_yield.h b/src/mongo/db/query/query_yield.h
index 7d98b299484..a4fbba72fe7 100644
--- a/src/mongo/db/query/query_yield.h
+++ b/src/mongo/db/query/query_yield.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/namespace_string.h"
+#include "mongo/stdx/functional.h"
namespace mongo {
@@ -47,9 +48,11 @@ public:
* switch to another thread, and then reacquires all locks.
*
* If in a nested context (eg DBDirectClient), does nothing.
+ *
+ * The whileYieldingFn will be executed after unlocking the locks and before re-acquiring them.
*/
static void yieldAllLocks(OperationContext* opCtx,
- RecordFetcher* fetcher,
+ stdx::function<void()> whileYieldingFn,
const NamespaceString& planExecNS);
};