/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/stdx/functional.h"
namespace mongo {
//
// NOTE TO DEVS
// This is probably not what we want long-term - think very carefully before letting any of the
// functionality below escape this file.
//
// Forward declarations. MultiHostQueryOp below only refers to HostThreadPools through a
// pointer, and both pool classes are fully declared further down in this header.
class HostThreadPools;
class HostThreadPool;
/**
* A MultiHostQueryOp manages a query operation across multiple hosts. Supports returning
* immediately when any host has results or when all hosts have (connectivity) errors.
*
* The QueryOp itself dispatches work to the thread pool, and does not wait for all work to be
* complete before destruction. This class is not intended to be used by multiple clients at
* once without external synchronization (for now).
*
* Cannot be reused once all query results and errors have been returned.
*
* The intended entry point is queryAny(). scheduleQuery() and waitForNextResult() are public
* for testing only; presumably they are the building blocks of queryAny() - confirm in the
* implementation file.
*
* NOTE(review): in this view StatusWith, std::vector and std::map are written without their
* template arguments; confirm the exact result/element types against the original header.
*/
class MultiHostQueryOp {
// Marks the class as non-copyable (macro from mongo/base/disallow_copying.h).
MONGO_DISALLOW_COPYING(MultiHostQueryOp);
public:
/**
* Network and time services interface.
* Only declared here; the full definition follows this class in the same header.
*/
class SystemEnv;
/**
* Constructs a MultiHostQueryOp. Allows running a query across multiple hosts with a
* blocking interface. The lifetime of this class can be shorter than the lifetime of the
* queries sent via queryAny, freeing up the caller to do further work when any host is fast
* to respond.
*
* The systemEnv and hostThreads must remain in scope while the query op remains in scope.
* Neither pointer is owned by this object.
*
* NOTE: SystemEnv* MUST remain valid for as long as hostThreads remains valid, since this
* operation may schedule background queries but fall out of scope while one of those
* background queries is still in-progress.
*
* @param systemEnv network/time services used to run the queries and read the clock
* @param hostThreads per-host thread pools the query work is scheduled on
*/
MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads);
/**
* Per the class comment, destruction does not wait for outstanding work to complete.
*/
~MultiHostQueryOp();
/**
* Blocks for a query to be run on any of the hosts, and returns the fastest result as soon
* as it becomes available. This function may only be executed once.
*
* If one or more hosts have an error sending/recv'ing the query, the error or composite
* error is returned if no other hosts are responsive after the timeout period. Note that
* this does not apply to errors successfully returned from remote hosts - this is a
* successful query with an error.
*
* Caller owns the returned result if OK.
*
* @param hosts the hosts to send the query to
* @param query the query to run on each host
* @param timeoutMillis the timeout period; appears to be a relative duration in
*        milliseconds (it is an int, unlike the Date_t deadlines below) - TODO confirm
*/
StatusWith queryAny(const std::vector& hosts,
const QuerySpec& query,
int timeoutMillis);
//
// Below is exposed for testing *only*
//
/**
* Schedules the query work on each of the hosts using the thread pool, with a timeout
* indicating how long the work is useful for. Can be called only once.
*
* @param hosts the hosts to schedule the query against
* @param query the query to run on each host
* @param timeoutAtMillis point in time after which the scheduled work is no longer useful
*/
void scheduleQuery(const std::vector& hosts,
const QuerySpec& query,
Date_t timeoutAtMillis);
/**
* Blocks and waits for the next successful query result or any errors once the timeout is
* reached.
* Can be called multiple times until results from all hosts are returned or !OK.
*
* @param timeoutAtMillis point in time at which to stop waiting for a result
*/
StatusWith waitForNextResult(Date_t timeoutAtMillis);
private:
/**
* Data required to execute a query operation by a callback on an arbitrary thread.
* Information from the dispatching parent op may not be available if the parent is no
* longer in scope.
*
* The query parameters are stored by value (const members), so they stay usable after the
* parent op has gone away; only parentOp refers back to the parent.
*/
struct PendingQueryContext {
PendingQueryContext(const ConnectionString& host,
const QuerySpec& query,
const Date_t timeoutAtMillis,
MultiHostQueryOp* parentOp);
// Runs the query for this context's host. Presumably reports the outcome to parentOp
// (if it is still set) - confirm in the implementation.
void doBlockingQuery();
// Copies of the query parameters, fixed at construction
const ConnectionString host;
const QuerySpec query;
const Date_t timeoutAtMillis;
// Must be held to access the parent pointer below
boost::mutex parentMutex;
// Set and unset by the parent operation on scheduling and destruction
MultiHostQueryOp* parentOp;
};
/**
* Called by a scheduled query (generally on a different thread from the waiting client)
* when a result is ready from a particular host.
*
* @param host the host the result came from
* @param result the query result or error for that host
*/
void noteResult(const ConnectionString& host, StatusWith result);
/**
* Helper to check if any result is ready and extract that result
* Synchronized by _resultsMutex
*
* @param nextResult out-parameter for the extracted result
* @return presumably true when a result was extracted into nextResult - TODO confirm
*/
bool releaseResult_inlock(StatusWith* nextResult);
/**
* Helper to return an error status from zero or more results
* Synchronized by _resultsMutex
*/
Status combineErrorResults_inlock();
// Not owned here
SystemEnv* _systemEnv;
// Not owned here
HostThreadPools* _hostThreads;
// Outstanding requests
// NOTE(review): the key/value types of this map are missing in this view (stray '>');
// confirm against the original header.
typedef std::map > PendingMap;
PendingMap _pending;
// Synchronizes below
boost::mutex _resultsMutex;
// Current results recv'd
// NOTE(review): the key/value types of this map are missing in this view (stray '>');
// confirm against the original header.
typedef std::map > ResultMap;
ResultMap _results;
// Presumably signalled when a new result is noted so a blocked waitForNextResult() can
// wake up - confirm in the implementation.
boost::condition_variable _nextResultCV;
};
/**
* Provides network and time services to allow unit testing of MultiHostQueryOp.
*
* Abstract interface: both operations are pure virtual, so a concrete implementation (or a
* test double) must supply them.
*/
class MultiHostQueryOp::SystemEnv {
public:
// Virtual so implementations can be destroyed through a SystemEnv pointer.
virtual ~SystemEnv() {
}
/**
* Returns the current time in milliseconds
*/
virtual Date_t currentTimeMillis() = 0;
/**
* Executes a query against a given host. No timeout hint is given, but the query should
* not block forever.
* Note that no guarantees are given as to the state of the connection used after this
* returns, so the cursor must be self-contained.
*
* Caller owns any resulting cursor.
*
* @param host the host to run the query against
* @param query the query to execute
*
* NOTE(review): the StatusWith return type is written without its template argument in
* this view; confirm the cursor type against the original header.
*/
virtual StatusWith doBlockingQuery(const ConnectionString& host,
const QuerySpec& query) = 0;
};
/**
* Object which encapsulates a thread pool per host, and allows scheduling operations against
* each of these hosts.
*
* Optionally supports not waiting for blocked threads before destruction.
*
* Thin wrapper for multiple hosts around HostThreadPool.
*/
class HostThreadPools {
// Marks the class as non-copyable (macro from mongo/base/disallow_copying.h).
MONGO_DISALLOW_COPYING(HostThreadPools);
public:
// Type of a unit of work that can be scheduled on a host's pool.
// NOTE(review): the stdx::function signature is missing in this view; confirm against the
// original header.
typedef stdx::function Callback;
/**
* Construct a HostThreadPools object, which lazily constructs thread pools per-host of the
* specified size.
*
* @param poolSize size of each per-host thread pool
* @param scopeAllWork true if the pool should wait for all work to be finished before
* going out of scope
*/
HostThreadPools(int poolSize, bool scopeAllWork);
~HostThreadPools();
/**
* Schedules some work in the form of a callback for the pool of a particular host.
*
* @param host the host whose pool should run the work
* @param callback the work to run
*/
void schedule(const ConnectionString& host, Callback callback);
/**
* Blocks until pool is idle for a particular host.
* For testing.
*/
void waitUntilIdle(const ConnectionString& host);
private:
// Settings applied to every per-host pool; fixed for the lifetime of this object (const)
const int _poolSize;
const bool _scopeAllWork;
// Presumably guards _pools, which is filled in lazily - confirm in the implementation
boost::mutex _mutex;
// The per-host pools.
// NOTE(review): the key/value types of this map are missing in this view; confirm against
// the original header.
typedef std::map HostPoolMap;
HostPoolMap _pools;
};
/**
* EXPOSED FOR TESTING ONLY.
*
* Thread pool allowing work to be scheduled against various hosts.
* Generic interface, but should not be used outside of this class.
*
* NOTE(review): unlike HostThreadPools, this class is not marked MONGO_DISALLOW_COPYING even
* though it declares a destructor and holds a container of threads - confirm whether copying
* is meant to be possible.
*/
class HostThreadPool {
public:
// Type of a unit of work that can be scheduled on the pool.
// NOTE(review): the stdx::function signature is missing in this view; confirm against the
// original header.
typedef stdx::function Callback;
/**
* Constructs a thread pool of a given size.
*
* Parameter scopeAllWork indicates whether the pool should wait for all work to be finished
* before going out of scope.
*/
HostThreadPool(int poolSize, bool scopeAllWork);
~HostThreadPool();
/**
* Schedules some work in the form of a callback to be done ASAP.
*/
void schedule(Callback callback);
/**
* Blocks until all threads are idle.
*/
void waitUntilIdle();
private:
/**
* Synchronized work and activity information shared between the pool and the individual
* worker threads.
* This information must be shared, since if !scopeAllWork the parent pool is allowed to
* fall out of scope before the child thread completes.
*/
struct PoolContext {
// A new context starts with no active workers and with the pool marked active.
PoolContext() :
numActiveWorkers(0), isPoolActive(true) {
}
// Synchronizes below
boost::mutex mutex;
// The scheduled work
// NOTE(review): the element type of this deque is missing in this view; presumably
// Callback - confirm against the original header.
std::deque scheduled;
// Presumably signalled when new work is added to 'scheduled' - confirm in the
// implementation
boost::condition_variable workScheduledCV;
// How many workers are currently active
int numActiveWorkers;
// Presumably signalled when the pool becomes idle, for waitUntilIdle() - confirm in the
// implementation
boost::condition_variable isIdleCV;
// Whether the pool has been disposed of
bool isPoolActive;
};
/**
* Worker loop run by each thread.
*
* Static and handed the shared context directly, so it does not need the HostThreadPool
* object itself (see the PoolContext comment on why the pool may go away first).
*/
static void doWork(std::shared_ptr context);
// Whether destruction should wait for all work to finish; fixed at construction (const)
const bool _scopeAllWork;
// For now, only modified in the constructor and destructor, but non-const
// NOTE(review): the element type of this vector is missing in this view; confirm against
// the original header.
std::vector _threads;
// Shared work and worker activity information
std::shared_ptr _context;
};
}