diff options
author | Greg Studer <greg@10gen.com> | 2014-04-28 16:53:10 -0400 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2014-05-13 11:07:46 -0400 |
commit | f8f57002f72e38d8595674937cd11df42b4ecba7 (patch) | |
tree | 55c9d2b67d9417b4e1b3cc7f545546831d57c077 | |
parent | 441b3c183c76399f248205989dec757708601394 (diff) | |
download | mongo-f8f57002f72e38d8595674937cd11df42b4ecba7.tar.gz |
SERVER-11332 multi host query from fastest host using thread pools
-rw-r--r-- | src/mongo/SConscript | 8 | ||||
-rw-r--r-- | src/mongo/base/error_codes.err | 3 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 47 | ||||
-rw-r--r-- | src/mongo/s/multi_host_query.cpp | 405 | ||||
-rw-r--r-- | src/mongo/s/multi_host_query.h | 330 | ||||
-rw-r--r-- | src/mongo/s/multi_host_query_test.cpp | 741 |
6 files changed, 1518 insertions, 16 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index a113aa522be..4564cf07316 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -713,7 +713,7 @@ env.Library('coreshard', ['s/distlock.cpp', 's/shard.cpp', 's/shardkey.cpp'], LIBDEPS=['s/base', - 's/cluster_op_impl']); + 's/cluster_ops_impl']); mongosLibraryFiles = [ "s/interrupt_status_mongos.cpp", @@ -742,7 +742,7 @@ env.Library( "mongoscore", LIBDEPS=['db/auth/authmongos', 'db/fts/ftsmongos', 'db/query/lite_parsed_query', - 's/cluster_write_ops', + 's/cluster_ops', 's/cluster_write_op_conversion', 's/upgrade', ] ) @@ -1057,8 +1057,8 @@ test = testEnv.Install( "testframework", "gridfs", "s/upgrade", - "s/cluster_write_ops", - "s/cluster_op_impl", + "s/cluster_ops", + "s/cluster_ops_impl", "mocklib", "db/exec/mock_stage", "$BUILD_DIR/mongo/db/auth/authmocks", diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index a30390bdca0..6d025a9ccfc 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -88,6 +88,7 @@ error_code("IndexOptionsConflict", 85 ) error_code("IndexKeySpecsConflict", 86 ) error_code("CannotSplit", 87) error_code("SplitFailed", 88) +error_code("NetworkTimeout", 89) # Non-sequential error codes (for compatibility only) error_code("NotMaster", 10107) #this comes from assert_util.h @@ -99,7 +100,7 @@ error_code("KeyTooLong", 17280); error_code("BackgroundOperationInProgressForDatabase", 12586); error_code("BackgroundOperationInProgressForNamespace", 12587); -error_class("NetworkError", ["HostUnreachable", "HostNotFound"]) +error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout"]) error_class("Interruption", ["Interrupted", "InterruptedAtShutdown", "ExceededTimeLimit"]) error_class("IndexCreationError", ["CannotCreateIndex", "IndexOptionsConflict", "IndexKeySpecsConflict", "IndexAlreadyExists"]) diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 2c15f24a1e2..09d30c9169a 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -207,18 +207,25 @@ env.CppUnitTest( ] ) -# Cluster write op state and execution +# +# State and execution of operations across multiple hosts +# +# This functionality is self-contained and independent of any network or system-level +# code. +# env.Library( - target='cluster_write_ops', + target='cluster_ops', source=[ 'write_ops/write_op.cpp', 'write_ops/batch_write_op.cpp', 'write_ops/batch_write_exec.cpp', 'write_ops/config_coordinator.cpp', + 'multi_host_query.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/bson', 'batch_write_types', + '$BUILD_DIR/mongo/synchronization' ], ) @@ -229,7 +236,7 @@ env.CppUnitTest( ], LIBDEPS=[ 'base', - 'cluster_write_ops', + 'cluster_ops', '$BUILD_DIR/mongo/db/common', ] ) @@ -241,7 +248,7 @@ env.CppUnitTest( ], LIBDEPS=[ 'base', - 'cluster_write_ops', + 'cluster_ops', '$BUILD_DIR/mongo/db/common', ] ) @@ -253,7 +260,7 @@ env.CppUnitTest( ], LIBDEPS=[ 'base', - 'cluster_write_ops', + 'cluster_ops', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/clientdriver', ], @@ -266,7 +273,20 @@ env.CppUnitTest( ], LIBDEPS=[ 'base', - 'cluster_write_ops', + 'cluster_ops', + '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/clientdriver', + ], +) + +env.CppUnitTest( + target='multi_host_query_test', + source=[ + 'multi_host_query_test.cpp', + ], + LIBDEPS=[ + 'base', + 'cluster_ops', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/clientdriver', ], @@ -281,7 +301,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/bson', - 'cluster_write_ops', + 'cluster_ops', '$BUILD_DIR/mongo/db/common', # for Message ], ) @@ -292,7 +312,7 @@ env.CppUnitTest( 'write_ops/batch_upconvert_test.cpp', ], LIBDEPS=[ - 'cluster_write_ops', + 'cluster_ops', 'cluster_write_op_conversion', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/server_options', # DbMessage needs server options @@ -305,16 +325,21 @@ env.CppUnitTest( 'write_ops/batch_downconvert_test.cpp', ], LIBDEPS=[ - 'cluster_write_ops', + 'cluster_ops', 'cluster_write_op_conversion', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/server_options', # DbMessage needs server options ] ) +# # Implementations of components to perform cluster operations in mongos +# +# This is the glue code implementing the interfaces required by cluster ops +# in particular environments. +# env.Library( - target='cluster_op_impl', + target='cluster_ops_impl', source=[ 'chunk_manager_targeter.cpp', 'cluster_write.cpp', @@ -325,7 +350,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/bson', 'batch_write_types', - 'cluster_write_ops', + 'cluster_ops', 'cluster_write_op_conversion', ], ) diff --git a/src/mongo/s/multi_host_query.cpp b/src/mongo/s/multi_host_query.cpp new file mode 100644 index 00000000000..5566177df98 --- /dev/null +++ b/src/mongo/s/multi_host_query.cpp @@ -0,0 +1,405 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/multi_host_query.h" + +#include "mongo/bson/util/builder.h" + +namespace mongo { + + using boost::shared_ptr; + typedef boost::unique_lock<boost::mutex> boost_unique_lock; + + HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork) : + _scopeAllWork(scopeAllWork), _context(new PoolContext) { + + // All threads start as active to avoid races detecting idleness on thread startup - + // the pool isn't idle until all threads have started waiting. + _context->numActiveWorkers = poolSize; + + for (int i = 0; i < poolSize; ++i) { + + // + // Each thread keeps a shared context allowing them to synchronize even if this + // dispatching pool has already been disposed. + // + + _threads.push_back(new boost::thread(boost::bind(&HostThreadPool::doWork, _context))); + } + } + + void HostThreadPool::schedule(Callback callback) { + boost_unique_lock lk(_context->mutex); + _context->scheduled.push_back(callback); + _context->workScheduledCV.notify_one(); + } + + void HostThreadPool::doWork(boost::shared_ptr<PoolContext> context) { + + while (true) { + + Callback callback; + + { + boost_unique_lock lk(context->mutex); + + --context->numActiveWorkers; + if (context->numActiveWorkers == 0) + context->isIdleCV.notify_all(); + + // Wait for work or until we're finished + while (context->isPoolActive && context->scheduled.empty()) { + context->workScheduledCV.wait(lk); + } + + // + // Either the pool is no longer active, or the queue has some work we should do + // + + if (!context->isPoolActive) + return; + + invariant( !context->scheduled.empty() ); + callback = context->scheduled.front(); + context->scheduled.pop_front(); + + ++context->numActiveWorkers; + } + + callback(); + } + } + + void HostThreadPool::waitUntilIdle() { + boost_unique_lock lk(_context->mutex); + while (_context->numActiveWorkers > 0) { + _context->isIdleCV.wait(lk); + } + } + + HostThreadPool::~HostThreadPool() { + + // Boost can throw on notify(), join(), detach() + + { + boost_unique_lock lk(_context->mutex); + _context->isPoolActive = false; + _context->scheduled.clear(); + } + + DESTRUCTOR_GUARD( _context->workScheduledCV.notify_all(); ) + + for (vector<boost::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) { + + if (_scopeAllWork) { + DESTRUCTOR_GUARD( ( *it )->join(); ) + } + else { + DESTRUCTOR_GUARD( ( *it )->detach(); ) + } + + delete *it; + } + } + + HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork) : + _poolSize(poolSize), _scopeAllWork(scopeAllWork) { + } + + void HostThreadPools::schedule(const ConnectionString& host, + HostThreadPool::Callback callback) { + boost_unique_lock lk(_mutex); + + HostPoolMap::iterator seenIt = _pools.find(host); + if (seenIt == _pools.end()) { + seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork))) + .first; + } + + seenIt->second->schedule(callback); + } + + void HostThreadPools::waitUntilIdle(const ConnectionString& host) { + + // Note that this prevents the creation of any new pools - it is only intended to be used + // for testing. + + boost_unique_lock lk(_mutex); + + HostPoolMap::iterator seenIt = _pools.find(host); + if (seenIt == _pools.end()) + return; + + seenIt->second->waitUntilIdle(); + } + + HostThreadPools::~HostThreadPools() { + + boost_unique_lock lk(_mutex); + for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) { + delete it->second; + } + } + + MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads) : + _systemEnv(systemEnv), _hostThreads(hostThreads) { + } + + StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts, + const QuerySpec& query, + int timeoutMillis) { + + Date_t nowMillis = _systemEnv->currentTimeMillis(); + Date_t timeoutAtMillis = nowMillis + timeoutMillis; + + // Send out all queries + scheduleQuery(hosts, query, timeoutAtMillis); + + // Wait for them to come back + return waitForNextResult(timeoutAtMillis); + } + + void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts, + const QuerySpec& query, + Date_t timeoutAtMillis) { + + invariant( _pending.empty() ); + + for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) { + + const ConnectionString& host = *it; + + shared_ptr<PendingQueryContext> pendingOp(new PendingQueryContext(host, + query, + timeoutAtMillis, + this)); + + _pending.insert(make_pair(host, pendingOp)); + + HostThreadPool::Callback callback = + boost::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp); + + _hostThreads->schedule(host, callback); + } + } + + StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) { + + StatusWith<DBClientCursor*> nextResult( NULL); + + boost_unique_lock lk(_resultsMutex); + while (!releaseResult_inlock(&nextResult)) { + + Date_t nowMillis = _systemEnv->currentTimeMillis(); + + if (nowMillis >= timeoutAtMillis) { + nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); + break; + } + + _nextResultCV.timed_wait(lk, + boost::posix_time::milliseconds(timeoutAtMillis - nowMillis)); + } + + dassert( !nextResult.isOK() || nextResult.getValue() ); + return nextResult; + } + + void MultiHostQueryOp::noteResult(const ConnectionString& host, + StatusWith<DBClientCursor*> result) { + + boost_unique_lock lk(_resultsMutex); + dassert( _results.find( host ) == _results.end() ); + _results.insert(make_pair(host, result)); + + _nextResultCV.notify_one(); + } + + /** + * The results in the result map have four states: + * Nonexistent - query result still pending + * Status::OK w/ pointer - successful query result, not yet released to user + * Status::OK w/ NULL pointer - successful query result, user consumed the result + * Status::Not OK - error during query + * + * This function returns true and the next result to release to the user (or an error + * if there can be no successful results to release) or false to indicate the user + * should keep waiting. + */ + bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) { + + int numErrors = 0; + int numReleased = 0; + for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { + + StatusWith<DBClientCursor*>& result = it->second; + + if (result.isOK() && result.getValue() != NULL) { + *nextResult = result; + it->second = StatusWith<DBClientCursor*>( NULL); + return true; + } + else if (result.isOK()) { + ++numReleased; + } + else { + ++numErrors; + } + } + + if (numErrors + numReleased == static_cast<int>(_pending.size())) { + *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); + return true; + } + + return false; + } + + /** + * Goes through the set of results and combines all non-OK results into a single Status. + * If a single error is found, just returns that error. + * If no non-OK results are found, assumes the cause is a timeout. + */ + Status MultiHostQueryOp::combineErrorResults_inlock() { + + ErrorCodes::Error code = ErrorCodes::OK; + StringBuilder errMsg; + // Whether we should include human-readable codes in the msg - we don't need them if we're + // not aggregating multiple statuses together + bool includeHRCodes = false; + + for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) { + + const StatusWith<DBClientCursor*>& result = it->second; + + if (!result.isOK()) { + + if (code == ErrorCodes::OK) { + code = result.getStatus().code(); + } + else { + + if (!includeHRCodes) { + includeHRCodes = true; + // Fixup the single error message to include a code + errMsg.reset(); + errMsg.append(Status(code, errMsg.str()).toString()); + } + + code = ErrorCodes::MultipleErrorsOccurred; + errMsg.append(" :: and :: "); + } + + errMsg.append( + includeHRCodes ? result.getStatus().toString() : result.getStatus().reason()); + errMsg.append(string(", host ") + it->first.toString()); + } + } + + if (code == ErrorCodes::OK) { + return Status(ErrorCodes::NetworkTimeout, "no results were returned in time"); + } + + return Status(code, errMsg.str()); + } + + MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host, + const QuerySpec& query, + const Date_t timeoutAtMillis, + MultiHostQueryOp* parentOp) : + host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) { + } + + void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() { + + // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the + // HostThreadPools is. + MultiHostQueryOp::SystemEnv* systemEnv; + + // Extract means of doing query from the parent op + { + boost_unique_lock lk(parentMutex); + + if (!parentOp) + return; + + systemEnv = parentOp->_systemEnv; + } + + // Make sure we're not timed out + Date_t nowMillis = systemEnv->currentTimeMillis(); + if (nowMillis >= timeoutAtMillis) + return; + + // Do query + StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query); + + // Push results back to parent op if it's still around + { + boost_unique_lock lk(parentMutex); + + if (!parentOp) + return; + + parentOp->noteResult(host, result); + } + } + + MultiHostQueryOp::~MultiHostQueryOp() { + + // + // Orphan all outstanding query contexts that haven't reported back - these will be gc'd + // once all scheduled query callbacks are finished. + // + + for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) { + + shared_ptr<PendingQueryContext>& pendingContext = it->second; + + boost_unique_lock lk(pendingContext->parentMutex); + pendingContext->parentOp = NULL; + } + + // + // Nobody else should be modifying _results now - callbacks don't have access to this op, + // and other clients should know the op is going out of scope + // + + for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { + + StatusWith<DBClientCursor*>& result = it->second; + + if (result.isOK()) { + delete result.getValue(); + } + } + } + +} diff --git a/src/mongo/s/multi_host_query.h b/src/mongo/s/multi_host_query.h new file mode 100644 index 00000000000..702764848fd --- /dev/null +++ b/src/mongo/s/multi_host_query.h @@ -0,0 +1,330 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/function.hpp> +#include <boost/smart_ptr/shared_ptr.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/owned_pointer_vector.h" +#include "mongo/client/dbclientinterface.h" + +namespace mongo { + + // + // NOTE TO DEVS + // This is probably not what we want long-term - think very carefully before letting any of the + // functionality below escape this file. + // + + class HostThreadPools; + class HostThreadPool; + + /** + * A MultiHostQueryOp manages a query operation across multiple hosts. Supports returning + * immediately when any host has results or when all hosts have (connectivity) errors. + * + * The QueryOp itself dispatches work to the thread pool, and does not wait for all work to be + * complete before destruction. This class is not intended to be used by multiple clients at + * once without external synchronization (for now). + * + * Cannot be reused once all query results and errors have been returned. + */ + class MultiHostQueryOp { + MONGO_DISALLOW_COPYING(MultiHostQueryOp); + public: + + /** + * Network and time services interface + */ + class SystemEnv; + + /** + * Constructs a MultiHostQueryOp. Allows running a query across multiple hosts with a + * blocking interface. The lifetime of this class can be shorter than the lifetime of the + * queries sent via queryAny, freeing up the caller to do further work when any host is fast + * to respond. + * + * The systemEnv and hostThreads must remain in scope while the query op remains in scope. + * + * NOTE: SystemEnv* MUST remain valid for as long as hostThreads remains valid, since this + * operation may schedule background queries but fall out of scope while one of those + * background queries is still in-progress. + */ + MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads); + + ~MultiHostQueryOp(); + + /** + * Blocks for a query to be run on any of the hosts, and returns the fastest result as soon + * as it becomes available. This function may only be executed once. + * + * If one or more hosts have an error sending/recv'ing the query, the error or composite + * error is returned if no other hosts are responsive after the timeout period. Note that + * this does not apply to errors successfully returned from remote hosts - this is a + * successful query with an error. + * + * Caller owns the returned result if OK. + */ + StatusWith<DBClientCursor*> queryAny(const std::vector<ConnectionString>& hosts, + const QuerySpec& query, + int timeoutMillis); + + // + // Below is exposed for testing *only* + // + + /** + * Schedules the query work on each of the hosts using the thread pool, with a timeout + * indicating how long the work is useful for. Can be called only once. + */ + void scheduleQuery(const std::vector<ConnectionString>& hosts, + const QuerySpec& query, + Date_t timeoutAtMillis); + + /** + * Blocks and waits for the next successful query result or any errors once the timeout is + * reached. + * Can be called multiple times until results from all hosts are returned or !OK. + */ + StatusWith<DBClientCursor*> waitForNextResult(Date_t timeoutAtMillis); + + private: + + /** + * Data required to execute a query operation by a callback on an arbitrary thread. + * Information from the dispatching parent op may not be available if the parent is no + * longer in scope. + */ + struct PendingQueryContext { + + PendingQueryContext(const ConnectionString& host, + const QuerySpec& query, + const Date_t timeoutAtMillis, + MultiHostQueryOp* parentOp); + + void doBlockingQuery(); + + const ConnectionString host; + const QuerySpec query; + const Date_t timeoutAtMillis; + + // Must be held to access the parent pointer below + boost::mutex parentMutex; + // Set and unset by the parent operation on scheduling and destruction + MultiHostQueryOp* parentOp; + }; + + /** + * Called by a scheduled query (generally on a different thread from the waiting client) + * when a result is ready from a particular host. + */ + void noteResult(const ConnectionString& host, StatusWith<DBClientCursor*> result); + + /** + * Helper to check if any result is ready and extract that result + * Synchronized by _resultsMutex + */ + bool releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult); + + /** + * Helper to return an error status from zero or more results + * Synchronized by _resultsMutex + */ + Status combineErrorResults_inlock(); + + // Not owned here + SystemEnv* _systemEnv; + + // Not owned here + HostThreadPools* _hostThreads; + + // Outstanding requests + typedef std::map<ConnectionString, boost::shared_ptr<PendingQueryContext> > PendingMap; + PendingMap _pending; + + // Synchronizes below + boost::mutex _resultsMutex; + + // Current results recv'd + typedef std::map<ConnectionString, StatusWith<DBClientCursor*> > ResultMap; + ResultMap _results; + + boost::condition_variable _nextResultCV; + }; + + /** + * Provides network and time services to allow unit testing of MultiHostQueryOp. + */ + class MultiHostQueryOp::SystemEnv { + public: + + virtual ~SystemEnv() { + } + + /** + * Returns the current time in milliseconds + */ + virtual Date_t currentTimeMillis() = 0; + + /** + * Executes a query against a given host. No timeout hint is given, but the query should + * not block forever. + * Note that no guarantees are given as to the state of the connection used after this + * returns, so the cursor must be self-contained. + * + * Caller owns any resulting cursor. + */ + virtual StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, + const QuerySpec& query) = 0; + }; + + /** + * Object which encapsulates a thread pool per host, and allows scheduling operations against + * each of these hosts. + * + * Optionally supports not waiting for blocked threads before destruction. + * + * Thin wrapper for multiple hosts around HostThreadPool. + */ + class HostThreadPools { + MONGO_DISALLOW_COPYING(HostThreadPools); + public: + + typedef boost::function<void(void)> Callback; + + /** + * Construct a HostThreadPools object, which lazily constructs thread pools per-host of the + * specified size. + * + * @param scopeAllWork true if the pool should wait for all work to be finished before + * going out of scope + */ + HostThreadPools(int poolSize, bool scopeAllWork); + ~HostThreadPools(); + + /** + * Schedules some work in the form of a callback for the pool of a particular host. + */ + void schedule(const ConnectionString& host, Callback callback); + + /** + * Blocks until pool is idle for a particular host. + * For testing. + */ + void waitUntilIdle(const ConnectionString& host); + + private: + + const int _poolSize; + const bool _scopeAllWork; + + boost::mutex _mutex; + typedef std::map<ConnectionString, HostThreadPool*> HostPoolMap; + HostPoolMap _pools; + }; + + /** + * EXPOSED FOR TESTING ONLY. + * + * Thread pool allowing work to be scheduled against various hosts. + * Generic interface, but should not be used outside of this class. + */ + class HostThreadPool { + public: + + typedef boost::function<void(void)> Callback; + + /** + * Constructs a thread pool of a given size. + * + * Parameter scopeAllWork indicates whether the pool should wait for all work to be finished + * before going out of scope. + */ + HostThreadPool(int poolSize, bool scopeAllWork); + + ~HostThreadPool(); + + /** + * Schedules some work in the form of a callback to be done ASAP. + */ + void schedule(Callback callback); + + /** + * Blocks until all threads are idle. + */ + void waitUntilIdle(); + + private: + + /** + * Synchronized work and activity information shared between the pool and the individual + * worker threads. + * This information must be shared, since if !scopeAllWork the parent pool is allowed to + * fall out of scope before the child thread completes. + */ + struct PoolContext { + + PoolContext() : + numActiveWorkers(0), isPoolActive(true) { + } + + // Synchronizes below + boost::mutex mutex; + + // The scheduled work + std::deque<Callback> scheduled; + boost::condition_variable workScheduledCV; + + // How many workers are currently active + int numActiveWorkers; + boost::condition_variable isIdleCV; + + // Whether the pool has been disposed of + bool isPoolActive; + }; + + /** + * Worker loop run by each thread. + */ + static void doWork(boost::shared_ptr<PoolContext> context); + + const bool _scopeAllWork; + + // For now, only modified in the constructor and destructor, but non-const + std::vector<boost::thread*> _threads; + + // Shared work and worker activity information + boost::shared_ptr<PoolContext> _context; + }; +} diff --git a/src/mongo/s/multi_host_query_test.cpp b/src/mongo/s/multi_host_query_test.cpp new file mode 100644 index 00000000000..ed03cc6222d --- /dev/null +++ b/src/mongo/s/multi_host_query_test.cpp @@ -0,0 +1,741 @@ +/** + * Copyright (C) 2013 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/multi_host_query.h" + +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/synchronization.h" + +namespace { + + using namespace mongo; + using boost::shared_ptr; + using std::map; + + class CallbackCheck { + public: + + enum LinkMode { + None, Notify_Other, Wait_For_Other + }; + + CallbackCheck() : + _status(ErrorCodes::OperationIncomplete, ""), _linkMode(None) { + } + + void blockUntil(CallbackCheck* other) { + + _otherNotification.reset(new Notification); + _linkMode = Wait_For_Other; + + other->_otherNotification = _otherNotification; + other->_linkMode = Notify_Other; + } + + HostThreadPool::Callback getCallback() { + return boost::bind(&CallbackCheck::noteCallback, this); + } + + HostThreadPool::Callback getHostCallback(const ConnectionString& host) { + return boost::bind(&CallbackCheck::noteHostCallback, this, host); + } + + void noteHostCallback(const ConnectionString& host) { + _host = host; + noteCallback(); + } + + void noteCallback() { + + _status = Status::OK(); + _notification.notifyOne(); + + if (_linkMode == Wait_For_Other) + _otherNotification->waitToBeNotified(); + else if (_linkMode == Notify_Other) { + _otherNotification->notifyOne(); + } + } + + void waitForCallback() { + _notification.waitToBeNotified(); + } + + Status getStatus() { + return _status; + } + + const ConnectionString& getHost() { + return _host; + } + + private: + + Status _status; + Notification _notification; + + ConnectionString _host; + + shared_ptr<Notification> _otherNotification; + LinkMode _linkMode; + }; + + TEST(HostThreadPool, Schedule) { + + CallbackCheck cbCheck; + + // NOTE: pool must be initialized *after* the cbCheck that it executes - this avoids a + // subtle race where the cbCheck structure is disposed before the callback is complete. + HostThreadPool threadPool(1, true); + + threadPool.schedule(cbCheck.getCallback()); + + cbCheck.waitForCallback(); + ASSERT_OK(cbCheck.getStatus()); + } + + TEST(HostThreadPool, ScheduleTwoSerial) { + + CallbackCheck cbCheckA; + CallbackCheck cbCheckB; + + // NOTE: pool must be initialized *after* the cbCheck that it executes + HostThreadPool threadPool(1, true); + + threadPool.schedule(cbCheckA.getCallback()); + threadPool.schedule(cbCheckB.getCallback()); + + cbCheckB.waitForCallback(); + cbCheckA.waitForCallback(); + + ASSERT_OK(cbCheckA.getStatus()); + ASSERT_OK(cbCheckB.getStatus()); + } + + TEST(HostThreadPool, ScheduleTwoParallel) { + + CallbackCheck cbCheckA; + CallbackCheck cbCheckB; + + // NOTE: pool must be initialized *after* the cbCheck that it executes + HostThreadPool threadPool(2, true); + + // Don't allow cbCheckA's callback to finish until cbCheckB's callback is processed + cbCheckA.blockUntil(&cbCheckB); + + threadPool.schedule(cbCheckA.getCallback()); + cbCheckA.waitForCallback(); + ASSERT_OK(cbCheckA.getStatus()); + // We're still blocking the thread processing cbCheckA's callback + + threadPool.schedule(cbCheckB.getCallback()); + cbCheckB.waitForCallback(); + ASSERT_OK(cbCheckB.getStatus()); + } + + TEST(HostThreadPool, ScheduleTwoHosts) { + + CallbackCheck cbCheckA; + CallbackCheck cbCheckB; + + // NOTE: pool must be initialized *after* the cbCheck that it executes + HostThreadPools threadPool(1, true); + + // Don't allow cbCheckA's callback to finish until cbCheckB's callback is processed. + // This means a single thread pool with a single thread would hang. + cbCheckA.blockUntil(&cbCheckB); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + + threadPool.schedule(hostA, cbCheckA.getHostCallback(hostA)); + cbCheckA.waitForCallback(); + ASSERT_OK(cbCheckA.getStatus()); + ASSERT_EQUALS(cbCheckA.getHost().toString(), hostA.toString()); + // We're still blocking the thread processing cbCheckA's callback + + threadPool.schedule(hostB, cbCheckB.getHostCallback(hostB)); + cbCheckB.waitForCallback(); + ASSERT_OK(cbCheckB.getStatus()); + ASSERT_EQUALS(cbCheckB.getHost().toString(), hostB.toString()); + } + + class MockSystemEnv : public MultiHostQueryOp::SystemEnv { + private: + + struct MockHostInfo; + typedef map<ConnectionString, MockHostInfo*> HostInfoMap; + + public: + + MockSystemEnv(HostThreadPools* threadPool) : + _threadPool(threadPool), _mockTimeMillis(0) { + } + + virtual ~MockSystemEnv() { + for (HostInfoMap::iterator it = _mockHostInfo.begin(); it != _mockHostInfo.end(); + ++it) { + if (_threadPool) + _threadPool->waitUntilIdle(it->first); + delete it->second; + } + } + + void setHostThreadPools(HostThreadPools* threadPool) { + _threadPool = threadPool; + } + + void addMockHostResultAt(const ConnectionString& host, int timeMillis) { + newMockHostResultAt(host, timeMillis, Status::OK(), NULL); + } + + void addMockHostErrorAt(const ConnectionString& host, int timeMillis, Status error) { + newMockHostResultAt(host, timeMillis, error, NULL); + } + + void addMockHungHostAt(const ConnectionString& host, + int hangTimeMillis, + Notification* hangUntilNotify) { + newMockHostResultAt(host, hangTimeMillis, Status::OK(), hangUntilNotify); + } + + Date_t currentTimeMillis() { + return _mockTimeMillis; + } + + StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, + const QuerySpec& query) { + + ASSERT(_mockHostInfo.find(host) != _mockHostInfo.end()); + + MockHostInfo& info = *(_mockHostInfo.find(host)->second); + + if (info.prevHostActiveNotify) { + info.prevHostActiveNotify->waitToBeNotified(); + if (info.waitForPrevHostIdle) { + _threadPool->waitUntilIdle(info.prevHost); + } + } + + _mockTimeMillis = info.queryTimeMillis; + + if (info.nextHostActiveNotify) { + info.nextHostActiveNotify->notifyOne(); + } + + if (info.hangUntilNotify) { + info.hangUntilNotify->waitToBeNotified(); + return StatusWith<DBClientCursor*>(ErrorCodes::InternalError, ""); + } + + if (!info.error.isOK()) { + return StatusWith<DBClientCursor*>(info.error); + } + + // + // Successful mock query + // + + if (!info.conn) { + info.conn.reset(new DBClientConnection(false)); + // Need to do a connect failure so that we get an empty MessagingPort on the conn and + // the host name is set. + string errMsg; + ASSERT(!info.conn->connect(host.toString(), errMsg)); + } + + return StatusWith<DBClientCursor*>(new DBClientCursor(info.conn.get(), + query.ns(), + query.query(), + query.ntoreturn(), + query.ntoskip(), + query.fieldsPtr(), + query.options(), + 0 /* batchSize */)); + } + + private: + + MockHostInfo* newMockHostResultAt(const ConnectionString& host, + int timeMillis, + const Status& error, + Notification* hangUntilNotify) { + + ASSERT(_mockHostInfo.find(host) == _mockHostInfo.end()); + + MockHostInfo* info = new MockHostInfo(timeMillis); + _mockHostInfo.insert(make_pair(host, info)); + info->error = error; + info->hangUntilNotify = hangUntilNotify; + + linkMockTimes(host, info); + return info; + } + + void linkMockTimes(const ConnectionString& host, MockHostInfo* info) { + + // + // This just basically sets up notifications between the processing of results such that + // the results are returned in the order defined by the _mockQueryTimes map. + // + // Idea is (second host result) waits for (first host result) thread to start and end, + // (third host result) waits for (second host result) thread to start and end, + // (fourth host result) waits for (third host result) thread to start and end, + // ... and so on ... + // + + ASSERT(_mockQueryTimes.find(info->queryTimeMillis) == _mockQueryTimes.end()); + + HostQueryTimes::iterator prev = _mockQueryTimes.insert(make_pair(info->queryTimeMillis, + host)).first; + + if (prev != _mockQueryTimes.begin()) + --prev; + else + prev = _mockQueryTimes.end(); + + HostQueryTimes::iterator next = _mockQueryTimes.upper_bound(info->queryTimeMillis); + + if (prev != _mockQueryTimes.end()) { + + const ConnectionString& prevHost = prev->second; + MockHostInfo* prevInfo = _mockHostInfo.find(prevHost)->second; + + linkToNext(prevHost, prevInfo, info); + } + + if (next != _mockQueryTimes.end()) { + + const ConnectionString& nextHost = next->second; + MockHostInfo* nextInfo = _mockHostInfo.find(nextHost)->second; + + linkToNext(host, info, nextInfo); + } + } + + void linkToNext(const ConnectionString& host, MockHostInfo* info, MockHostInfo* nextInfo) { + + nextInfo->prevHost = host; + + nextInfo->prevHostActiveNotify.reset(new Notification()); + info->nextHostActiveNotify = nextInfo->prevHostActiveNotify.get(); + + nextInfo->waitForPrevHostIdle = info->hangUntilNotify == NULL; + } + + // Not owned here, needed to allow ordering of mock queries + HostThreadPools* _threadPool; + + int _mockTimeMillis; + + typedef map<int, ConnectionString> HostQueryTimes; + HostQueryTimes _mockQueryTimes; + + struct MockHostInfo { + + MockHostInfo(int queryTimeMillis) : + nextHostActiveNotify( NULL), + waitForPrevHostIdle(false), + queryTimeMillis(queryTimeMillis), + hangUntilNotify( NULL), + error(Status::OK()) { + } + + Notification* nextHostActiveNotify; + + ConnectionString prevHost; + scoped_ptr<Notification> prevHostActiveNotify; + bool waitForPrevHostIdle; + + int queryTimeMillis; + + scoped_ptr<DBClientConnection> conn; + Notification* hangUntilNotify; + Status error; + }; + + HostInfoMap _mockHostInfo; + + }; + + QuerySpec buildSpec(const StringData& ns, const BSONObj& query) { + return QuerySpec(ns.toString(), query, BSONObj(), 0, 0, 0); + } + + // + // Tests for the MultiHostQueryOp + // + + TEST(MultiHostQueryOp, SingleHost) { + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000")); + vector<ConnectionString> hosts; + hosts.push_back(host); + + mockSystem.addMockHostResultAt(host, 1000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), host.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, SingleHostError) { + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000")); + vector<ConnectionString> hosts; + hosts.push_back(host); + + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHostErrorAt(host, 1000, hostError); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000); + + ASSERT_EQUALS(result.getStatus().code(), hostError.code()); + } + + TEST(MultiHostQueryOp, SingleHostHang) { + + // Initialize notifier before the thread pool, otherwise we may dispose while threads are + // active + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000")); + vector<ConnectionString> hosts; + hosts.push_back(host); + + mockSystem.addMockHungHostAt(host, 4000, &unhangNotify); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::NetworkTimeout); + } + + TEST(MultiHostQueryOp, TwoHostResponses) { + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // Make sure we return the first response, from hostB at time 1000 + mockSystem.addMockHostResultAt(hostA, 2000); + mockSystem.addMockHostResultAt(hostB, 1000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, TwoHostsOneErrorResponse) { + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // The first response is a host error, the second is a successful result + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHostErrorAt(hostA, 1000, hostError); + mockSystem.addMockHostResultAt(hostB, 2000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, TwoHostsBothErrors) { + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // Both responses are errors + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHostErrorAt(hostA, 1000, hostError); + mockSystem.addMockHostErrorAt(hostB, 2000, hostError); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + + ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::MultipleErrorsOccurred); + } + + TEST(MultiHostQueryOp, TwoHostsOneHang) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // One host hangs + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostResultAt(hostB, 2000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, TwoHostsOneHangOneError) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // One host hangs, one host has an error (at the mock timeout point so the query finishes) + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostErrorAt(hostB, 3000, hostError); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_EQUALS(result.getStatus().code(), hostError.code()); + } + + TEST(MultiHostQueryOp, ThreeHostsOneHang) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + hosts.push_back(hostC); + + // One host hangs, last host is fastest with result + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostResultAt(hostB, 3000); + mockSystem.addMockHostResultAt(hostC, 2000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostC.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, ThreeHostsTwoErrors) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + hosts.push_back(hostC); + + // One host hangs, two hosts have errors (finish at mock timeout point so query ends) + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostErrorAt(hostB, 4000, hostError); + mockSystem.addMockHostErrorAt(hostC, 2000, hostError); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::MultipleErrorsOccurred); + } + + TEST(MultiHostQueryOp, ThreeHostsOneHangOneError) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + HostThreadPools threadPool(1, true); + MockSystemEnv mockSystem(&threadPool); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + hosts.push_back(hostC); + + // One host hangs, two hosts have errors (finish at mock timeout point so query ends) + Status hostError = Status(ErrorCodes::InternalError, ""); + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostErrorAt(hostB, 2000, hostError); + mockSystem.addMockHostResultAt(hostC, 3000); + + MultiHostQueryOp queryOp(&mockSystem, &threadPool); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000); + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostC.toString()); + delete result.getValue(); + } + + TEST(MultiHostQueryOp, TwoHostsOneHangUnscoped) { + + // Initialize notifier before the thread pool + Notification unhangNotify; + + // Create a thread pool which detaches itself from outstanding work on cleanup + scoped_ptr<HostThreadPools> threadPool(new HostThreadPools(1, false)); + MockSystemEnv mockSystem(threadPool.get()); + + ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000")); + ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000")); + vector<ConnectionString> hosts; + hosts.push_back(hostA); + hosts.push_back(hostB); + + // One host hangs + mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify); + mockSystem.addMockHostResultAt(hostB, 2000); + + MultiHostQueryOp queryOp(&mockSystem, threadPool.get()); + + QuerySpec query; + StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000); + + // Clean up the thread pool + mockSystem.setHostThreadPools( NULL); + threadPool.reset(); + + // Unhang before checking status, in case it throws + unhangNotify.notifyOne(); + + ASSERT_OK(result.getStatus()); + ASSERT(NULL != result.getValue()); + ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString()); + delete result.getValue(); + + // Make sure we get the next result + result = queryOp.waitForNextResult(4000); + + ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::InternalError); + } + +} // unnamed namespace |