summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/multi_host_query.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/multi_host_query.cpp')
-rw-r--r--src/mongo/s/client/multi_host_query.cpp526
1 files changed, 245 insertions, 281 deletions
diff --git a/src/mongo/s/client/multi_host_query.cpp b/src/mongo/s/client/multi_host_query.cpp
index 2fcb5906bf4..bfe9d36f55d 100644
--- a/src/mongo/s/client/multi_host_query.cpp
+++ b/src/mongo/s/client/multi_host_query.cpp
@@ -34,375 +34,339 @@
namespace mongo {
- using std::shared_ptr;
- using std::make_pair;
- using std::string;
- using std::vector;
+using std::shared_ptr;
+using std::make_pair;
+using std::string;
+using std::vector;
- typedef stdx::unique_lock<stdx::mutex> boost_unique_lock;
+typedef stdx::unique_lock<stdx::mutex> boost_unique_lock;
- HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork) :
- _scopeAllWork(scopeAllWork), _context(new PoolContext) {
+HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork)
+ : _scopeAllWork(scopeAllWork), _context(new PoolContext) {
+ // All threads start as active to avoid races detecting idleness on thread startup -
+ // the pool isn't idle until all threads have started waiting.
+ _context->numActiveWorkers = poolSize;
- // All threads start as active to avoid races detecting idleness on thread startup -
- // the pool isn't idle until all threads have started waiting.
- _context->numActiveWorkers = poolSize;
-
- for (int i = 0; i < poolSize; ++i) {
-
- //
- // Each thread keeps a shared context allowing them to synchronize even if this
- // dispatching pool has already been disposed.
- //
-
- _threads.push_back(new stdx::thread(stdx::bind(&HostThreadPool::doWork, _context)));
- }
- }
+ for (int i = 0; i < poolSize; ++i) {
+ //
+ // Each thread keeps a shared context allowing them to synchronize even if this
+ // dispatching pool has already been disposed.
+ //
- void HostThreadPool::schedule(Callback callback) {
- boost_unique_lock lk(_context->mutex);
- _context->scheduled.push_back(callback);
- _context->workScheduledCV.notify_one();
+ _threads.push_back(new stdx::thread(stdx::bind(&HostThreadPool::doWork, _context)));
}
+}
- void HostThreadPool::doWork(std::shared_ptr<PoolContext> context) {
-
- while (true) {
-
- Callback callback;
-
- {
- boost_unique_lock lk(context->mutex);
-
- --context->numActiveWorkers;
- if (context->numActiveWorkers == 0)
- context->isIdleCV.notify_all();
-
- // Wait for work or until we're finished
- while (context->isPoolActive && context->scheduled.empty()) {
- context->workScheduledCV.wait(lk);
- }
+void HostThreadPool::schedule(Callback callback) {
+ boost_unique_lock lk(_context->mutex);
+ _context->scheduled.push_back(callback);
+ _context->workScheduledCV.notify_one();
+}
- //
- // Either the pool is no longer active, or the queue has some work we should do
- //
+void HostThreadPool::doWork(std::shared_ptr<PoolContext> context) {
+ while (true) {
+ Callback callback;
- if (!context->isPoolActive)
- return;
+ {
+ boost_unique_lock lk(context->mutex);
- invariant( !context->scheduled.empty() );
- callback = context->scheduled.front();
- context->scheduled.pop_front();
+ --context->numActiveWorkers;
+ if (context->numActiveWorkers == 0)
+ context->isIdleCV.notify_all();
- ++context->numActiveWorkers;
+ // Wait for work or until we're finished
+ while (context->isPoolActive && context->scheduled.empty()) {
+ context->workScheduledCV.wait(lk);
}
- callback();
- }
- }
-
- void HostThreadPool::waitUntilIdle() {
- boost_unique_lock lk(_context->mutex);
- while (_context->numActiveWorkers > 0) {
- _context->isIdleCV.wait(lk);
- }
- }
+ //
+ // Either the pool is no longer active, or the queue has some work we should do
+ //
- HostThreadPool::~HostThreadPool() {
+ if (!context->isPoolActive)
+ return;
- // Boost can throw on notify(), join(), detach()
+ invariant(!context->scheduled.empty());
+ callback = context->scheduled.front();
+ context->scheduled.pop_front();
- {
- boost_unique_lock lk(_context->mutex);
- _context->isPoolActive = false;
- _context->scheduled.clear();
+ ++context->numActiveWorkers;
}
- DESTRUCTOR_GUARD( _context->workScheduledCV.notify_all(); )
-
- for (vector<stdx::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) {
-
- if (_scopeAllWork) {
- DESTRUCTOR_GUARD( ( *it )->join(); )
- }
- else {
- DESTRUCTOR_GUARD( ( *it )->detach(); )
- }
+ callback();
+ }
+}
- delete *it;
- }
+void HostThreadPool::waitUntilIdle() {
+ boost_unique_lock lk(_context->mutex);
+ while (_context->numActiveWorkers > 0) {
+ _context->isIdleCV.wait(lk);
}
+}
- HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork) :
- _poolSize(poolSize), _scopeAllWork(scopeAllWork) {
+HostThreadPool::~HostThreadPool() {
+ // Boost can throw on notify(), join(), detach()
+
+ {
+ boost_unique_lock lk(_context->mutex);
+ _context->isPoolActive = false;
+ _context->scheduled.clear();
}
- void HostThreadPools::schedule(const ConnectionString& host,
- HostThreadPool::Callback callback) {
- boost_unique_lock lk(_mutex);
+ DESTRUCTOR_GUARD(_context->workScheduledCV.notify_all();)
- HostPoolMap::iterator seenIt = _pools.find(host);
- if (seenIt == _pools.end()) {
- seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork)))
- .first;
+ for (vector<stdx::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) {
+ if (_scopeAllWork) {
+ DESTRUCTOR_GUARD((*it)->join();)
+ } else {
+ DESTRUCTOR_GUARD((*it)->detach();)
}
- seenIt->second->schedule(callback);
+ delete *it;
}
+}
- void HostThreadPools::waitUntilIdle(const ConnectionString& host) {
-
- // Note that this prevents the creation of any new pools - it is only intended to be used
- // for testing.
-
- boost_unique_lock lk(_mutex);
+HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork)
+ : _poolSize(poolSize), _scopeAllWork(scopeAllWork) {}
- HostPoolMap::iterator seenIt = _pools.find(host);
- if (seenIt == _pools.end())
- return;
+void HostThreadPools::schedule(const ConnectionString& host, HostThreadPool::Callback callback) {
+ boost_unique_lock lk(_mutex);
- seenIt->second->waitUntilIdle();
+ HostPoolMap::iterator seenIt = _pools.find(host);
+ if (seenIt == _pools.end()) {
+ seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork))).first;
}
- HostThreadPools::~HostThreadPools() {
+ seenIt->second->schedule(callback);
+}
- boost_unique_lock lk(_mutex);
- for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) {
- delete it->second;
- }
- }
+void HostThreadPools::waitUntilIdle(const ConnectionString& host) {
+ // Note that this prevents the creation of any new pools - it is only intended to be used
+ // for testing.
- MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads) :
- _systemEnv(systemEnv), _hostThreads(hostThreads) {
- }
+ boost_unique_lock lk(_mutex);
- StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts,
- const QuerySpec& query,
- int timeoutMillis) {
-
- Date_t nowMillis = _systemEnv->currentTimeMillis();
- Date_t timeoutAtMillis = nowMillis + Milliseconds(timeoutMillis);
+ HostPoolMap::iterator seenIt = _pools.find(host);
+ if (seenIt == _pools.end())
+ return;
- // Send out all queries
- scheduleQuery(hosts, query, timeoutAtMillis);
+ seenIt->second->waitUntilIdle();
+}
- // Wait for them to come back
- return waitForNextResult(timeoutAtMillis);
+HostThreadPools::~HostThreadPools() {
+ boost_unique_lock lk(_mutex);
+ for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) {
+ delete it->second;
}
+}
- void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts,
- const QuerySpec& query,
- Date_t timeoutAtMillis) {
-
- invariant( _pending.empty() );
+MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads)
+ : _systemEnv(systemEnv), _hostThreads(hostThreads) {}
- for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) {
+StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ int timeoutMillis) {
+ Date_t nowMillis = _systemEnv->currentTimeMillis();
+ Date_t timeoutAtMillis = nowMillis + Milliseconds(timeoutMillis);
- const ConnectionString& host = *it;
+ // Send out all queries
+ scheduleQuery(hosts, query, timeoutAtMillis);
- shared_ptr<PendingQueryContext> pendingOp(new PendingQueryContext(host,
- query,
- timeoutAtMillis,
- this));
+ // Wait for them to come back
+ return waitForNextResult(timeoutAtMillis);
+}
- _pending.insert(make_pair(host, pendingOp));
+void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts,
+ const QuerySpec& query,
+ Date_t timeoutAtMillis) {
+ invariant(_pending.empty());
- HostThreadPool::Callback callback =
- stdx::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp);
+ for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) {
+ const ConnectionString& host = *it;
- _hostThreads->schedule(host, callback);
- }
- }
+ shared_ptr<PendingQueryContext> pendingOp(
+ new PendingQueryContext(host, query, timeoutAtMillis, this));
- StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) {
+ _pending.insert(make_pair(host, pendingOp));
- StatusWith<DBClientCursor*> nextResult( NULL);
+ HostThreadPool::Callback callback =
+ stdx::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp);
- boost_unique_lock lk(_resultsMutex);
- while (!releaseResult_inlock(&nextResult)) {
+ _hostThreads->schedule(host, callback);
+ }
+}
- Date_t nowMillis = _systemEnv->currentTimeMillis();
+StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) {
+ StatusWith<DBClientCursor*> nextResult(NULL);
- if (nowMillis >= timeoutAtMillis) {
- nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
- break;
- }
+ boost_unique_lock lk(_resultsMutex);
+ while (!releaseResult_inlock(&nextResult)) {
+ Date_t nowMillis = _systemEnv->currentTimeMillis();
- _nextResultCV.wait_for(lk, timeoutAtMillis - nowMillis);
+ if (nowMillis >= timeoutAtMillis) {
+ nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
+ break;
}
- dassert( !nextResult.isOK() || nextResult.getValue() );
- return nextResult;
+ _nextResultCV.wait_for(lk, timeoutAtMillis - nowMillis);
}
- void MultiHostQueryOp::noteResult(const ConnectionString& host,
- StatusWith<DBClientCursor*> result) {
-
- boost_unique_lock lk(_resultsMutex);
- dassert( _results.find( host ) == _results.end() );
- _results.insert(make_pair(host, result));
+ dassert(!nextResult.isOK() || nextResult.getValue());
+ return nextResult;
+}
- _nextResultCV.notify_one();
- }
+void MultiHostQueryOp::noteResult(const ConnectionString& host,
+ StatusWith<DBClientCursor*> result) {
+ boost_unique_lock lk(_resultsMutex);
+ dassert(_results.find(host) == _results.end());
+ _results.insert(make_pair(host, result));
- /**
- * The results in the result map have four states:
- * Nonexistent - query result still pending
- * Status::OK w/ pointer - successful query result, not yet released to user
- * Status::OK w/ NULL pointer - successful query result, user consumed the result
- * Status::Not OK - error during query
- *
- * This function returns true and the next result to release to the user (or an error
- * if there can be no successful results to release) or false to indicate the user
- * should keep waiting.
- */
- bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) {
-
- int numErrors = 0;
- int numReleased = 0;
- for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
-
- StatusWith<DBClientCursor*>& result = it->second;
-
- if (result.isOK() && result.getValue() != NULL) {
- *nextResult = result;
- it->second = StatusWith<DBClientCursor*>( NULL);
- return true;
- }
- else if (result.isOK()) {
- ++numReleased;
- }
- else {
- ++numErrors;
- }
- }
+ _nextResultCV.notify_one();
+}
- if (numErrors + numReleased == static_cast<int>(_pending.size())) {
- *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
+/**
+ * The results in the result map have four states:
+ * Nonexistent - query result still pending
+ * Status::OK w/ pointer - successful query result, not yet released to user
+ * Status::OK w/ NULL pointer - successful query result, user consumed the result
+ * Status::Not OK - error during query
+ *
+ * This function returns true and the next result to release to the user (or an error
+ * if there can be no successful results to release) or false to indicate the user
+ * should keep waiting.
+ */
+bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) {
+ int numErrors = 0;
+ int numReleased = 0;
+ for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
+ StatusWith<DBClientCursor*>& result = it->second;
+
+ if (result.isOK() && result.getValue() != NULL) {
+ *nextResult = result;
+ it->second = StatusWith<DBClientCursor*>(NULL);
return true;
+ } else if (result.isOK()) {
+ ++numReleased;
+ } else {
+ ++numErrors;
}
-
- return false;
}
- /**
- * Goes through the set of results and combines all non-OK results into a single Status.
- * If a single error is found, just returns that error.
- * If no non-OK results are found, assumes the cause is a timeout.
- */
- Status MultiHostQueryOp::combineErrorResults_inlock() {
-
- ErrorCodes::Error code = ErrorCodes::OK;
- StringBuilder errMsg;
- // Whether we should include human-readable codes in the msg - we don't need them if we're
- // not aggregating multiple statuses together
- bool includeHRCodes = false;
-
- for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) {
-
- const StatusWith<DBClientCursor*>& result = it->second;
-
- if (!result.isOK()) {
-
- if (code == ErrorCodes::OK) {
- code = result.getStatus().code();
- }
- else {
+ if (numErrors + numReleased == static_cast<int>(_pending.size())) {
+ *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock());
+ return true;
+ }
- if (!includeHRCodes) {
- includeHRCodes = true;
- // Fixup the single error message to include a code
- errMsg.reset();
- errMsg.append(Status(code, errMsg.str()).toString());
- }
+ return false;
+}
- code = ErrorCodes::MultipleErrorsOccurred;
- errMsg.append(" :: and :: ");
+/**
+ * Goes through the set of results and combines all non-OK results into a single Status.
+ * If a single error is found, just returns that error.
+ * If no non-OK results are found, assumes the cause is a timeout.
+ */
+Status MultiHostQueryOp::combineErrorResults_inlock() {
+ ErrorCodes::Error code = ErrorCodes::OK;
+ StringBuilder errMsg;
+ // Whether we should include human-readable codes in the msg - we don't need them if we're
+ // not aggregating multiple statuses together
+ bool includeHRCodes = false;
+
+ for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) {
+ const StatusWith<DBClientCursor*>& result = it->second;
+
+ if (!result.isOK()) {
+ if (code == ErrorCodes::OK) {
+ code = result.getStatus().code();
+ } else {
+ if (!includeHRCodes) {
+ includeHRCodes = true;
+ // Fixup the single error message to include a code
+ errMsg.reset();
+ errMsg.append(Status(code, errMsg.str()).toString());
}
- errMsg.append(
- includeHRCodes ? result.getStatus().toString() : result.getStatus().reason());
- errMsg.append(string(", host ") + it->first.toString());
+ code = ErrorCodes::MultipleErrorsOccurred;
+ errMsg.append(" :: and :: ");
}
- }
- if (code == ErrorCodes::OK) {
- return Status(ErrorCodes::NetworkTimeout, "no results were returned in time");
+ errMsg.append(includeHRCodes ? result.getStatus().toString()
+ : result.getStatus().reason());
+ errMsg.append(string(", host ") + it->first.toString());
}
-
- return Status(code, errMsg.str());
}
- MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host,
- const QuerySpec& query,
- const Date_t timeoutAtMillis,
- MultiHostQueryOp* parentOp) :
- host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) {
+ if (code == ErrorCodes::OK) {
+ return Status(ErrorCodes::NetworkTimeout, "no results were returned in time");
}
- void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() {
-
- // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the
- // HostThreadPools is.
- MultiHostQueryOp::SystemEnv* systemEnv;
+ return Status(code, errMsg.str());
+}
- // Extract means of doing query from the parent op
- {
- boost_unique_lock lk(parentMutex);
+MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host,
+ const QuerySpec& query,
+ const Date_t timeoutAtMillis,
+ MultiHostQueryOp* parentOp)
+ : host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) {}
- if (!parentOp)
- return;
+void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() {
+ // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the
+ // HostThreadPools is.
+ MultiHostQueryOp::SystemEnv* systemEnv;
- systemEnv = parentOp->_systemEnv;
- }
+ // Extract means of doing query from the parent op
+ {
+ boost_unique_lock lk(parentMutex);
- // Make sure we're not timed out
- Date_t nowMillis = systemEnv->currentTimeMillis();
- if (nowMillis >= timeoutAtMillis)
+ if (!parentOp)
return;
- // Do query
- StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query);
-
- // Push results back to parent op if it's still around
- {
- boost_unique_lock lk(parentMutex);
-
- if (parentOp)
- parentOp->noteResult(host, result);
- else if(result.isOK())
- delete result.getValue();
- }
+ systemEnv = parentOp->_systemEnv;
}
- MultiHostQueryOp::~MultiHostQueryOp() {
+ // Make sure we're not timed out
+ Date_t nowMillis = systemEnv->currentTimeMillis();
+ if (nowMillis >= timeoutAtMillis)
+ return;
- //
- // Orphan all outstanding query contexts that haven't reported back - these will be gc'd
- // once all scheduled query callbacks are finished.
- //
+ // Do query
+ StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query);
- for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) {
+ // Push results back to parent op if it's still around
+ {
+ boost_unique_lock lk(parentMutex);
- shared_ptr<PendingQueryContext>& pendingContext = it->second;
+ if (parentOp)
+ parentOp->noteResult(host, result);
+ else if (result.isOK())
+ delete result.getValue();
+ }
+}
- boost_unique_lock lk(pendingContext->parentMutex);
- pendingContext->parentOp = NULL;
- }
+MultiHostQueryOp::~MultiHostQueryOp() {
+ //
+ // Orphan all outstanding query contexts that haven't reported back - these will be gc'd
+ // once all scheduled query callbacks are finished.
+ //
- //
- // Nobody else should be modifying _results now - callbacks don't have access to this op,
- // and other clients should know the op is going out of scope
- //
+ for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) {
+ shared_ptr<PendingQueryContext>& pendingContext = it->second;
- for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
+ boost_unique_lock lk(pendingContext->parentMutex);
+ pendingContext->parentOp = NULL;
+ }
- StatusWith<DBClientCursor*>& result = it->second;
+ //
+ // Nobody else should be modifying _results now - callbacks don't have access to this op,
+ // and other clients should know the op is going out of scope
+ //
- if (result.isOK()) {
- delete result.getValue();
- }
+ for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) {
+ StatusWith<DBClientCursor*>& result = it->second;
+
+ if (result.isOK()) {
+ delete result.getValue();
}
}
-
+}
}