summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSpencer Jackson <spencer.jackson@mongodb.com>2017-12-14 14:32:18 -0500
committerSpencer Jackson <spencer.jackson@mongodb.com>2017-12-14 14:32:18 -0500
commit1f38fb202b9f8696cf28d39e674242e036c0b75c (patch)
tree249a9c923a97dd7e37d8898c3782bd27a6dd53a3 /src
parent47247293f18ea581954f6fcf4c0018b7828e3c3a (diff)
downloadmongo-1f38fb202b9f8696cf28d39e674242e036c0b75c.tar.gz
Revert "SERVER-31684 Fix unexpected "operation exceeded time limit" errors"
This reverts commit b79e5f04ffc79b5892f89c22b9e5f26a297b1185.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/collection.cpp24
-rw-r--r--src/mongo/db/catalog/collection.h19
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp21
-rw-r--r--src/mongo/db/operation_context.h8
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp3
-rw-r--r--src/mongo/db/query/find.cpp4
-rw-r--r--src/mongo/db/query/find_common.cpp3
-rw-r--r--src/mongo/db/query/find_common.h20
-rw-r--r--src/mongo/db/query/plan_executor.cpp43
-rw-r--r--src/mongo/db/query/plan_executor.h15
10 files changed, 92 insertions, 68 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 6493d147605..5d8e64a9bdd 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -152,15 +152,33 @@ void CappedInsertNotifier::notifyAll() {
_notifier.notify_all();
}
-void CappedInsertNotifier::waitUntil(uint64_t prevVersion, Date_t deadline) const {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+void CappedInsertNotifier::_wait(stdx::unique_lock<stdx::mutex>& lk,
+ uint64_t prevVersion,
+ Microseconds timeout) const {
while (!_dead && prevVersion == _version) {
- if (stdx::cv_status::timeout == _notifier.wait_until(lk, deadline.toSystemTimePoint())) {
+ if (timeout == Microseconds::max()) {
+ _notifier.wait(lk);
+ } else if (stdx::cv_status::timeout == _notifier.wait_for(lk, timeout.toSystemDuration())) {
return;
}
}
}
+void CappedInsertNotifier::wait(uint64_t prevVersion, Microseconds timeout) const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _wait(lk, prevVersion, timeout);
+}
+
+void CappedInsertNotifier::wait(Microseconds timeout) const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _wait(lk, _version, timeout);
+}
+
+void CappedInsertNotifier::wait() const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _wait(lk, _version, Microseconds::max());
+}
+
void CappedInsertNotifier::kill() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_dead = true;
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 1e9ad8ff4cd..216bfc971af 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -112,12 +112,17 @@ public:
void notifyAll();
/**
- * Waits until 'deadline', or until notifyAll() is called to indicate that new
+ * Waits for 'timeout' microseconds, or until notifyAll() is called to indicate that new
* data is available in the capped collection.
*
* NOTE: Waiting threads can be signaled by calling kill or notify* methods.
*/
- void waitUntil(uint64_t prevVersion, Date_t deadline) const;
+ void wait(Microseconds timeout) const;
+
+ /**
+ * Same as above but also ensures that if the version has changed, it also returns.
+ */
+ void wait(uint64_t prevVersion, Microseconds timeout) const;
/**
* Returns the version for use as an additional wake condition when used above.
@@ -127,6 +132,11 @@ public:
}
/**
+ * Same as above but without a timeout.
+ */
+ void wait() const;
+
+ /**
* Cancels the notifier if the collection is dropped/invalidated, and wakes all waiting.
*/
void kill();
@@ -137,6 +147,11 @@ public:
bool isDead();
private:
+ // Helper for wait impls.
+ void _wait(stdx::unique_lock<stdx::mutex>& lk,
+ uint64_t prevVersion,
+ Microseconds timeout) const;
+
// Signalled when a successful insert is made into a capped collection.
mutable stdx::condition_variable _notifier;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 4bab002396f..cc9416cf621 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -282,21 +282,19 @@ public:
if (cursor->isReadCommitted())
uassertStatusOK(opCtx->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
+ const bool hasOwnMaxTime = opCtx->hasDeadline();
+
const bool disableAwaitDataFailpointActive =
MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
-
// We assume that cursors created through a DBDirectClient are always used from their
// original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
+ if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) {
// There is no time limit set directly on this getMore command. If the cursor is
// awaitData, then we supply a default time of one second. Otherwise we roll over
// any leftover time from the maxTimeMS of the operation that spawned this cursor,
// applying it to this getMore.
if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
- opCtx->clearDeadline();
- awaitDataState(opCtx).waitForInsertsDeadline =
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
- request.awaitDataTimeout.value_or(Seconds{1});
+ opCtx->setDeadlineAfterNowBy(Seconds{1});
} else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) {
opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros());
}
@@ -338,7 +336,7 @@ public:
if (cursor->isAwaitData() && !disableAwaitDataFailpointActive) {
if (request.lastKnownCommittedOpTime)
clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
- awaitDataState(opCtx).shouldWaitForInserts = true;
+ shouldWaitForInserts(opCtx) = true;
}
Status batchStatus = generateBatch(opCtx, cursor, request, &nextBatch, &state, &numResults);
@@ -368,7 +366,12 @@ public:
exec->saveState();
exec->detachFromOperationContext();
- cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+ // If maxTimeMS was set directly on the getMore rather than being rolled over
+ // from a previous find, then don't roll remaining micros over to the next
+ // getMore.
+ if (!hasOwnMaxTime) {
+ cursor->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
+ }
cursor->incPos(numResults);
} else {
@@ -436,7 +439,7 @@ public:
}
// As soon as we get a result, this operation no longer waits.
- awaitDataState(opCtx).shouldWaitForInserts = false;
+ shouldWaitForInserts(opCtx) = false;
// Add result to output buffer.
nextBatch->setLatestOplogTimestamp(exec->getLatestOplogTimestamp());
nextBatch->append(obj);
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 915b8060981..4d5f05ceb85 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -411,14 +411,6 @@ public:
}
/**
- * Reset the deadline for this operation.
- */
- void clearDeadline() {
- _deadline = Date_t::max();
- _maxTime = computeMaxTimeFromDeadline(_deadline);
- }
-
- /**
* Returns the number of milliseconds remaining for this operation's time limit or
* Milliseconds::max() if the operation has no time limit.
*/
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index a25926f97e0..bba8a3dee72 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -34,7 +34,6 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/query/explain.h"
-#include "mongo/db/query/find_common.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/scopeguard.h"
@@ -105,7 +104,7 @@ void DocumentSourceCursor::loadBatch() {
// we need the whole pipeline to see each document to see if we should stop waiting.
// Furthermore, if we need to return the latest oplog time (in the tailable and
// needs-merge case), batching will result in a wrong time.
- if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
+ if (shouldWaitForInserts(pExpCtx->opCtx) ||
(pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 394f7fa552a..50db4cff04b 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -428,9 +428,7 @@ Message getMore(OperationContext* opCtx,
// the total operation latency.
curOp.pauseTimer();
Seconds timeout(1);
- notifier->waitUntil(notifierVersion,
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
- timeout);
+ notifier->wait(notifierVersion, timeout);
notifier.reset();
curOp.resumeTimer();
diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp
index b59b6e2a699..fa2049b07a0 100644
--- a/src/mongo/db/query/find_common.cpp
+++ b/src/mongo/db/query/find_common.cpp
@@ -40,9 +40,6 @@ MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore);
MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd);
-const OperationContext::Decoration<AwaitDataState> awaitDataState =
- OperationContext::declareDecoration<AwaitDataState>();
-
bool FindCommon::enoughForFirstBatch(const QueryRequest& qr, long long numDocs) {
if (!qr.getEffectiveBatchSize()) {
// We enforce a default batch size for the initial find if no batch size is specified.
diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h
index 0e83e3cb546..657554a0d94 100644
--- a/src/mongo/db/query/find_common.h
+++ b/src/mongo/db/query/find_common.h
@@ -27,30 +27,10 @@
*/
#include "mongo/bson/bsonobj.h"
-#include "mongo/db/operation_context.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
-/**
- * The state associated with tailable cursors.
- */
-struct AwaitDataState {
- /**
- * The deadline for how long we wait on the tail of capped collection before returning IS_EOF.
- */
- Date_t waitForInsertsDeadline;
-
- /**
- * If true, when no results are available from a plan, then instead of returning immediately,
- * the system should wait up to the length of the operation deadline for data to be inserted
- * which causes results to become available.
- */
- bool shouldWaitForInserts;
-};
-
-extern const OperationContext::Decoration<AwaitDataState> awaitDataState;
-
class BSONObj;
class QueryRequest;
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index b2619986a46..e1ae5988b3a 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/exec/subplan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
-#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -63,6 +62,8 @@ using std::string;
using std::unique_ptr;
using std::vector;
+const OperationContext::Decoration<bool> shouldWaitForInserts =
+ OperationContext::declareDecoration<bool>();
const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
OperationContext::declareDecoration<repl::OpTime>();
@@ -423,9 +424,8 @@ bool PlanExecutor::shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
- awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() &&
- awaitDataState(_opCtx).waitForInsertsDeadline >
- _opCtx->getServiceContext()->getPreciseClockSource()->now()) {
+ mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
+ _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
// We expect awaitData cursors to be yielding.
invariant(_yieldPolicy->canReleaseLocksDuringExecution());
@@ -470,21 +470,15 @@ PlanExecutor::ExecState PlanExecutor::waitForInserts(CappedInsertNotifierData* n
auto opCtx = _opCtx;
uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
auto yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
- const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
+ const auto timeout = opCtx->getRemainingMaxTimeMicros();
+ notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
});
notifierData->lastEOFVersion = currentNotifierVersion;
-
if (yieldResult.isOK()) {
// There may be more results, try to get more data.
return ADVANCED;
}
-
- if (errorObj) {
- *errorObj = Snapshotted<BSONObj>(SnapshotId(),
- WorkingSetCommon::buildMemberStatusObject(yieldResult));
- }
- return DEAD;
+ return swallowTimeoutIfAwaitData(yieldResult, errorObj);
}
PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
@@ -540,11 +534,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
if (_yieldPolicy->shouldYield()) {
auto yieldStatus = _yieldPolicy->yield(fetcher.get());
if (!yieldStatus.isOK()) {
- if (objOut) {
- *objOut = Snapshotted<BSONObj>(
- SnapshotId(), WorkingSetCommon::buildMemberStatusObject(yieldStatus));
- }
- return PlanExecutor::DEAD;
+ return swallowTimeoutIfAwaitData(yieldStatus, objOut);
}
}
@@ -697,6 +687,23 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
_stash.push(obj.getOwned());
}
+PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
+ Status yieldError, Snapshotted<BSONObj>* errorObj) const {
+ if (yieldError == ErrorCodes::ExceededTimeLimit) {
+ if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) {
+ // If the cursor is tailable then exceeding the time limit should not destroy this
+ // PlanExecutor, we should just stop waiting for inserts.
+ return PlanExecutor::IS_EOF;
+ }
+ }
+
+ if (errorObj) {
+ *errorObj = Snapshotted<BSONObj>(SnapshotId(),
+ WorkingSetCommon::buildMemberStatusObject(yieldError));
+ }
+ return PlanExecutor::DEAD;
+}
+
Timestamp PlanExecutor::getLatestOplogTimestamp() {
if (auto pipelineProxy = getStageByType(_root.get(), STAGE_PIPELINE_PROXY))
return static_cast<PipelineProxyStage*>(pipelineProxy)->getLatestOplogTimestamp();
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 3de96faf96b..aeed2ea8154 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -53,6 +53,13 @@ struct PlanStageStats;
class WorkingSet;
/**
+ * If true, when no results are available from a plan, then instead of returning immediately, the
+ * system should wait up to the length of the operation deadline for data to be inserted which
+ * causes results to become available.
+ */
+extern const OperationContext::Decoration<bool> shouldWaitForInserts;
+
+/**
* If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop
* waiting for new data as soon as the committed op time changes.
*
@@ -533,6 +540,14 @@ private:
*/
Status pickBestPlan(const Collection* collection);
+ /**
+ * Given a non-OK status returned from a yield 'yieldError', checks if this PlanExecutor
+ * represents a tailable, awaitData cursor and whether 'yieldError' is the error object
+ * describing an operation time out. If so, returns IS_EOF. Otherwise returns DEAD, and
+ * populates 'errorObj' with the error - if 'errorObj' is not null.
+ */
+ ExecState swallowTimeoutIfAwaitData(Status yieldError, Snapshotted<BSONObj>* errorObj) const;
+
// The OperationContext that we're executing within. This can be updated if necessary by using
// detachFromOperationContext() and reattachToOperationContext().
OperationContext* _opCtx;