summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@mongodb.com>2017-11-15 15:38:14 -0500
committerMartin Neupauer <martin.neupauer@mongodb.com>2017-12-12 15:49:15 -0500
commitb79e5f04ffc79b5892f89c22b9e5f26a297b1185 (patch)
treeba924a0eb8e03751f3dd95a4461e53f877ceba19 /src/mongo
parent847104b8775af27762b35ff99da0d78511c01376 (diff)
downloadmongo-b79e5f04ffc79b5892f89c22b9e5f26a297b1185.tar.gz
SERVER-31684 Fix unexpected "operation exceeded time limit" errors
The changestream queries used an operation context deadline to track a wait time before returning EOF. This occasionaly interfered with normal operation deadlines leading to unexpected errors.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/catalog/collection.cpp24
-rw-r--r--src/mongo/db/catalog/collection.h19
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp21
-rw-r--r--src/mongo/db/operation_context.h8
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp3
-rw-r--r--src/mongo/db/query/find.cpp4
-rw-r--r--src/mongo/db/query/find_common.cpp3
-rw-r--r--src/mongo/db/query/find_common.h20
-rw-r--r--src/mongo/db/query/plan_executor.cpp43
-rw-r--r--src/mongo/db/query/plan_executor.h15
10 files changed, 68 insertions, 92 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 5d8e64a9bdd..6493d147605 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -152,33 +152,15 @@ void CappedInsertNotifier::notifyAll() {
_notifier.notify_all();
}
-void CappedInsertNotifier::_wait(stdx::unique_lock<stdx::mutex>& lk,
- uint64_t prevVersion,
- Microseconds timeout) const {
+void CappedInsertNotifier::waitUntil(uint64_t prevVersion, Date_t deadline) const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!_dead && prevVersion == _version) {
- if (timeout == Microseconds::max()) {
- _notifier.wait(lk);
- } else if (stdx::cv_status::timeout == _notifier.wait_for(lk, timeout.toSystemDuration())) {
+ if (stdx::cv_status::timeout == _notifier.wait_until(lk, deadline.toSystemTimePoint())) {
return;
}
}
}
-void CappedInsertNotifier::wait(uint64_t prevVersion, Microseconds timeout) const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _wait(lk, prevVersion, timeout);
-}
-
-void CappedInsertNotifier::wait(Microseconds timeout) const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _wait(lk, _version, timeout);
-}
-
-void CappedInsertNotifier::wait() const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _wait(lk, _version, Microseconds::max());
-}
-
void CappedInsertNotifier::kill() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_dead = true;
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 216bfc971af..1e9ad8ff4cd 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -112,17 +112,12 @@ public:
void notifyAll();
/**
- * Waits for 'timeout' microseconds, or until notifyAll() is called to indicate that new
+ * Waits until 'deadline', or until notifyAll() is called to indicate that new
* data is available in the capped collection.
*
* NOTE: Waiting threads can be signaled by calling kill or notify* methods.
*/
- void wait(Microseconds timeout) const;
-
- /**
- * Same as above but also ensures that if the version has changed, it also returns.
- */
- void wait(uint64_t prevVersion, Microseconds timeout) const;
+ void waitUntil(uint64_t prevVersion, Date_t deadline) const;
/**
* Returns the version for use as an additional wake condition when used above.
@@ -132,11 +127,6 @@ public:
}
/**
- * Same as above but without a timeout.
- */
- void wait() const;
-
- /**
* Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting.
*/
void kill();
@@ -147,11 +137,6 @@ public:
bool isDead();
private:
- // Helper for wait impls.
- void _wait(stdx::unique_lock<stdx::mutex>& lk,
- uint64_t prevVersion,
- Microseconds timeout) const;
-
// Signalled when a successful insert is made into a capped collection.
mutable stdx::condition_variable _notifier;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index cc9416cf621..4bab002396f 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -282,19 +282,21 @@ public:
if (cursor->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
- const bool hasOwnMaxTime = opCtx->hasDeadline();
-
const bool disableAwaitDataFailpointActive =
MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
+
// We assume that cursors created through a DBDirectClient are always used from their
// original OperationContext, so we do not need to move time to and from the cursor.
- if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) {
+ if (!opCtx->getClient()->isInDirectClient()) {
// There is no time limit set directly on this getMore command. If the cursor is
// awaitData, then we supply a default time of one second. Otherwise we roll over
// any leftover time from the maxTimeMS of the operation that spawned this cursor,
// applying it to this getMore.
if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
- opCtx->setDeadlineAfterNowBy(Seconds{1});
+ opCtx->clearDeadline();
+ awaitDataState(opCtx).waitForInsertsDeadline =
+ opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ request.awaitDataTimeout.value_or(Seconds{1});
} else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) {
opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros());
}
@@ -336,7 +338,7 @@ public:
if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
if (request.lastKnownCommittedOpTime)
clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
- shouldWaitForInserts(opCtx) = true;
+ awaitDataState(opCtx).shouldWaitForInserts = true;
}
Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
@@ -366,12 +368,7 @@ public:
exec->saveState();
exec->detachFromOperationContext();
- // If maxTimeMS was set directly on the getMore rather than being rolled over
- // from a previous find, then don't roll remaining micros over to the next
- // getMore.
- if (!hasOwnMaxTime) {
- cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
- }
+ cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
cursor->incPos(numResults);
} else {
@@ -439,7 +436,7 @@ public:
}
// As soon as we get a result, this operation no longer waits.
- shouldWaitForInserts(opCtx) = false;
+ awaitDataState(opCtx).shouldWaitForInserts = false;
// Add result to output buffer.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
nextBatch->append(obj);
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 4d5f05ceb85..915b8060981 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -411,6 +411,14 @@ public:
}
/**
+ * Reset the deadline for this operation.
+ */
+ void clearDeadline() {
+ _deadline = Date_t::max();
+ _maxTime = computeMaxTimeFromDeadline(_deadline);
+ }
+
+ /**
* Returns the number of milliseconds remaining for this operation's time limit or
* Milliseconds::max() if the operation has no time limit.
*/
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index bba8a3dee72..a25926f97e0 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/scopeguard.h"
@@ -104,7 +105,7 @@ void DocumentSourceCursor::loadBatch() {
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
// needs-merge case), batching will result in a wrong time.
- if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
(pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 50db4cff04b..394f7fa552a 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -428,7 +428,9 @@ Message getMore(OperationContext* opCtx,
// the total operation latency.
curOp.pauseTimer();
Seconds timeout(1);
- notifier->wait(notifierVersion, timeout);
+ notifier->waitUntil(notifierVersion,
+ opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ timeout);
notifier.reset();
curOp.resumeTimer();
diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp
index fa2049b07a0..b59b6e2a699 100644
--- a/src/mongo/db/query/find_common.cpp
+++ b/src/mongo/db/query/find_common.cpp
@@ -40,6 +40,9 @@ MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore);
MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd);
+const OperationContext::Decoration<AwaitDataState> awaitDataState =
+ OperationContext::declareDecoration<AwaitDataState>();
+
bool FindCommon::enoughForFirstBatch(const QueryRequest& qr, long long numDocs) {
if (!qr.getEffectiveBatchSize()) {
// We enforce a default batch size for the initial find if no batch size is specified.
diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h
index 657554a0d94..0e83e3cb546 100644
--- a/src/mongo/db/query/find_common.h
+++ b/src/mongo/db/query/find_common.h
@@ -27,10 +27,30 @@
*/
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/operation_context.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
+/**
+ * The state associated with tailable cursors.
+ */
+struct AwaitDataState {
+ /**
+ * The deadline for how long we wait on the tail of capped collection before returning IS_EOF.
+ */
+ Date_t waitForInsertsDeadline;
+
+ /**
+ * If true, when no results are available from a plan, then instead of returning immediately,
+ * the system should wait up to the length of the operation deadline for data to be inserted
+ * which causes results to become available.
+ */
+ bool shouldWaitForInserts;
+};
+
+extern const OperationContext::Decoration<AwaitDataState> awaitDataState;
+
class BSONObj;
class QueryRequest;
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index e1ae5988b3a..b2619986a46 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -62,8 +63,6 @@ using std::string;
using std::unique_ptr;
using std::vector;
-const OperationContext::Decoration<bool> shouldWaitForInserts =
- OperationContext::declareDecoration<bool>();
const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
OperationContext::declareDecoration<repl::OpTime>();
@@ -424,8 +423,9 @@ bool PlanExecutor::shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
- mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
- _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
+ awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
+ awaitDataState(_opCtx).waitForInsertsDeadline >
+ _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
// We expect awaitData cursors to be yielding.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -470,15 +470,21 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n
auto opCtx = _opCtx;
uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
- const auto timeout = opCtx->getRemainingMaxTimeMicros();
- notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
+ const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
+ notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
});
notifierData->lastEOFVersion = currentNotifierVersion;
+
if (yieldResult.isOK()) {
// There may be more results, try to get more data.
return ADVANCED;
}
- return swallowTimeoutIfAwaitData(yieldResult, errorObj);
+
+ if (errorObj) {
+ *errorObj = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(yieldResult));
+ }
+ return DEAD;
}
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
@@ -534,7 +540,11 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
if (_yieldPolicy->shouldYield()) {
auto yieldStatus = _yieldPolicy->yield(fetcher.get());
if (!yieldStatus.isOK()) {
- return swallowTimeoutIfAwaitData(yieldStatus, objOut);
+ if (objOut) {
+ *objOut = Snapshotted<BSONObj>(
+ SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus));
+ }
+ return PlanExecutor::DEAD;
}
}
@@ -687,23 +697,6 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
-PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
- Status yieldError, Snapshotted<BSONObj>* errorObj) const {
- if (yieldError == ErrorCodes::ExceededTimeLimit) {
- if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) {
- // If the cursor is tailable then exceeding the time limit should not destroy this
- // PlanExecutor, we should just stop waiting for inserts.
- return PlanExecutor::IS_EOF;
- }
- }
-
- if (errorObj) {
- *errorObj = Snapshotted<BSONObj>(SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(yieldError));
- }
- return PlanExecutor::DEAD;
-}
-
Timestamp PlanExecutor::getLatestOplogTimestamp() {
if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index aeed2ea8154..3de96faf96b 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -53,13 +53,6 @@ struct PlanStageStats;
class WorkingSet;
/**
- * If true, when no results are available from a plan, then instead of returning immediately, the
- * system should wait up to the length of the operation deadline for data to be inserted which
- * causes results to become available.
- */
-extern const OperationContext::Decoration<bool> shouldWaitForInserts;
-
-/**
* If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop
* waiting for new data as soon as the committed op time changes.
*
@@ -540,14 +533,6 @@ private:
*/
Status pickBestPlan(const Collection* collection);
- /**
- * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor
- * represents a tailable, awaitData cursor and whether 'yieldError' is the error object
- * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and
- * populates 'errorObj' with the error - if 'errorObj' is not null.
- */
- ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted<BSONObj>* errorObj) const;
-
// The OperationContext that we're executing within. This can be updated if necessary by using
// detachFromOperationContext() and reattachToOperationContext().
OperationContext* _opCtx;