/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include "mongo/s/query/router_stage_merge.h" #include "mongo/db/query/find_common.h" #include "mongo/util/scopeguard.h" namespace mongo { RouterStageMerge::RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) : RouterExecStage(opCtx), _executor(executor), _params(params), _arm(opCtx, executor, params->extractARMParams()) {} StatusWith RouterStageMerge::next(ExecContext execCtx) { // Non-tailable and tailable non-awaitData cursors always block until ready(). AwaitData // cursors wait for ready() only until a specified time limit is exceeded. return (_params->tailableMode == TailableModeEnum::kTailableAndAwaitData ? awaitNextWithTimeout(execCtx) : _arm.blockingNext()); } StatusWith RouterStageMerge::awaitNextWithTimeout(ExecContext execCtx) { invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); // If we are in kInitialFind or kGetMoreWithAtLeastOneResultInBatch context and the ARM is not // ready, we don't block. Fall straight through to the return statement. while (!_arm.ready() && execCtx == ExecContext::kGetMoreNoResultsYet) { auto nextEventStatus = getNextEvent(); if (!nextEventStatus.isOK()) { return nextEventStatus.getStatus(); } auto event = nextEventStatus.getValue(); // Block until there are further results to return, or our time limit is exceeded. auto waitStatus = _executor->waitForEvent( getOpCtx(), event, awaitDataState(getOpCtx()).waitForInsertsDeadline); if (!waitStatus.isOK()) { return waitStatus.getStatus(); } // Swallow timeout errors for tailable awaitData cursors, stash the event that we were // waiting on, and return EOF. if (waitStatus == stdx::cv_status::timeout) { _leftoverEventFromLastTimeout = std::move(event); return ClusterQueryResult{}; } } // We reach this point either if the ARM is ready, or if the ARM is !ready and we are in // kInitialFind or kGetMoreWithAtLeastOneResultInBatch ExecContext. In the latter case, we // return EOF immediately rather than blocking for further results. return _arm.ready() ? _arm.nextReady() : ClusterQueryResult{}; } StatusWith RouterStageMerge::getNextEvent() { // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. if (_leftoverEventFromLastTimeout) { invariant(_params->tailableMode == TailableModeEnum::kTailableAndAwaitData); // If we have an outstanding event from last time, then we might have to manually schedule // some getMores for the cursors. If a remote response came back while we were between // getMores (from the user to mongos), the response may have been an empty batch, and the // ARM would not be able to ask for the next batch immediately since it is not attached to // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores // ourselves. Status getMoreStatus = _arm.scheduleGetMores(); if (!getMoreStatus.isOK()) { return getMoreStatus; } // Return the leftover event and clear '_leftoverEventFromLastTimeout'. auto event = _leftoverEventFromLastTimeout; _leftoverEventFromLastTimeout = EventHandle(); return event; } return _arm.nextEvent(); } void RouterStageMerge::kill(OperationContext* opCtx) { _arm.blockingKill(opCtx); } bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } std::size_t RouterStageMerge::getNumRemotes() const { return _arm.getNumRemotes(); } Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } void RouterStageMerge::addNewShardCursors(std::vector&& newShards) { _arm.addNewShardCursors(std::move(newShards)); } } // namespace mongo