summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-07-02 18:23:25 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-08-15 13:30:12 -0400
commitee06e6cbe5a75775f76836449558be2f6a98ddfd (patch)
treed4dbf37110d25f7f4876337a7b1e11abe251fac5 /src/mongo/s/query/async_results_merger.cpp
parenta5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff)
downloadmongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a $mergeCursors as a wrapper around a AsyncResultsMerger, which is new behavior for mongos. As part of this refactor, we can delete the concept of a 'merging presorted' $sort stage (which is now handled by the AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage which talked to a RouterStageMerge, instead directly using a $mergeCursors stage.
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp39
1 files changed, 3 insertions, 36 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index f5268ac3408..3cc6756c843 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -88,8 +88,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// This strange initialization is to work around the fact that the IDL does not currently
// support a default value for an enum. The default tailable mode should be 'kNormal', but
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
- _tailableMode(params.getTailableMode() ? *params.getTailableMode()
- : TailableModeEnum::kNormal),
+ _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)),
_params(std::move(params)),
_mergeQueue(MergingComparator(_remotes,
_params.getSort() ? *_params.getSort() : BSONObj(),
@@ -116,12 +115,12 @@ AsyncResultsMerger::~AsyncResultsMerger() {
invariant(_remotesExhausted(lk) || _lifecycleState == kKillComplete);
}
-bool AsyncResultsMerger::remotesExhausted() {
+bool AsyncResultsMerger::remotesExhausted() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _remotesExhausted(lk);
}
-bool AsyncResultsMerger::_remotesExhausted(WithLock) {
+bool AsyncResultsMerger::_remotesExhausted(WithLock) const {
for (const auto& remote : _remotes) {
if (!remote.exhausted()) {
return false;
@@ -769,36 +768,4 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
_sort) > 0;
}
-void AsyncResultsMerger::blockingKill(OperationContext* opCtx) {
- auto killEvent = kill(opCtx);
- if (!killEvent) {
- // We are shutting down.
- return;
- }
- _executor->waitForEvent(killEvent);
-}
-
-StatusWith<ClusterQueryResult> AsyncResultsMerger::blockingNext() {
- while (!ready()) {
- auto nextEventStatus = nextEvent();
- if (!nextEventStatus.isOK()) {
- return nextEventStatus.getStatus();
- }
- auto event = nextEventStatus.getValue();
-
- // Block until there are further results to return.
- auto status = _executor->waitForEvent(_opCtx, event);
-
- if (!status.isOK()) {
- return status.getStatus();
- }
-
- // We have not provided a deadline, so if the wait returns without interruption, we do not
- // expect to have timed out.
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- }
-
- return nextReady();
-}
-
} // namespace mongo