summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Studer <greg@10gen.com>2014-04-28 16:53:10 -0400
committerGreg Studer <greg@10gen.com>2014-05-13 11:07:46 -0400
commitf8f57002f72e38d8595674937cd11df42b4ecba7 (patch)
tree55c9d2b67d9417b4e1b3cc7f545546831d57c077
parent441b3c183c76399f248205989dec757708601394 (diff)
downloadmongo-f8f57002f72e38d8595674937cd11df42b4ecba7.tar.gz
SERVER-11332 multi host query from fastest host using thread pools
-rw-r--r--src/mongo/SConscript8
-rw-r--r--src/mongo/base/error_codes.err3
-rw-r--r--src/mongo/s/SConscript47
-rw-r--r--src/mongo/s/multi_host_query.cpp405
-rw-r--r--src/mongo/s/multi_host_query.h330
-rw-r--r--src/mongo/s/multi_host_query_test.cpp741
6 files changed, 1518 insertions, 16 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index a113aa522be..4564cf07316 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -713,7 +713,7 @@ env.Library('coreshard', ['s/distlock.cpp',
's/shard.cpp',
's/shardkey.cpp'],
LIBDEPS=['s/base',
- 's/cluster_op_impl']);
+ 's/cluster_ops_impl']);
mongosLibraryFiles = [
"s/interrupt_status_mongos.cpp",
@@ -742,7 +742,7 @@ env.Library( "mongoscore",
LIBDEPS=['db/auth/authmongos',
'db/fts/ftsmongos',
'db/query/lite_parsed_query',
- 's/cluster_write_ops',
+ 's/cluster_ops',
's/cluster_write_op_conversion',
's/upgrade',
] )
@@ -1057,8 +1057,8 @@ test = testEnv.Install(
"testframework",
"gridfs",
"s/upgrade",
- "s/cluster_write_ops",
- "s/cluster_op_impl",
+ "s/cluster_ops",
+ "s/cluster_ops_impl",
"mocklib",
"db/exec/mock_stage",
"$BUILD_DIR/mongo/db/auth/authmocks",
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index a30390bdca0..6d025a9ccfc 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -88,6 +88,7 @@ error_code("IndexOptionsConflict", 85 )
error_code("IndexKeySpecsConflict", 86 )
error_code("CannotSplit", 87)
error_code("SplitFailed", 88)
+error_code("NetworkTimeout", 89)
# Non-sequential error codes (for compatibility only)
error_code("NotMaster", 10107) #this comes from assert_util.h
@@ -99,7 +100,7 @@ error_code("KeyTooLong", 17280);
error_code("BackgroundOperationInProgressForDatabase", 12586);
error_code("BackgroundOperationInProgressForNamespace", 12587);
-error_class("NetworkError", ["HostUnreachable", "HostNotFound"])
+error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout"])
error_class("Interruption", ["Interrupted", "InterruptedAtShutdown", "ExceededTimeLimit"])
error_class("IndexCreationError", ["CannotCreateIndex", "IndexOptionsConflict",
"IndexKeySpecsConflict", "IndexAlreadyExists"])
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 2c15f24a1e2..09d30c9169a 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -207,18 +207,25 @@ env.CppUnitTest(
]
)
-# Cluster write op state and execution
+#
+# State and execution of operations across multiple hosts
+#
+# This functionality is self-contained and independent of any network or system-level
+# code.
+#
env.Library(
- target='cluster_write_ops',
+ target='cluster_ops',
source=[
'write_ops/write_op.cpp',
'write_ops/batch_write_op.cpp',
'write_ops/batch_write_exec.cpp',
'write_ops/config_coordinator.cpp',
+ 'multi_host_query.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/bson',
'batch_write_types',
+ '$BUILD_DIR/mongo/synchronization'
],
)
@@ -229,7 +236,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'base',
- 'cluster_write_ops',
+ 'cluster_ops',
'$BUILD_DIR/mongo/db/common',
]
)
@@ -241,7 +248,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'base',
- 'cluster_write_ops',
+ 'cluster_ops',
'$BUILD_DIR/mongo/db/common',
]
)
@@ -253,7 +260,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'base',
- 'cluster_write_ops',
+ 'cluster_ops',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/clientdriver',
],
@@ -266,7 +273,20 @@ env.CppUnitTest(
],
LIBDEPS=[
'base',
- 'cluster_write_ops',
+ 'cluster_ops',
+ '$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/clientdriver',
+ ],
+)
+
+env.CppUnitTest(
+ target='multi_host_query_test',
+ source=[
+ 'multi_host_query_test.cpp',
+ ],
+ LIBDEPS=[
+ 'base',
+ 'cluster_ops',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/clientdriver',
],
@@ -281,7 +301,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/bson',
- 'cluster_write_ops',
+ 'cluster_ops',
'$BUILD_DIR/mongo/db/common', # for Message
],
)
@@ -292,7 +312,7 @@ env.CppUnitTest(
'write_ops/batch_upconvert_test.cpp',
],
LIBDEPS=[
- 'cluster_write_ops',
+ 'cluster_ops',
'cluster_write_op_conversion',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/server_options', # DbMessage needs server options
@@ -305,16 +325,21 @@ env.CppUnitTest(
'write_ops/batch_downconvert_test.cpp',
],
LIBDEPS=[
- 'cluster_write_ops',
+ 'cluster_ops',
'cluster_write_op_conversion',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/server_options', # DbMessage needs server options
]
)
+#
# Implementations of components to perform cluster operations in mongos
+#
+# This is the glue code implementing the interfaces required by cluster ops
+# in particular environments.
+#
env.Library(
- target='cluster_op_impl',
+ target='cluster_ops_impl',
source=[
'chunk_manager_targeter.cpp',
'cluster_write.cpp',
@@ -325,7 +350,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/bson',
'batch_write_types',
- 'cluster_write_ops',
+ 'cluster_ops',
'cluster_write_op_conversion',
],
)
diff --git a/src/mongo/s/multi_host_query.cpp b/src/mongo/s/multi_host_query.cpp
new file mode 100644
index 00000000000..5566177df98
--- /dev/null
+++ b/src/mongo/s/multi_host_query.cpp
@@ -0,0 +1,405 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/multi_host_query.h"
+
+#include "mongo/bson/util/builder.h"
+
+namespace mongo {
+
+ using boost::shared_ptr;
+ typedef boost::unique_lock<boost::mutex> boost_unique_lock;
+
+ HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork) :
+ _scopeAllWork(scopeAllWork), _context(new PoolContext) {
+
+ // All threads start as active to avoid races detecting idleness on thread startup -
+ // the pool isn't idle until all threads have started waiting.
+ _context->numActiveWorkers = poolSize;
+
+ for (int i = 0; i < poolSize; ++i) {
+
+ //
+ // Each thread keeps a shared context allowing them to synchronize even if this
+ // dispatching pool has already been disposed.
+ //
+
+ _threads.push_back(new boost::thread(boost::bind(&HostThreadPool::doWork, _context)));
+ }
+ }
+
+ void HostThreadPool::schedule(Callback callback) {
+ boost_unique_lock lk(_context->mutex);
+ _context->scheduled.push_back(callback);
+ _context->workScheduledCV.notify_one();
+ }
+
+ void HostThreadPool::doWork(boost::shared_ptr<PoolContext> context) {
+
+ while (true) {
+
+ Callback callback;
+
+ {
+ boost_unique_lock lk(context->mutex);
+
+ --context->numActiveWorkers;
+ if (context->numActiveWorkers == 0)
+ context->isIdleCV.notify_all();
+
+ // Wait for work or until we're finished
+ while (context->isPoolActive && context->scheduled.empty()) {
+ context->workScheduledCV.wait(lk);
+ }
+
+ //
+ // Either the pool is no longer active, or the queue has some work we should do
+ //
+
+ if (!context->isPoolActive)
+ return;
+
+ invariant( !context->scheduled.empty() );
+ callback = context->scheduled.front();
+ context->scheduled.pop_front();
+
+ ++context->numActiveWorkers;
+ }
+
+ callback();
+ }
+ }
+
+ void HostThreadPool::waitUntilIdle() {
+ boost_unique_lock lk(_context->mutex);
+ while (_context->numActiveWorkers > 0) {
+ _context->isIdleCV.wait(lk);
+ }
+ }
+
+ HostThreadPool::~HostThreadPool() {
+
+ // Boost can throw on notify(), join(), detach()
+
+ {
+ boost_unique_lock lk(_context->mutex);
+ _context->isPoolActive = false;
+ _context->scheduled.clear();
+ }
+
+ DESTRUCTOR_GUARD( _context->workScheduledCV.notify_all(); )
+
+ for (vector<boost::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) {
+
+ if (_scopeAllWork) {
+ DESTRUCTOR_GUARD( ( *it )->join(); )
+ }
+ else {
+ DESTRUCTOR_GUARD( ( *it )->detach(); )
+ }
+
+ delete *it;
+ }
+ }
+
+ HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork) :
+ _poolSize(poolSize), _scopeAllWork(scopeAllWork) {
+ }
+
+ void HostThreadPools::schedule(const ConnectionString& host,
+ HostThreadPool::Callback callback) {
+ boost_unique_lock lk(_mutex);
+
+ HostPoolMap::iterator seenIt = _pools.find(host);
+ if (seenIt == _pools.end()) {
+ seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork)))
+ .first;
+ }
+
+ seenIt->second->schedule(callback);
+ }
+
+ void HostThreadPools::waitUntilIdle(const ConnectionString& host) {
+
+ // Note that this prevents the creation of any new pools - it is only intended to be used
+ // for testing.
+
+ boost_unique_lock lk(_mutex);
+
+ HostPoolMap::iterator seenIt = _pools.find(host);
+ if (seenIt == _pools.end())
+ return;
+
+ seenIt->second->waitUntilIdle();
+ }
+
+ HostThreadPools::~HostThreadPools() {
+
+ boost_unique_lock lk(_mutex);
+ for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) {
+ delete it->second;
+ }
+ }
+
+ MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads) :
+ _systemEnv(systemEnv), _hostThreads(hostThreads) {
+ }
+
+ StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ int timeoutMillis) {
+
+ Date_t nowMillis = _systemEnv->currentTimeMillis();
+ Date_t timeoutAtMillis = nowMillis + timeoutMillis;
+
+ // Send out all queries
+ scheduleQuery(hosts, query, timeoutAtMillis);
+
+ // Wait for them to come back
+ return waitForNextResult(timeoutAtMillis);
+ }
+
+ void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ Date_t timeoutAtMillis) {
+
+ invariant( _pending.empty() );
+
+ for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) {
+
+ const ConnectionString& host = *it;
+
+ shared_ptr<PendingQueryContext> pendingOp(new PendingQueryContext(host,
+ query,
+ timeoutAtMillis,
+ this));
+
+ _pending.insert(make_pair(host, pendingOp));
+
+ HostThreadPool::Callback callback =
+ boost::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp);
+
+ _hostThreads->schedule(host, callback);
+ }
+ }
+
+ StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) {
+
+ StatusWith<DBClientCursor*> nextResult( NULL);
+
+ boost_unique_lock lk(_resultsMutex);
+ while (!releaseResult_inlock(&nextResult)) {
+
+ Date_t nowMillis = _systemEnv->currentTimeMillis();
+
+ if (nowMillis >= timeoutAtMillis) {
+ nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
+ break;
+ }
+
+ _nextResultCV.timed_wait(lk,
+ boost::posix_time::milliseconds(timeoutAtMillis - nowMillis));
+ }
+
+ dassert( !nextResult.isOK() || nextResult.getValue() );
+ return nextResult;
+ }
+
+ void MultiHostQueryOp::noteResult(const ConnectionString& host,
+ StatusWith<DBClientCursor*> result) {
+
+ boost_unique_lock lk(_resultsMutex);
+ dassert( _results.find( host ) == _results.end() );
+ _results.insert(make_pair(host, result));
+
+ _nextResultCV.notify_one();
+ }
+
+ /**
+ * The results in the result map have four states:
+ * Nonexistent - query result still pending
+ * Status::OK w/ pointer - successful query result, not yet released to user
+ * Status::OK w/ NULL pointer - successful query result, user consumed the result
+ * Status::Not OK - error during query
+ *
+ * This function returns true and the next result to release to the user (or an error
+ * if there can be no successful results to release) or false to indicate the user
+ * should keep waiting.
+ */
+ bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) {
+
+ int numErrors = 0;
+ int numReleased = 0;
+ for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
+
+ StatusWith<DBClientCursor*>& result = it->second;
+
+ if (result.isOK() && result.getValue() != NULL) {
+ *nextResult = result;
+ it->second = StatusWith<DBClientCursor*>( NULL);
+ return true;
+ }
+ else if (result.isOK()) {
+ ++numReleased;
+ }
+ else {
+ ++numErrors;
+ }
+ }
+
+ if (numErrors + numReleased == static_cast<int>(_pending.size())) {
+ *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Goes through the set of results and combines all non-OK results into a single Status.
+ * If a single error is found, just returns that error.
+ * If no non-OK results are found, assumes the cause is a timeout.
+ */
+ Status MultiHostQueryOp::combineErrorResults_inlock() {
+
+ ErrorCodes::Error code = ErrorCodes::OK;
+ StringBuilder errMsg;
+ // Whether we should include human-readable codes in the msg - we don't need them if we're
+ // not aggregating multiple statuses together
+ bool includeHRCodes = false;
+
+ for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) {
+
+ const StatusWith<DBClientCursor*>& result = it->second;
+
+ if (!result.isOK()) {
+
+ if (code == ErrorCodes::OK) {
+ code = result.getStatus().code();
+ }
+ else {
+
+ if (!includeHRCodes) {
+ includeHRCodes = true;
+ // Fixup the single error message to include a code
+ errMsg.reset();
+ errMsg.append(Status(code, errMsg.str()).toString());
+ }
+
+ code = ErrorCodes::MultipleErrorsOccurred;
+ errMsg.append(" :: and :: ");
+ }
+
+ errMsg.append(
+ includeHRCodes ? result.getStatus().toString() : result.getStatus().reason());
+ errMsg.append(string(", host ") + it->first.toString());
+ }
+ }
+
+ if (code == ErrorCodes::OK) {
+ return Status(ErrorCodes::NetworkTimeout, "no results were returned in time");
+ }
+
+ return Status(code, errMsg.str());
+ }
+
+ MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host,
+ const QuerySpec& query,
+ const Date_t timeoutAtMillis,
+ MultiHostQueryOp* parentOp) :
+ host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) {
+ }
+
+ void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() {
+
+ // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the
+ // HostThreadPools is.
+ MultiHostQueryOp::SystemEnv* systemEnv;
+
+ // Extract means of doing query from the parent op
+ {
+ boost_unique_lock lk(parentMutex);
+
+ if (!parentOp)
+ return;
+
+ systemEnv = parentOp->_systemEnv;
+ }
+
+ // Make sure we're not timed out
+ Date_t nowMillis = systemEnv->currentTimeMillis();
+ if (nowMillis >= timeoutAtMillis)
+ return;
+
+ // Do query
+ StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query);
+
+ // Push results back to parent op if it's still around
+ {
+ boost_unique_lock lk(parentMutex);
+
+ if (!parentOp)
+ return;
+
+ parentOp->noteResult(host, result);
+ }
+ }
+
+ MultiHostQueryOp::~MultiHostQueryOp() {
+
+ //
+ // Orphan all outstanding query contexts that haven't reported back - these will be gc'd
+ // once all scheduled query callbacks are finished.
+ //
+
+ for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) {
+
+ shared_ptr<PendingQueryContext>& pendingContext = it->second;
+
+ boost_unique_lock lk(pendingContext->parentMutex);
+ pendingContext->parentOp = NULL;
+ }
+
+ //
+ // Nobody else should be modifying _results now - callbacks don't have access to this op,
+ // and other clients should know the op is going out of scope
+ //
+
+ for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
+
+ StatusWith<DBClientCursor*>& result = it->second;
+
+ if (result.isOK()) {
+ delete result.getValue();
+ }
+ }
+ }
+
+}
diff --git a/src/mongo/s/multi_host_query.h b/src/mongo/s/multi_host_query.h
new file mode 100644
index 00000000000..702764848fd
--- /dev/null
+++ b/src/mongo/s/multi_host_query.h
@@ -0,0 +1,330 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/function.hpp>
+#include <boost/smart_ptr/shared_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/client/dbclientinterface.h"
+
+namespace mongo {
+
+ //
+ // NOTE TO DEVS
+ // This is probably not what we want long-term - think very carefully before letting any of the
+ // functionality below escape this file.
+ //
+
+ class HostThreadPools;
+ class HostThreadPool;
+
+ /**
+ * A MultiHostQueryOp manages a query operation across multiple hosts. Supports returning
+ * immediately when any host has results or when all hosts have (connectivity) errors.
+ *
+ * The QueryOp itself dispatches work to the thread pool, and does not wait for all work to be
+ * complete before destruction. This class is not intended to be used by multiple clients at
+ * once without external synchronization (for now).
+ *
+ * Cannot be reused once all query results and errors have been returned.
+ */
+ class MultiHostQueryOp {
+ MONGO_DISALLOW_COPYING(MultiHostQueryOp);
+ public:
+
+ /**
+ * Network and time services interface
+ */
+ class SystemEnv;
+
+ /**
+ * Constructs a MultiHostQueryOp. Allows running a query across multiple hosts with a
+ * blocking interface. The lifetime of this class can be shorter than the lifetime of the
+ * queries sent via queryAny, freeing up the caller to do further work when any host is fast
+ * to respond.
+ *
+ * The systemEnv and hostThreads must remain in scope while the query op remains in scope.
+ *
+ * NOTE: SystemEnv* MUST remain valid for as long as hostThreads remains valid, since this
+ * operation may schedule background queries but fall out of scope while one of those
+ * background queries is still in-progress.
+ */
+ MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads);
+
+ ~MultiHostQueryOp();
+
+ /**
+ * Blocks for a query to be run on any of the hosts, and returns the fastest result as soon
+ * as it becomes available. This function may only be executed once.
+ *
+ * If one or more hosts have an error sending/recv'ing the query, the error or composite
+ * error is returned if no other hosts are responsive after the timeout period. Note that
+ * this does not apply to errors successfully returned from remote hosts - this is a
+ * successful query with an error.
+ *
+ * Caller owns the returned result if OK.
+ */
+ StatusWith<DBClientCursor*> queryAny(const std::vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ int timeoutMillis);
+
+ //
+ // Below is exposed for testing *only*
+ //
+
+ /**
+ * Schedules the query work on each of the hosts using the thread pool, with a timeout
+ * indicating how long the work is useful for. Can be called only once.
+ */
+ void scheduleQuery(const std::vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ Date_t timeoutAtMillis);
+
+ /**
+ * Blocks and waits for the next successful query result or any errors once the timeout is
+ * reached.
+ * Can be called multiple times until results from all hosts are returned or !OK.
+ */
+ StatusWith<DBClientCursor*> waitForNextResult(Date_t timeoutAtMillis);
+
+ private:
+
+ /**
+ * Data required to execute a query operation by a callback on an arbitrary thread.
+ * Information from the dispatching parent op may not be available if the parent is no
+ * longer in scope.
+ */
+ struct PendingQueryContext {
+
+ PendingQueryContext(const ConnectionString& host,
+ const QuerySpec& query,
+ const Date_t timeoutAtMillis,
+ MultiHostQueryOp* parentOp);
+
+ void doBlockingQuery();
+
+ const ConnectionString host;
+ const QuerySpec query;
+ const Date_t timeoutAtMillis;
+
+ // Must be held to access the parent pointer below
+ boost::mutex parentMutex;
+ // Set and unset by the parent operation on scheduling and destruction
+ MultiHostQueryOp* parentOp;
+ };
+
+ /**
+ * Called by a scheduled query (generally on a different thread from the waiting client)
+ * when a result is ready from a particular host.
+ */
+ void noteResult(const ConnectionString& host, StatusWith<DBClientCursor*> result);
+
+ /**
+ * Helper to check if any result is ready and extract that result
+ * Synchronized by _resultsMutex
+ */
+ bool releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult);
+
+ /**
+ * Helper to return an error status from zero or more results
+ * Synchronized by _resultsMutex
+ */
+ Status combineErrorResults_inlock();
+
+ // Not owned here
+ SystemEnv* _systemEnv;
+
+ // Not owned here
+ HostThreadPools* _hostThreads;
+
+ // Outstanding requests
+ typedef std::map<ConnectionString, boost::shared_ptr<PendingQueryContext> > PendingMap;
+ PendingMap _pending;
+
+ // Synchronizes below
+ boost::mutex _resultsMutex;
+
+ // Current results recv'd
+ typedef std::map<ConnectionString, StatusWith<DBClientCursor*> > ResultMap;
+ ResultMap _results;
+
+ boost::condition_variable _nextResultCV;
+ };
+
+ /**
+ * Provides network and time services to allow unit testing of MultiHostQueryOp.
+ */
+ class MultiHostQueryOp::SystemEnv {
+ public:
+
+ virtual ~SystemEnv() {
+ }
+
+ /**
+ * Returns the current time in milliseconds
+ */
+ virtual Date_t currentTimeMillis() = 0;
+
+ /**
+ * Executes a query against a given host. No timeout hint is given, but the query should
+ * not block forever.
+ * Note that no guarantees are given as to the state of the connection used after this
+ * returns, so the cursor must be self-contained.
+ *
+ * Caller owns any resulting cursor.
+ */
+ virtual StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query) = 0;
+ };
+
+ /**
+ * Object which encapsulates a thread pool per host, and allows scheduling operations against
+ * each of these hosts.
+ *
+ * Optionally supports not waiting for blocked threads before destruction.
+ *
+ * Thin wrapper for multiple hosts around HostThreadPool.
+ */
+ class HostThreadPools {
+ MONGO_DISALLOW_COPYING(HostThreadPools);
+ public:
+
+ typedef boost::function<void(void)> Callback;
+
+ /**
+ * Construct a HostThreadPools object, which lazily constructs thread pools per-host of the
+ * specified size.
+ *
+ * @param scopeAllWork true if the pool should wait for all work to be finished before
+ * going out of scope
+ */
+ HostThreadPools(int poolSize, bool scopeAllWork);
+ ~HostThreadPools();
+
+ /**
+ * Schedules some work in the form of a callback for the pool of a particular host.
+ */
+ void schedule(const ConnectionString& host, Callback callback);
+
+ /**
+ * Blocks until pool is idle for a particular host.
+ * For testing.
+ */
+ void waitUntilIdle(const ConnectionString& host);
+
+ private:
+
+ const int _poolSize;
+ const bool _scopeAllWork;
+
+ boost::mutex _mutex;
+ typedef std::map<ConnectionString, HostThreadPool*> HostPoolMap;
+ HostPoolMap _pools;
+ };
+
+ /**
+ * EXPOSED FOR TESTING ONLY.
+ *
+ * Thread pool allowing work to be scheduled against various hosts.
+ * Generic interface, but should not be used outside of this class.
+ */
+ class HostThreadPool {
+ public:
+
+ typedef boost::function<void(void)> Callback;
+
+ /**
+ * Constructs a thread pool of a given size.
+ *
+ * Parameter scopeAllWork indicates whether the pool should wait for all work to be finished
+ * before going out of scope.
+ */
+ HostThreadPool(int poolSize, bool scopeAllWork);
+
+ ~HostThreadPool();
+
+ /**
+ * Schedules some work in the form of a callback to be done ASAP.
+ */
+ void schedule(Callback callback);
+
+ /**
+ * Blocks until all threads are idle.
+ */
+ void waitUntilIdle();
+
+ private:
+
+ /**
+ * Synchronized work and activity information shared between the pool and the individual
+ * worker threads.
+ * This information must be shared, since if !scopeAllWork the parent pool is allowed to
+ * fall out of scope before the child thread completes.
+ */
+ struct PoolContext {
+
+ PoolContext() :
+ numActiveWorkers(0), isPoolActive(true) {
+ }
+
+ // Synchronizes below
+ boost::mutex mutex;
+
+ // The scheduled work
+ std::deque<Callback> scheduled;
+ boost::condition_variable workScheduledCV;
+
+ // How many workers are currently active
+ int numActiveWorkers;
+ boost::condition_variable isIdleCV;
+
+ // Whether the pool has been disposed of
+ bool isPoolActive;
+ };
+
+ /**
+ * Worker loop run by each thread.
+ */
+ static void doWork(boost::shared_ptr<PoolContext> context);
+
+ const bool _scopeAllWork;
+
+ // For now, only modified in the constructor and destructor, but non-const
+ std::vector<boost::thread*> _threads;
+
+ // Shared work and worker activity information
+ boost::shared_ptr<PoolContext> _context;
+ };
+}
diff --git a/src/mongo/s/multi_host_query_test.cpp b/src/mongo/s/multi_host_query_test.cpp
new file mode 100644
index 00000000000..ed03cc6222d
--- /dev/null
+++ b/src/mongo/s/multi_host_query_test.cpp
@@ -0,0 +1,741 @@
+/**
+ * Copyright (C) 2013 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/multi_host_query.h"
+
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/synchronization.h"
+
+namespace {
+
+ using namespace mongo;
+ using boost::shared_ptr;
+ using std::map;
+
+ class CallbackCheck {
+ public:
+
+ enum LinkMode {
+ None, Notify_Other, Wait_For_Other
+ };
+
+ CallbackCheck() :
+ _status(ErrorCodes::OperationIncomplete, ""), _linkMode(None) {
+ }
+
+ void blockUntil(CallbackCheck* other) {
+
+ _otherNotification.reset(new Notification);
+ _linkMode = Wait_For_Other;
+
+ other->_otherNotification = _otherNotification;
+ other->_linkMode = Notify_Other;
+ }
+
+ HostThreadPool::Callback getCallback() {
+ return boost::bind(&CallbackCheck::noteCallback, this);
+ }
+
+ HostThreadPool::Callback getHostCallback(const ConnectionString& host) {
+ return boost::bind(&CallbackCheck::noteHostCallback, this, host);
+ }
+
+ void noteHostCallback(const ConnectionString& host) {
+ _host = host;
+ noteCallback();
+ }
+
+ void noteCallback() {
+
+ _status = Status::OK();
+ _notification.notifyOne();
+
+ if (_linkMode == Wait_For_Other)
+ _otherNotification->waitToBeNotified();
+ else if (_linkMode == Notify_Other) {
+ _otherNotification->notifyOne();
+ }
+ }
+
+ void waitForCallback() {
+ _notification.waitToBeNotified();
+ }
+
+ Status getStatus() {
+ return _status;
+ }
+
+ const ConnectionString& getHost() {
+ return _host;
+ }
+
+ private:
+
+ Status _status;
+ Notification _notification;
+
+ ConnectionString _host;
+
+ shared_ptr<Notification> _otherNotification;
+ LinkMode _linkMode;
+ };
+
+ TEST(HostThreadPool, Schedule) {
+
+ CallbackCheck cbCheck;
+
+ // NOTE: pool must be initialized *after* the cbCheck that it executes - this avoids a
+ // subtle race where the cbCheck structure is disposed before the callback is complete.
+ HostThreadPool threadPool(1, true);
+
+ threadPool.schedule(cbCheck.getCallback());
+
+ cbCheck.waitForCallback();
+ ASSERT_OK(cbCheck.getStatus());
+ }
+
+ TEST(HostThreadPool, ScheduleTwoSerial) {
+
+ CallbackCheck cbCheckA;
+ CallbackCheck cbCheckB;
+
+ // NOTE: pool must be initialized *after* the cbCheck that it executes
+ HostThreadPool threadPool(1, true);
+
+ threadPool.schedule(cbCheckA.getCallback());
+ threadPool.schedule(cbCheckB.getCallback());
+
+ cbCheckB.waitForCallback();
+ cbCheckA.waitForCallback();
+
+ ASSERT_OK(cbCheckA.getStatus());
+ ASSERT_OK(cbCheckB.getStatus());
+ }
+
+ TEST(HostThreadPool, ScheduleTwoParallel) {
+
+ CallbackCheck cbCheckA;
+ CallbackCheck cbCheckB;
+
+ // NOTE: pool must be initialized *after* the cbCheck that it executes
+ HostThreadPool threadPool(2, true);
+
+ // Don't allow cbCheckA's callback to finish until cbCheckB's callback is processed
+ cbCheckA.blockUntil(&cbCheckB);
+
+ threadPool.schedule(cbCheckA.getCallback());
+ cbCheckA.waitForCallback();
+ ASSERT_OK(cbCheckA.getStatus());
+ // We're still blocking the thread processing cbCheckA's callback
+
+ threadPool.schedule(cbCheckB.getCallback());
+ cbCheckB.waitForCallback();
+ ASSERT_OK(cbCheckB.getStatus());
+ }
+
+ TEST(HostThreadPool, ScheduleTwoHosts) {
+
+ CallbackCheck cbCheckA;
+ CallbackCheck cbCheckB;
+
+ // NOTE: pool must be initialized *after* the cbCheck that it executes
+ HostThreadPools threadPool(1, true);
+
+ // Don't allow cbCheckA's callback to finish until cbCheckB's callback is processed.
+ // This means a single thread pool with a single thread would hang.
+ cbCheckA.blockUntil(&cbCheckB);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+
+ threadPool.schedule(hostA, cbCheckA.getHostCallback(hostA));
+ cbCheckA.waitForCallback();
+ ASSERT_OK(cbCheckA.getStatus());
+ ASSERT_EQUALS(cbCheckA.getHost().toString(), hostA.toString());
+ // We're still blocking the thread processing cbCheckA's callback
+
+ threadPool.schedule(hostB, cbCheckB.getHostCallback(hostB));
+ cbCheckB.waitForCallback();
+ ASSERT_OK(cbCheckB.getStatus());
+ ASSERT_EQUALS(cbCheckB.getHost().toString(), hostB.toString());
+ }
+
+ class MockSystemEnv : public MultiHostQueryOp::SystemEnv {
+ private:
+
+ struct MockHostInfo;
+ typedef map<ConnectionString, MockHostInfo*> HostInfoMap;
+
+ public:
+
+ MockSystemEnv(HostThreadPools* threadPool) :
+ _threadPool(threadPool), _mockTimeMillis(0) {
+ }
+
+ virtual ~MockSystemEnv() {
+ for (HostInfoMap::iterator it = _mockHostInfo.begin(); it != _mockHostInfo.end();
+ ++it) {
+ if (_threadPool)
+ _threadPool->waitUntilIdle(it->first);
+ delete it->second;
+ }
+ }
+
+ void setHostThreadPools(HostThreadPools* threadPool) {
+ _threadPool = threadPool;
+ }
+
+ void addMockHostResultAt(const ConnectionString& host, int timeMillis) {
+ newMockHostResultAt(host, timeMillis, Status::OK(), NULL);
+ }
+
+ void addMockHostErrorAt(const ConnectionString& host, int timeMillis, Status error) {
+ newMockHostResultAt(host, timeMillis, error, NULL);
+ }
+
+ void addMockHungHostAt(const ConnectionString& host,
+ int hangTimeMillis,
+ Notification* hangUntilNotify) {
+ newMockHostResultAt(host, hangTimeMillis, Status::OK(), hangUntilNotify);
+ }
+
+ Date_t currentTimeMillis() {
+ return _mockTimeMillis;
+ }
+
+ StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query) {
+
+ ASSERT(_mockHostInfo.find(host) != _mockHostInfo.end());
+
+ MockHostInfo& info = *(_mockHostInfo.find(host)->second);
+
+ if (info.prevHostActiveNotify) {
+ info.prevHostActiveNotify->waitToBeNotified();
+ if (info.waitForPrevHostIdle) {
+ _threadPool->waitUntilIdle(info.prevHost);
+ }
+ }
+
+ _mockTimeMillis = info.queryTimeMillis;
+
+ if (info.nextHostActiveNotify) {
+ info.nextHostActiveNotify->notifyOne();
+ }
+
+ if (info.hangUntilNotify) {
+ info.hangUntilNotify->waitToBeNotified();
+ return StatusWith<DBClientCursor*>(ErrorCodes::InternalError, "");
+ }
+
+ if (!info.error.isOK()) {
+ return StatusWith<DBClientCursor*>(info.error);
+ }
+
+ //
+ // Successful mock query
+ //
+
+ if (!info.conn) {
+ info.conn.reset(new DBClientConnection(false));
+ // Need to do a connect failure so that we get an empty MessagingPort on the conn and
+ // the host name is set.
+ string errMsg;
+ ASSERT(!info.conn->connect(host.toString(), errMsg));
+ }
+
+ return StatusWith<DBClientCursor*>(new DBClientCursor(info.conn.get(),
+ query.ns(),
+ query.query(),
+ query.ntoreturn(),
+ query.ntoskip(),
+ query.fieldsPtr(),
+ query.options(),
+ 0 /* batchSize */));
+ }
+
+ private:
+
+ MockHostInfo* newMockHostResultAt(const ConnectionString& host,
+ int timeMillis,
+ const Status& error,
+ Notification* hangUntilNotify) {
+
+ ASSERT(_mockHostInfo.find(host) == _mockHostInfo.end());
+
+ MockHostInfo* info = new MockHostInfo(timeMillis);
+ _mockHostInfo.insert(make_pair(host, info));
+ info->error = error;
+ info->hangUntilNotify = hangUntilNotify;
+
+ linkMockTimes(host, info);
+ return info;
+ }
+
+ void linkMockTimes(const ConnectionString& host, MockHostInfo* info) {
+
+ //
+ // This just basically sets up notifications between the processing of results such that
+ // the results are returned in the order defined by the _mockQueryTimes map.
+ //
+ // Idea is (second host result) waits for (first host result) thread to start and end,
+ // (third host result) waits for (second host result) thread to start and end,
+ // (fourth host result) waits for (third host result) thread to start and end,
+ // ... and so on ...
+ //
+
+ ASSERT(_mockQueryTimes.find(info->queryTimeMillis) == _mockQueryTimes.end());
+
+ HostQueryTimes::iterator prev = _mockQueryTimes.insert(make_pair(info->queryTimeMillis,
+ host)).first;
+
+ if (prev != _mockQueryTimes.begin())
+ --prev;
+ else
+ prev = _mockQueryTimes.end();
+
+ HostQueryTimes::iterator next = _mockQueryTimes.upper_bound(info->queryTimeMillis);
+
+ if (prev != _mockQueryTimes.end()) {
+
+ const ConnectionString& prevHost = prev->second;
+ MockHostInfo* prevInfo = _mockHostInfo.find(prevHost)->second;
+
+ linkToNext(prevHost, prevInfo, info);
+ }
+
+ if (next != _mockQueryTimes.end()) {
+
+ const ConnectionString& nextHost = next->second;
+ MockHostInfo* nextInfo = _mockHostInfo.find(nextHost)->second;
+
+ linkToNext(host, info, nextInfo);
+ }
+ }
+
+ void linkToNext(const ConnectionString& host, MockHostInfo* info, MockHostInfo* nextInfo) {
+
+ nextInfo->prevHost = host;
+
+ nextInfo->prevHostActiveNotify.reset(new Notification());
+ info->nextHostActiveNotify = nextInfo->prevHostActiveNotify.get();
+
+ nextInfo->waitForPrevHostIdle = info->hangUntilNotify == NULL;
+ }
+
+ // Not owned here, needed to allow ordering of mock queries
+ HostThreadPools* _threadPool;
+
+ int _mockTimeMillis;
+
+ typedef map<int, ConnectionString> HostQueryTimes;
+ HostQueryTimes _mockQueryTimes;
+
+ struct MockHostInfo {
+
+ MockHostInfo(int queryTimeMillis) :
+ nextHostActiveNotify( NULL),
+ waitForPrevHostIdle(false),
+ queryTimeMillis(queryTimeMillis),
+ hangUntilNotify( NULL),
+ error(Status::OK()) {
+ }
+
+ Notification* nextHostActiveNotify;
+
+ ConnectionString prevHost;
+ scoped_ptr<Notification> prevHostActiveNotify;
+ bool waitForPrevHostIdle;
+
+ int queryTimeMillis;
+
+ scoped_ptr<DBClientConnection> conn;
+ Notification* hangUntilNotify;
+ Status error;
+ };
+
+ HostInfoMap _mockHostInfo;
+
+ };
+
+ QuerySpec buildSpec(const StringData& ns, const BSONObj& query) {
+ return QuerySpec(ns.toString(), query, BSONObj(), 0, 0, 0);
+ }
+
+ //
+ // Tests for the MultiHostQueryOp
+ //
+
+ TEST(MultiHostQueryOp, SingleHost) {
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(host);
+
+ mockSystem.addMockHostResultAt(host, 1000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000);
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), host.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, SingleHostError) {
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(host);
+
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHostErrorAt(host, 1000, hostError);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000);
+
+ ASSERT_EQUALS(result.getStatus().code(), hostError.code());
+ }
+
+ TEST(MultiHostQueryOp, SingleHostHang) {
+
+ // Initialize notifier before the thread pool, otherwise we may dispose while threads are
+ // active
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString host = ConnectionString::mock(HostAndPort("$host:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(host);
+
+ mockSystem.addMockHungHostAt(host, 4000, &unhangNotify);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 2000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::NetworkTimeout);
+ }
+
+ TEST(MultiHostQueryOp, TwoHostResponses) {
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // Make sure we return the first response, from hostB at time 1000
+ mockSystem.addMockHostResultAt(hostA, 2000);
+ mockSystem.addMockHostResultAt(hostB, 1000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, TwoHostsOneErrorResponse) {
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // The first response is a host error, the second is a successful result
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHostErrorAt(hostA, 1000, hostError);
+ mockSystem.addMockHostResultAt(hostB, 2000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, TwoHostsBothErrors) {
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // Both responses are errors
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHostErrorAt(hostA, 1000, hostError);
+ mockSystem.addMockHostErrorAt(hostB, 2000, hostError);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+
+ ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::MultipleErrorsOccurred);
+ }
+
+ TEST(MultiHostQueryOp, TwoHostsOneHang) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // One host hangs
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostResultAt(hostB, 2000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, TwoHostsOneHangOneError) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // One host hangs, one host has an error (at the mock timeout point so the query finishes)
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostErrorAt(hostB, 3000, hostError);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_EQUALS(result.getStatus().code(), hostError.code());
+ }
+
+ TEST(MultiHostQueryOp, ThreeHostsOneHang) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+ hosts.push_back(hostC);
+
+ // One host hangs, last host is fastest with result
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostResultAt(hostB, 3000);
+ mockSystem.addMockHostResultAt(hostC, 2000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostC.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, ThreeHostsTwoErrors) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+ hosts.push_back(hostC);
+
+ // One host hangs, two hosts have errors (finish at mock timeout point so query ends)
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostErrorAt(hostB, 4000, hostError);
+ mockSystem.addMockHostErrorAt(hostC, 2000, hostError);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::MultipleErrorsOccurred);
+ }
+
+ TEST(MultiHostQueryOp, ThreeHostsOneHangOneError) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ HostThreadPools threadPool(1, true);
+ MockSystemEnv mockSystem(&threadPool);
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ ConnectionString hostC = ConnectionString::mock(HostAndPort("$hostC:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+ hosts.push_back(hostC);
+
+ // One host hangs, two hosts have errors (finish at mock timeout point so query ends)
+ Status hostError = Status(ErrorCodes::InternalError, "");
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostErrorAt(hostB, 2000, hostError);
+ mockSystem.addMockHostResultAt(hostC, 3000);
+
+ MultiHostQueryOp queryOp(&mockSystem, &threadPool);
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 4000);
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostC.toString());
+ delete result.getValue();
+ }
+
+ TEST(MultiHostQueryOp, TwoHostsOneHangUnscoped) {
+
+ // Initialize notifier before the thread pool
+ Notification unhangNotify;
+
+ // Create a thread pool which detaches itself from outstanding work on cleanup
+ scoped_ptr<HostThreadPools> threadPool(new HostThreadPools(1, false));
+ MockSystemEnv mockSystem(threadPool.get());
+
+ ConnectionString hostA = ConnectionString::mock(HostAndPort("$hostA:1000"));
+ ConnectionString hostB = ConnectionString::mock(HostAndPort("$hostB:1000"));
+ vector<ConnectionString> hosts;
+ hosts.push_back(hostA);
+ hosts.push_back(hostB);
+
+ // One host hangs
+ mockSystem.addMockHungHostAt(hostA, 1000, &unhangNotify);
+ mockSystem.addMockHostResultAt(hostB, 2000);
+
+ MultiHostQueryOp queryOp(&mockSystem, threadPool.get());
+
+ QuerySpec query;
+ StatusWith<DBClientCursor*> result = queryOp.queryAny(hosts, query, 3000);
+
+ // Clean up the thread pool
+ mockSystem.setHostThreadPools( NULL);
+ threadPool.reset();
+
+ // Unhang before checking status, in case it throws
+ unhangNotify.notifyOne();
+
+ ASSERT_OK(result.getStatus());
+ ASSERT(NULL != result.getValue());
+ ASSERT_EQUALS(result.getValue()->originalHost(), hostB.toString());
+ delete result.getValue();
+
+ // Make sure we get the next result
+ result = queryOp.waitForNextResult(4000);
+
+ ASSERT_EQUALS(result.getStatus().code(), ErrorCodes::InternalError);
+ }
+
+} // unnamed namespace