diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/s/client/multi_host_query.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/s/client/multi_host_query.cpp')
-rw-r--r-- | src/mongo/s/client/multi_host_query.cpp | 526 |
1 files changed, 245 insertions, 281 deletions
diff --git a/src/mongo/s/client/multi_host_query.cpp b/src/mongo/s/client/multi_host_query.cpp index 2fcb5906bf4..bfe9d36f55d 100644 --- a/src/mongo/s/client/multi_host_query.cpp +++ b/src/mongo/s/client/multi_host_query.cpp @@ -34,375 +34,339 @@ namespace mongo { - using std::shared_ptr; - using std::make_pair; - using std::string; - using std::vector; +using std::shared_ptr; +using std::make_pair; +using std::string; +using std::vector; - typedef stdx::unique_lock<stdx::mutex> boost_unique_lock; +typedef stdx::unique_lock<stdx::mutex> boost_unique_lock; - HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork) : - _scopeAllWork(scopeAllWork), _context(new PoolContext) { +HostThreadPool::HostThreadPool(int poolSize, bool scopeAllWork) + : _scopeAllWork(scopeAllWork), _context(new PoolContext) { + // All threads start as active to avoid races detecting idleness on thread startup - + // the pool isn't idle until all threads have started waiting. + _context->numActiveWorkers = poolSize; - // All threads start as active to avoid races detecting idleness on thread startup - - // the pool isn't idle until all threads have started waiting. - _context->numActiveWorkers = poolSize; - - for (int i = 0; i < poolSize; ++i) { - - // - // Each thread keeps a shared context allowing them to synchronize even if this - // dispatching pool has already been disposed. - // - - _threads.push_back(new stdx::thread(stdx::bind(&HostThreadPool::doWork, _context))); - } - } + for (int i = 0; i < poolSize; ++i) { + // + // Each thread keeps a shared context allowing them to synchronize even if this + // dispatching pool has already been disposed. + // - void HostThreadPool::schedule(Callback callback) { - boost_unique_lock lk(_context->mutex); - _context->scheduled.push_back(callback); - _context->workScheduledCV.notify_one(); + _threads.push_back(new stdx::thread(stdx::bind(&HostThreadPool::doWork, _context))); } +} - void HostThreadPool::doWork(std::shared_ptr<PoolContext> context) { - - while (true) { - - Callback callback; - - { - boost_unique_lock lk(context->mutex); - - --context->numActiveWorkers; - if (context->numActiveWorkers == 0) - context->isIdleCV.notify_all(); - - // Wait for work or until we're finished - while (context->isPoolActive && context->scheduled.empty()) { - context->workScheduledCV.wait(lk); - } +void HostThreadPool::schedule(Callback callback) { + boost_unique_lock lk(_context->mutex); + _context->scheduled.push_back(callback); + _context->workScheduledCV.notify_one(); +} - // - // Either the pool is no longer active, or the queue has some work we should do - // +void HostThreadPool::doWork(std::shared_ptr<PoolContext> context) { + while (true) { + Callback callback; - if (!context->isPoolActive) - return; + { + boost_unique_lock lk(context->mutex); - invariant( !context->scheduled.empty() ); - callback = context->scheduled.front(); - context->scheduled.pop_front(); + --context->numActiveWorkers; + if (context->numActiveWorkers == 0) + context->isIdleCV.notify_all(); - ++context->numActiveWorkers; + // Wait for work or until we're finished + while (context->isPoolActive && context->scheduled.empty()) { + context->workScheduledCV.wait(lk); } - callback(); - } - } - - void HostThreadPool::waitUntilIdle() { - boost_unique_lock lk(_context->mutex); - while (_context->numActiveWorkers > 0) { - _context->isIdleCV.wait(lk); - } - } + // + // Either the pool is no longer active, or the queue has some work we should do + // - HostThreadPool::~HostThreadPool() { + if (!context->isPoolActive) + return; - // Boost can throw on notify(), join(), detach() + invariant(!context->scheduled.empty()); + callback = context->scheduled.front(); + context->scheduled.pop_front(); - { - boost_unique_lock lk(_context->mutex); - _context->isPoolActive = false; - _context->scheduled.clear(); + ++context->numActiveWorkers; } - DESTRUCTOR_GUARD( _context->workScheduledCV.notify_all(); ) - - for (vector<stdx::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) { - - if (_scopeAllWork) { - DESTRUCTOR_GUARD( ( *it )->join(); ) - } - else { - DESTRUCTOR_GUARD( ( *it )->detach(); ) - } + callback(); + } +} - delete *it; - } +void HostThreadPool::waitUntilIdle() { + boost_unique_lock lk(_context->mutex); + while (_context->numActiveWorkers > 0) { + _context->isIdleCV.wait(lk); } +} - HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork) : - _poolSize(poolSize), _scopeAllWork(scopeAllWork) { +HostThreadPool::~HostThreadPool() { + // Boost can throw on notify(), join(), detach() + + { + boost_unique_lock lk(_context->mutex); + _context->isPoolActive = false; + _context->scheduled.clear(); } - void HostThreadPools::schedule(const ConnectionString& host, - HostThreadPool::Callback callback) { - boost_unique_lock lk(_mutex); + DESTRUCTOR_GUARD(_context->workScheduledCV.notify_all();) - HostPoolMap::iterator seenIt = _pools.find(host); - if (seenIt == _pools.end()) { - seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork))) - .first; + for (vector<stdx::thread*>::iterator it = _threads.begin(); it != _threads.end(); ++it) { + if (_scopeAllWork) { + DESTRUCTOR_GUARD((*it)->join();) + } else { + DESTRUCTOR_GUARD((*it)->detach();) } - seenIt->second->schedule(callback); + delete *it; } +} - void HostThreadPools::waitUntilIdle(const ConnectionString& host) { - - // Note that this prevents the creation of any new pools - it is only intended to be used - // for testing. - - boost_unique_lock lk(_mutex); +HostThreadPools::HostThreadPools(int poolSize, bool scopeAllWork) + : _poolSize(poolSize), _scopeAllWork(scopeAllWork) {} - HostPoolMap::iterator seenIt = _pools.find(host); - if (seenIt == _pools.end()) - return; +void HostThreadPools::schedule(const ConnectionString& host, HostThreadPool::Callback callback) { + boost_unique_lock lk(_mutex); - seenIt->second->waitUntilIdle(); + HostPoolMap::iterator seenIt = _pools.find(host); + if (seenIt == _pools.end()) { + seenIt = _pools.insert(make_pair(host, new HostThreadPool(_poolSize, _scopeAllWork))).first; } - HostThreadPools::~HostThreadPools() { + seenIt->second->schedule(callback); +} - boost_unique_lock lk(_mutex); - for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) { - delete it->second; - } - } +void HostThreadPools::waitUntilIdle(const ConnectionString& host) { + // Note that this prevents the creation of any new pools - it is only intended to be used + // for testing. - MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads) : - _systemEnv(systemEnv), _hostThreads(hostThreads) { - } + boost_unique_lock lk(_mutex); - StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts, - const QuerySpec& query, - int timeoutMillis) { - - Date_t nowMillis = _systemEnv->currentTimeMillis(); - Date_t timeoutAtMillis = nowMillis + Milliseconds(timeoutMillis); + HostPoolMap::iterator seenIt = _pools.find(host); + if (seenIt == _pools.end()) + return; - // Send out all queries - scheduleQuery(hosts, query, timeoutAtMillis); + seenIt->second->waitUntilIdle(); +} - // Wait for them to come back - return waitForNextResult(timeoutAtMillis); +HostThreadPools::~HostThreadPools() { + boost_unique_lock lk(_mutex); + for (HostPoolMap::iterator it = _pools.begin(); it != _pools.end(); ++it) { + delete it->second; } +} - void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts, - const QuerySpec& query, - Date_t timeoutAtMillis) { - - invariant( _pending.empty() ); +MultiHostQueryOp::MultiHostQueryOp(SystemEnv* systemEnv, HostThreadPools* hostThreads) + : _systemEnv(systemEnv), _hostThreads(hostThreads) {} - for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) { +StatusWith<DBClientCursor*> MultiHostQueryOp::queryAny(const vector<ConnectionString>& hosts, + const QuerySpec& query, + int timeoutMillis) { + Date_t nowMillis = _systemEnv->currentTimeMillis(); + Date_t timeoutAtMillis = nowMillis + Milliseconds(timeoutMillis); - const ConnectionString& host = *it; + // Send out all queries + scheduleQuery(hosts, query, timeoutAtMillis); - shared_ptr<PendingQueryContext> pendingOp(new PendingQueryContext(host, - query, - timeoutAtMillis, - this)); + // Wait for them to come back + return waitForNextResult(timeoutAtMillis); +} - _pending.insert(make_pair(host, pendingOp)); +void MultiHostQueryOp::scheduleQuery(const vector<ConnectionString>& hosts, + const QuerySpec& query, + Date_t timeoutAtMillis) { + invariant(_pending.empty()); - HostThreadPool::Callback callback = - stdx::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp); + for (vector<ConnectionString>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) { + const ConnectionString& host = *it; - _hostThreads->schedule(host, callback); - } - } + shared_ptr<PendingQueryContext> pendingOp( + new PendingQueryContext(host, query, timeoutAtMillis, this)); - StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) { + _pending.insert(make_pair(host, pendingOp)); - StatusWith<DBClientCursor*> nextResult( NULL); + HostThreadPool::Callback callback = + stdx::bind(&MultiHostQueryOp::PendingQueryContext::doBlockingQuery, pendingOp); - boost_unique_lock lk(_resultsMutex); - while (!releaseResult_inlock(&nextResult)) { + _hostThreads->schedule(host, callback); + } +} - Date_t nowMillis = _systemEnv->currentTimeMillis(); +StatusWith<DBClientCursor*> MultiHostQueryOp::waitForNextResult(Date_t timeoutAtMillis) { + StatusWith<DBClientCursor*> nextResult(NULL); - if (nowMillis >= timeoutAtMillis) { - nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); - break; - } + boost_unique_lock lk(_resultsMutex); + while (!releaseResult_inlock(&nextResult)) { + Date_t nowMillis = _systemEnv->currentTimeMillis(); - _nextResultCV.wait_for(lk, timeoutAtMillis - nowMillis); + if (nowMillis >= timeoutAtMillis) { + nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); + break; } - dassert( !nextResult.isOK() || nextResult.getValue() ); - return nextResult; + _nextResultCV.wait_for(lk, timeoutAtMillis - nowMillis); } - void MultiHostQueryOp::noteResult(const ConnectionString& host, - StatusWith<DBClientCursor*> result) { - - boost_unique_lock lk(_resultsMutex); - dassert( _results.find( host ) == _results.end() ); - _results.insert(make_pair(host, result)); + dassert(!nextResult.isOK() || nextResult.getValue()); + return nextResult; +} - _nextResultCV.notify_one(); - } +void MultiHostQueryOp::noteResult(const ConnectionString& host, + StatusWith<DBClientCursor*> result) { + boost_unique_lock lk(_resultsMutex); + dassert(_results.find(host) == _results.end()); + _results.insert(make_pair(host, result)); - /** - * The results in the result map have four states: - * Nonexistent - query result still pending - * Status::OK w/ pointer - successful query result, not yet released to user - * Status::OK w/ NULL pointer - successful query result, user consumed the result - * Status::Not OK - error during query - * - * This function returns true and the next result to release to the user (or an error - * if there can be no successful results to release) or false to indicate the user - * should keep waiting. - */ - bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) { - - int numErrors = 0; - int numReleased = 0; - for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { - - StatusWith<DBClientCursor*>& result = it->second; - - if (result.isOK() && result.getValue() != NULL) { - *nextResult = result; - it->second = StatusWith<DBClientCursor*>( NULL); - return true; - } - else if (result.isOK()) { - ++numReleased; - } - else { - ++numErrors; - } - } + _nextResultCV.notify_one(); +} - if (numErrors + numReleased == static_cast<int>(_pending.size())) { - *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); +/** + * The results in the result map have four states: + * Nonexistent - query result still pending + * Status::OK w/ pointer - successful query result, not yet released to user + * Status::OK w/ NULL pointer - successful query result, user consumed the result + * Status::Not OK - error during query + * + * This function returns true and the next result to release to the user (or an error + * if there can be no successful results to release) or false to indicate the user + * should keep waiting. + */ +bool MultiHostQueryOp::releaseResult_inlock(StatusWith<DBClientCursor*>* nextResult) { + int numErrors = 0; + int numReleased = 0; + for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { + StatusWith<DBClientCursor*>& result = it->second; + + if (result.isOK() && result.getValue() != NULL) { + *nextResult = result; + it->second = StatusWith<DBClientCursor*>(NULL); return true; + } else if (result.isOK()) { + ++numReleased; + } else { + ++numErrors; } - - return false; } - /** - * Goes through the set of results and combines all non-OK results into a single Status. - * If a single error is found, just returns that error. - * If no non-OK results are found, assumes the cause is a timeout. - */ - Status MultiHostQueryOp::combineErrorResults_inlock() { - - ErrorCodes::Error code = ErrorCodes::OK; - StringBuilder errMsg; - // Whether we should include human-readable codes in the msg - we don't need them if we're - // not aggregating multiple statuses together - bool includeHRCodes = false; - - for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) { - - const StatusWith<DBClientCursor*>& result = it->second; - - if (!result.isOK()) { - - if (code == ErrorCodes::OK) { - code = result.getStatus().code(); - } - else { + if (numErrors + numReleased == static_cast<int>(_pending.size())) { + *nextResult = StatusWith<DBClientCursor*>(combineErrorResults_inlock()); + return true; + } - if (!includeHRCodes) { - includeHRCodes = true; - // Fixup the single error message to include a code - errMsg.reset(); - errMsg.append(Status(code, errMsg.str()).toString()); - } + return false; +} - code = ErrorCodes::MultipleErrorsOccurred; - errMsg.append(" :: and :: "); +/** + * Goes through the set of results and combines all non-OK results into a single Status. + * If a single error is found, just returns that error. + * If no non-OK results are found, assumes the cause is a timeout. + */ +Status MultiHostQueryOp::combineErrorResults_inlock() { + ErrorCodes::Error code = ErrorCodes::OK; + StringBuilder errMsg; + // Whether we should include human-readable codes in the msg - we don't need them if we're + // not aggregating multiple statuses together + bool includeHRCodes = false; + + for (ResultMap::const_iterator it = _results.begin(); it != _results.end(); ++it) { + const StatusWith<DBClientCursor*>& result = it->second; + + if (!result.isOK()) { + if (code == ErrorCodes::OK) { + code = result.getStatus().code(); + } else { + if (!includeHRCodes) { + includeHRCodes = true; + // Fixup the single error message to include a code + errMsg.reset(); + errMsg.append(Status(code, errMsg.str()).toString()); } - errMsg.append( - includeHRCodes ? result.getStatus().toString() : result.getStatus().reason()); - errMsg.append(string(", host ") + it->first.toString()); + code = ErrorCodes::MultipleErrorsOccurred; + errMsg.append(" :: and :: "); } - } - if (code == ErrorCodes::OK) { - return Status(ErrorCodes::NetworkTimeout, "no results were returned in time"); + errMsg.append(includeHRCodes ? result.getStatus().toString() + : result.getStatus().reason()); + errMsg.append(string(", host ") + it->first.toString()); } - - return Status(code, errMsg.str()); } - MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host, - const QuerySpec& query, - const Date_t timeoutAtMillis, - MultiHostQueryOp* parentOp) : - host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) { + if (code == ErrorCodes::OK) { + return Status(ErrorCodes::NetworkTimeout, "no results were returned in time"); } - void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() { - - // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the - // HostThreadPools is. - MultiHostQueryOp::SystemEnv* systemEnv; + return Status(code, errMsg.str()); +} - // Extract means of doing query from the parent op - { - boost_unique_lock lk(parentMutex); +MultiHostQueryOp::PendingQueryContext::PendingQueryContext(const ConnectionString& host, + const QuerySpec& query, + const Date_t timeoutAtMillis, + MultiHostQueryOp* parentOp) + : host(host), query(query), timeoutAtMillis(timeoutAtMillis), parentOp(parentOp) {} - if (!parentOp) - return; +void MultiHostQueryOp::PendingQueryContext::doBlockingQuery() { + // This *NEEDS* to be around for as long as we're doing queries - i.e. as long as the + // HostThreadPools is. + MultiHostQueryOp::SystemEnv* systemEnv; - systemEnv = parentOp->_systemEnv; - } + // Extract means of doing query from the parent op + { + boost_unique_lock lk(parentMutex); - // Make sure we're not timed out - Date_t nowMillis = systemEnv->currentTimeMillis(); - if (nowMillis >= timeoutAtMillis) + if (!parentOp) return; - // Do query - StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query); - - // Push results back to parent op if it's still around - { - boost_unique_lock lk(parentMutex); - - if (parentOp) - parentOp->noteResult(host, result); - else if(result.isOK()) - delete result.getValue(); - } + systemEnv = parentOp->_systemEnv; } - MultiHostQueryOp::~MultiHostQueryOp() { + // Make sure we're not timed out + Date_t nowMillis = systemEnv->currentTimeMillis(); + if (nowMillis >= timeoutAtMillis) + return; - // - // Orphan all outstanding query contexts that haven't reported back - these will be gc'd - // once all scheduled query callbacks are finished. - // + // Do query + StatusWith<DBClientCursor*> result = systemEnv->doBlockingQuery(host, query); - for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) { + // Push results back to parent op if it's still around + { + boost_unique_lock lk(parentMutex); - shared_ptr<PendingQueryContext>& pendingContext = it->second; + if (parentOp) + parentOp->noteResult(host, result); + else if (result.isOK()) + delete result.getValue(); + } +} - boost_unique_lock lk(pendingContext->parentMutex); - pendingContext->parentOp = NULL; - } +MultiHostQueryOp::~MultiHostQueryOp() { + // + // Orphan all outstanding query contexts that haven't reported back - these will be gc'd + // once all scheduled query callbacks are finished. + // - // - // Nobody else should be modifying _results now - callbacks don't have access to this op, - // and other clients should know the op is going out of scope - // + for (PendingMap::iterator it = _pending.begin(); it != _pending.end(); ++it) { + shared_ptr<PendingQueryContext>& pendingContext = it->second; - for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { + boost_unique_lock lk(pendingContext->parentMutex); + pendingContext->parentOp = NULL; + } - StatusWith<DBClientCursor*>& result = it->second; + // + // Nobody else should be modifying _results now - callbacks don't have access to this op, + // and other clients should know the op is going out of scope + // - if (result.isOK()) { - delete result.getValue(); - } + for (ResultMap::iterator it = _results.begin(); it != _results.end(); ++it) { + StatusWith<DBClientCursor*>& result = it->second; + + if (result.isOK()) { + delete result.getValue(); } } - +} } |