author      Charlie Swanson <charlie.swanson@mongodb.com>   2018-04-13 15:13:36 -0400
committer   Charlie Swanson <charlie.swanson@mongodb.com>   2018-04-30 14:09:01 -0400
commit      a43fe9ae73752fbd98107cef5421341fe291ab32 (patch)
tree        8972aabe0b36655e67c8b6c52fa2b9a2916d6eed /src/mongo/s/query/async_results_merger.cpp
parent      7a2217a54d59c5d97e9e79cc40639c2589a18deb (diff)
download    mongo-a43fe9ae73752fbd98107cef5421341fe291ab32.tar.gz
SERVER-34204 Always pass non-null opCtx when scheduling getMores in ARM
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp | 50
1 file changed, 33 insertions, 17 deletions
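
The change below enforces one rule in two places: the AsyncResultsMerger (ARM) may only schedule a remote getMore while it holds a non-null OperationContext. As a rough illustration of that guard pattern, here is a minimal sketch using toy stand-in types (MiniMerger and OperationContext here are not the real MongoDB classes):

#include <cassert>
#include <iostream>

// Illustrative sketch only; OperationContext and MiniMerger are stand-ins,
// not the real MongoDB types.
struct OperationContext {};

class MiniMerger {
public:
    void reattach(OperationContext* opCtx) { _opCtx = opCtx; }
    void detach() { _opCtx = nullptr; }

    // Mirrors the invariant added to _askForNextBatch(): scheduling a getMore
    // without an attached OperationContext is treated as a programming error.
    void askForNextBatch() {
        assert(_opCtx && "Cannot schedule a getMore without an OperationContext");
        std::cout << "scheduling remote getMore\n";
    }

private:
    OperationContext* _opCtx = nullptr;
};

int main() {
    OperationContext opCtx;
    MiniMerger merger;
    merger.reattach(&opCtx);
    merger.askForNextBatch();  // OK: an OperationContext is attached.
    merger.detach();           // After this, askForNextBatch() would assert.
    return 0;
}
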
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 3e41a36c089..f5268ac3408 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -335,6 +335,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) {
 }
 
 Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
+    invariant(_opCtx, "Cannot schedule a getMore without an OperationContext");
     auto& remote = _remotes[remoteIndex];
 
     invariant(!remote.cbHandle.isValid());
@@ -387,6 +388,32 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
     return Status::OK();
 }
 
+Status AsyncResultsMerger::scheduleGetMores() {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    return _scheduleGetMores(lk);
+}
+
+Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) {
+    // Schedule remote work on hosts for which we need more results.
+    for (size_t i = 0; i < _remotes.size(); ++i) {
+        auto& remote = _remotes[i];
+
+        if (!remote.status.isOK()) {
+            return remote.status;
+        }
+
+        if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
+            // If this remote is not exhausted and there is no outstanding request for it, schedule
+            // work to retrieve the next batch.
+            auto nextBatchStatus = _askForNextBatch(lk, i);
+            if (!nextBatchStatus.isOK()) {
+                return nextBatchStatus;
+            }
+        }
+    }
+    return Status::OK();
+}
+
 /*
  * Note: When nextEvent() is called to do retries, only the remotes with retriable errors will
  * be rescheduled because:
@@ -411,22 +438,9 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
                       "nextEvent() called before an outstanding event was signaled");
     }
 
-    // Schedule remote work on hosts for which we need more results.
-    for (size_t i = 0; i < _remotes.size(); ++i) {
-        auto& remote = _remotes[i];
-
-        if (!remote.status.isOK()) {
-            return remote.status;
-        }
-
-        if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
-            // If this remote is not exhausted and there is no outstanding request for it, schedule
-            // work to retrieve the next batch.
-            auto nextBatchStatus = _askForNextBatch(lk, i);
-            if (!nextBatchStatus.isOK()) {
-                return nextBatchStatus;
-            }
-        }
+    auto getMoresStatus = _scheduleGetMores(lk);
+    if (!getMoresStatus.isOK()) {
+        return getMoresStatus;
     }
 
     auto eventStatus = _executor->makeEvent();
@@ -592,9 +606,11 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk,
     if (_tailableMode == TailableModeEnum::kTailable && !remote.hasNext()) {
         invariant(_remotes.size() == 1);
         _eofNext = true;
-    } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) {
+    } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive && _opCtx) {
         // If this is normal or tailable-awaitData cursor and we still don't have anything buffered
         // after receiving this batch, we can schedule work to retrieve the next batch right away.
+        // Be careful only to do this when '_opCtx' is non-null, since it is illegal to schedule a
+        // remote command on a user's behalf without a non-null OperationContext.
         remote.status = _askForNextBatch(lk, remoteIndex);
     }
 }
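
The second half of the change shows up in the last hunk: when a batch arrives while no OperationContext is attached, the merger no longer prefetches the next batch eagerly; the new public scheduleGetMores() lets the owner trigger those deferred fetches after an OperationContext is attached again. A self-contained sketch of that flow, again with toy stand-in types and hypothetical names (the real caller-side API is not shown in this diff):

#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative sketch only; these are stand-ins, not MongoDB types.
struct OperationContext {};

class MiniMerger {
public:
    void reattach(OperationContext* opCtx) { _opCtx = opCtx; }
    void detach() { _opCtx = nullptr; }

    // Counterpart of the new AsyncResultsMerger::scheduleGetMores(): walk the
    // remotes and schedule a fetch for each one that has nothing buffered, is
    // not exhausted, and has no request already outstanding.
    void scheduleGetMores() {
        assert(_opCtx && "Cannot schedule getMores without an OperationContext");
        for (size_t i = 0; i < _remotes.size(); ++i) {
            auto& remote = _remotes[i];
            if (!remote.hasBuffered && !remote.exhausted && !remote.requestOutstanding) {
                askForNextBatch(i);
            }
        }
    }

    // Counterpart of _processBatchResults(): only prefetch right away when an
    // OperationContext is attached; otherwise leave the remote idle until the
    // owner re-attaches and calls scheduleGetMores().
    void onBatchReceived(size_t i, bool anythingBuffered) {
        auto& remote = _remotes[i];
        remote.requestOutstanding = false;
        remote.hasBuffered = anythingBuffered;
        if (!remote.hasBuffered && !remote.exhausted && _opCtx) {
            askForNextBatch(i);
        }
    }

private:
    struct Remote {
        bool hasBuffered = false;
        bool exhausted = false;
        bool requestOutstanding = false;
    };

    void askForNextBatch(size_t i) {
        // As in the earlier sketch, this must only run with a non-null _opCtx.
        _remotes[i].requestOutstanding = true;
        std::cout << "scheduling getMore on remote " << i << "\n";
    }

    OperationContext* _opCtx = nullptr;
    std::vector<Remote> _remotes = std::vector<Remote>(2);
};

int main() {
    MiniMerger merger;
    // A batch arrives while the merger is detached: no eager prefetch happens.
    merger.onBatchReceived(0, /*anythingBuffered=*/false);
    // Later, the owner re-attaches an OperationContext and explicitly
    // schedules the deferred fetches.
    OperationContext opCtx;
    merger.reattach(&opCtx);
    merger.scheduleGetMores();
    merger.detach();
    return 0;
}
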