/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
/**
* AsyncResultsMerger is used to generate results from cursor-generating commands on one or more
* remote hosts. A cursor-generating command (e.g. the find command) is one that establishes a
* ClientCursor and a matching cursor id on the remote host. In order to retrieve all command
* results, getMores must be issued against each of the remote cursors until they are exhausted. The
* results from the remote nodes are merged to present either a single sorted or unsorted stream.
*
* The ARM offers a non-blocking interface: if no results are immediately available on this host for
* retrieval, calling nextEvent() schedules work on the remote hosts in order to generate further
* results. The event is signaled when further results are available.
*
* Work on remote nodes is accomplished by scheduling remote work in TaskExecutor's event loop.
*
* Task-scheduling behavior differs depending on whether there is a sort. If the result documents
* must be sorted, we pass the sort through to the remote nodes and then merge the sorted streams.
* This requires waiting until we have a response from every remote before returning results.
* Without a sort, we are ready to return results as soon as we have *any* response from a remote.
*
* On any error, the caller is responsible for shutting down the ARM using the kill() method.
*
* Does not throw exceptions.
*/
class AsyncResultsMerger {
MONGO_DISALLOW_COPYING(AsyncResultsMerger);
public:
/**
* Constructs a new AsyncResultsMerger. The TaskExecutor* and ClusterClientCursorParams& must
* remain valid for the lifetime of the ARM.
*/
AsyncResultsMerger(executor::TaskExecutor* executor, ClusterClientCursorParams params);
/**
* In order to be destroyed, either
* --the cursor must have been kill()'ed and the event return from kill() must have been
* signaled, or
* --all cursors must have been exhausted.
*/
virtual ~AsyncResultsMerger();
/**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
* --or all of the remote cursors have been closed and we are done,
* --or an error was received and the next call to nextReady() will return an error status,
* --or the ARM has been killed and is in the process of shutting down. In this case,
* nextReady() will report an error when called.
*
* A return value of true indicates that it is safe to call nextReady().
*/
bool ready();
/**
* If there is a result available that has already been retrieved from a remote node and
* buffered, then return it along with an ok status.
*
* If we have reached the end of the stream of results, returns boost::none along with an ok
* status.
*
* If this AsyncResultsMerger is fetching results from a remote cursor tailing a capped
* collection, may return boost::none before end-of-stream. (Tailable cursors remain open even
* when there are no further results, and may subsequently return more results when they become
* available.) The calling code is responsible for handling multiple boost::none return values,
* keeping the cursor open in the tailable case.
*
* If there has been an error received from one of the shards, or there is an error in
* processing results from a shard, then a non-ok status is returned.
*
* Invalid to call unless ready() has returned true (i.e., invalid to call if getting the next
* result requires scheduling remote work).
*/
StatusWith> nextReady();
/**
* Schedules remote work as required in order to make further results available. If there is an
* error in scheduling this work, returns a non-ok status. On success, returns an event handle.
* The caller can pass this event handle to 'executor' in order to be blocked until further
* results are available.
*
* Invalid to call unless ready() has returned false (i.e. invalid to call if the next result is
* available without scheduling remote work).
*
* Also invalid to call if there is an outstanding event, created by a previous call to this
* function, that has not yet been signaled. If there is an outstanding unsignaled event,
* returns an error.
*/
StatusWith nextEvent();
/**
* Starts shutting down this ARM. Returns a handle to an event which is signaled when this
* cursor is safe to destroy.
*
* Returns an invalid handle if the underlying task executor is shutting down. In this case, it
* is legal to destroy the cursor only after the task executor shutdown process is complete.
*
* An ARM can only be destroyed if either 1) all its results have been exhausted or 2) the kill
* event returned by this method has been signaled.
*
* May be called multiple times (idempotent).
*/
executor::TaskExecutor::EventHandle kill();
private:
/**
* We instantiate one of these per remote host. It contains the buffer of results we've
* retrieved from the host but not yet returned, as well as the cursor id, and any error
* reported from the remote.
*/
struct RemoteCursorData {
RemoteCursorData(const ClusterClientCursorParams::Remote& params);
/**
* Returns whether there is another buffered result available for this remote node.
*/
bool hasNext() const;
/**
* Returns whether the remote has given us all of its results (i.e. whether it has closed
* its cursor).
*/
bool exhausted() const;
HostAndPort hostAndPort;
BSONObj cmdObj;
boost::optional cursorId;
std::queue docBuffer;
executor::TaskExecutor::CallbackHandle cbHandle;
Status status = Status::OK();
// Set to true once we have heard from the remote node at least once.
bool gotFirstResponse = false;
// Count of fetched docs during ARM processing of the current batch. Used to reduce the
// batchSize in getMore when mongod returned less docs than the requested batchSize.
long long fetchedCount = 0;
};
class MergingComparator {
public:
MergingComparator(const std::vector& remotes, const BSONObj& sort)
: _remotes(remotes), _sort(sort) {}
bool operator()(const size_t& lhs, const size_t& rhs);
private:
const std::vector& _remotes;
const BSONObj& _sort;
};
enum LifecycleState { kAlive, kKillStarted, kKillComplete };
/**
* Callback run to handle a response from a killCursors command.
*/
static void handleKillCursorsResponse(
const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData);
/**
* Helper to schedule a command asking the remote node for another batch of results.
*
* The 'remoteIndex' gives the position of the remote node from which we are retrieving the
* batch in '_remotes'.
*
* Returns success if the command to retrieve the next batch was scheduled successfully.
*/
Status askForNextBatch_inlock(size_t remoteIndex);
//
// Helpers for ready().
//
bool ready_inlock();
bool readySorted_inlock();
bool readyUnsorted_inlock();
//
// Helpers for nextReady().
//
boost::optional nextReadySorted();
boost::optional nextReadyUnsorted();
/**
* When nextEvent() schedules remote work, it passes this method as a callback. The TaskExecutor
* will call this function, passing the response from the remote.
*
* 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
* indicates which node the response came from and where the new result documents should be
* buffered.
*/
void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
size_t remoteIndex);
/**
* If there is a valid unsignaled event that has been requested via nextReady() and there are
* buffered results that are ready to return, signals that event.
*
* Invalidates the current event, as we must signal the event exactly once and we only keep a
* handle to a valid event if it is unsignaled.
*/
void signalCurrentEventIfReady_inlock();
/**
* Returns true if this async cursor is waiting to receive another batch from a remote.
*/
bool haveOutstandingBatchRequests_inlock();
/**
* Schedules a killCursors command to be run on all remote hosts that have open cursors.
*/
void scheduleKillCursors_inlock();
// Not owned here.
executor::TaskExecutor* _executor;
ClusterClientCursorParams _params;
// Must be acquired before accessing any data members (other than _params, which is read-only).
// Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
std::vector _remotes;
// The top of this priority queue is the index into '_remotes' for the remote host that has the
// next document to return, according to the sort order. Used only if there is a sort.
std::priority_queue, MergingComparator> _mergeQueue;
// The index into '_remotes' for the remote from which we are currently retrieving results.
// Used only if there is *not* a sort.
size_t _gettingFromRemote = 0;
Status _status = Status::OK();
executor::TaskExecutor::EventHandle _currentEvent;
// For tailable cursors, set to true if the next result returned from nextReady() should be
// boost::none.
bool _eofNext = false;
//
// Killing
//
LifecycleState _lifecycleState = kAlive;
// Signaled when all outstanding batch request callbacks have run, and all killCursors commands
// have been scheduled. This means that the ARM is safe to delete.
executor::TaskExecutor::EventHandle _killCursorsScheduledEvent;
};
} // namespace mongo