/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
namespace {
using executor::RemoteCommandRequest;
}
class Fetcher {
MONGO_DISALLOW_COPYING(Fetcher);
public:
/**
* Container for BSON documents extracted from cursor results.
*/
typedef std::vector Documents;
/**
* Documents in current query response with cursor ID and associated namespace name.
* If cursor ID is zero, there are no additional batches.
*/
struct QueryResponse {
CursorId cursorId = 0;
NamespaceString nss;
Documents documents;
struct OtherFields {
BSONObj metadata;
} otherFields;
Milliseconds elapsedMillis = Milliseconds(0);
bool first = false;
};
using QueryResponseStatus = StatusWith;
/**
* Represents next steps of fetcher.
*/
enum class NextAction : int { kInvalid = 0, kNoAction = 1, kGetMore = 2 };
/**
* Type of a fetcher callback function.
*/
typedef stdx::function&, NextAction*, BSONObjBuilder*)>
CallbackFn;
/**
* Creates Fetcher task but does not schedule it to be run by the executor.
*
* First remote command to be run by the executor will be 'cmdObj'. The results
* of 'cmdObj' must contain a cursor response object.
* See Commands::appendCursorResponseObject.
*
* Callback function 'work' will be called 1 or more times after a successful
* schedule() call depending on the results of the remote command.
*
* Depending on the cursor ID in the initial cursor response object, the fetcher may run
* subsequent getMore commands on the remote server in order to obtain a complete
* set of results.
*
* Failed remote commands will also cause 'work' to be invoked with the
* error details provided by the remote server. On failure, the fetcher will stop
* sending getMore requests to the remote server.
*
* If the fetcher is canceled (either by calling cancel() or shutting down the executor),
* 'work' will not be invoked.
*
* Fetcher uses the NextAction and BSONObjBuilder arguments to inform client via callback
* if a follow-up (like getMore) command will be scheduled to be run by the executor to
* retrieve additional results. The BSONObjBuilder pointer will be valid only if NextAction
* is kGetMore.
* Otherwise, the BSONObjBuilder pointer will be null.
* Also, note that the NextAction is both an input and output argument to allow
* the client to suggest a different action for the fetcher to take post-callback.
*
* The callback function 'work' is not allowed to call into the Fetcher instance. This
* behavior is undefined and may result in a deadlock.
*
* An optional retry policy may be provided for the first remote command request so that
* the remote command scheduler will re-send the command in case of transient network errors.
*/
Fetcher(executor::TaskExecutor* executor,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& cmdObj,
const CallbackFn& work,
const BSONObj& metadata = rpc::makeEmptyMetadata(),
Milliseconds timeout = RemoteCommandRequest::kNoTimeout,
std::unique_ptr firstCommandRetryPolicy =
RemoteCommandRetryScheduler::makeNoRetryPolicy());
virtual ~Fetcher();
/**
* Returns host where remote commands will be sent to.
*/
HostAndPort getSource() const;
/**
* Returns command object sent in first remote command.
*/
BSONObj getCommandObject() const;
/**
* Returns metadata object sent in remote commands.
*/
BSONObj getMetadataObject() const;
/**
* Returns timeout for remote commands to complete.
*/
Milliseconds getTimeout() const;
/**
* Returns diagnostic information.
*/
std::string getDiagnosticString() const;
/**
* Returns true if a remote command has been scheduled (but not completed)
* with the executor.
*/
bool isActive() const;
/**
* Schedules 'cmdObj' to be run on the remote server.
*/
Status schedule();
/**
* Cancels remote command request.
* Returns immediately if fetcher is not active.
*/
void cancel();
/**
* Waits for remote command requests to complete.
* Returns immediately if fetcher is not active.
*/
void wait();
private:
/**
* Schedules getMore command to be run by the executor
*/
Status _scheduleGetMore(const BSONObj& cmdObj);
/**
* Callback for remote command.
*/
void _callback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcbd,
const char* batchFieldName);
/**
* Sets fetcher state to inactive and notifies waiters.
*/
void _finishCallback();
/**
* Sends a kill cursor for the specified id and collection (namespace)
*
* Note: Errors are ignored and no retry is done
*/
void _sendKillCursors(const CursorId id, const NamespaceString& nss);
// Not owned by us.
executor::TaskExecutor* _executor;
HostAndPort _source;
std::string _dbname;
BSONObj _cmdObj;
BSONObj _metadata;
CallbackFn _work;
// Protects member data of this Fetcher.
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
// _active is true when Fetcher is scheduled to be run by the executor.
bool _active = false;
// _first is true for first query response and false for subsequent responses.
// Using boolean instead of a counter to avoid issues with wrap around.
bool _first = true;
// Callback handle to the scheduled getMore command.
executor::TaskExecutor::CallbackHandle _getMoreCallbackHandle;
// Socket timeout
Milliseconds _timeout;
// First remote command scheduler.
RemoteCommandRetryScheduler _firstRemoteCommandScheduler;
};
} // namespace mongo