diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/pipeline/document_source_merge_cursors.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge_cursors.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge_cursors.cpp | 221 |
1 files changed, 106 insertions, 115 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index afe924d6a2f..d1e618f35bf 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -33,146 +33,137 @@ namespace mongo { - using boost::intrusive_ptr; - using std::make_pair; - using std::string; - using std::vector; +using boost::intrusive_ptr; +using std::make_pair; +using std::string; +using std::vector; - const char DocumentSourceMergeCursors::name[] = "$mergeCursors"; +const char DocumentSourceMergeCursors::name[] = "$mergeCursors"; - const char* DocumentSourceMergeCursors::getSourceName() const { - return name; - } +const char* DocumentSourceMergeCursors::getSourceName() const { + return name; +} - void DocumentSourceMergeCursors::setSource(DocumentSource *pSource) { - /* this doesn't take a source */ - verify(false); - } +void DocumentSourceMergeCursors::setSource(DocumentSource* pSource) { + /* this doesn't take a source */ + verify(false); +} - DocumentSourceMergeCursors::DocumentSourceMergeCursors( - const CursorIds& cursorIds, - const intrusive_ptr<ExpressionContext> &pExpCtx) - : DocumentSource(pExpCtx) - , _cursorIds(cursorIds) - , _unstarted(true) - {} - - intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( - const CursorIds& cursorIds, - const intrusive_ptr<ExpressionContext> &pExpCtx) { - return new DocumentSourceMergeCursors(cursorIds, pExpCtx); - } +DocumentSourceMergeCursors::DocumentSourceMergeCursors( + const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {} - intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( - BSONElement elem, - const intrusive_ptr<ExpressionContext>& pExpCtx) { +intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create( + const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx) { + return new DocumentSourceMergeCursors(cursorIds, pExpCtx); +} - massert(17026, string("Expected an Array, but got a ") + typeName(elem.type()), - elem.type() == Array); +intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { + massert(17026, + string("Expected an Array, but got a ") + typeName(elem.type()), + elem.type() == Array); + + CursorIds cursorIds; + BSONObj array = elem.embeddedObject(); + BSONForEach(cursor, array) { + massert(17027, + string("Expected an Object, but got a ") + typeName(cursor.type()), + cursor.type() == Object); + + cursorIds.push_back( + make_pair(ConnectionString(HostAndPort(cursor["host"].String())), cursor["id"].Long())); + } - CursorIds cursorIds; - BSONObj array = elem.embeddedObject(); - BSONForEach(cursor, array) { - massert(17027, string("Expected an Object, but got a ") + typeName(cursor.type()), - cursor.type() == Object); + return new DocumentSourceMergeCursors(cursorIds, pExpCtx); +} - cursorIds.push_back(make_pair(ConnectionString(HostAndPort(cursor["host"].String())), - cursor["id"].Long())); - } - - return new DocumentSourceMergeCursors(cursorIds, pExpCtx); +Value DocumentSourceMergeCursors::serialize(bool explain) const { + vector<Value> cursors; + for (size_t i = 0; i < _cursorIds.size(); i++) { + cursors.push_back(Value( + DOC("host" << Value(_cursorIds[i].first.toString()) << "id" << _cursorIds[i].second))); } + return Value(DOC(getSourceName() << Value(cursors))); +} - Value DocumentSourceMergeCursors::serialize(bool explain) const { - vector<Value> cursors; - for (size_t i = 0; i < _cursorIds.size(); i++) { - cursors.push_back(Value(DOC("host" << Value(_cursorIds[i].first.toString()) - << "id" << _cursorIds[i].second))); - } - return Value(DOC(getSourceName() << Value(cursors))); +DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(ConnectionString host, + NamespaceString ns, + CursorId id) + : connection(host), cursor(connection.get(), ns, id, 0, 0) {} + +vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() { + verify(_unstarted); + start(); + vector<DBClientCursor*> out; + for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) { + out.push_back(&((*it)->cursor)); } - DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection( - ConnectionString host, - NamespaceString ns, - CursorId id) - : connection(host) - , cursor(connection.get(), ns, id, 0, 0) - {} + return out; +} - vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() { - verify(_unstarted); - start(); - vector<DBClientCursor*> out; - for (Cursors::const_iterator it = _cursors.begin(); it !=_cursors.end(); ++it) { - out.push_back(&((*it)->cursor)); - } +void DocumentSourceMergeCursors::start() { + _unstarted = false; - return out; + // open each cursor and send message asking for a batch + for (CursorIds::const_iterator it = _cursorIds.begin(); it != _cursorIds.end(); ++it) { + _cursors.push_back( + std::make_shared<CursorAndConnection>(it->first, pExpCtx->ns, it->second)); + verify(_cursors.back()->connection->lazySupported()); + _cursors.back()->cursor.initLazy(); // shouldn't block } - void DocumentSourceMergeCursors::start() { - _unstarted = false; - - // open each cursor and send message asking for a batch - for (CursorIds::const_iterator it = _cursorIds.begin(); it !=_cursorIds.end(); ++it) { - _cursors.push_back(std::make_shared<CursorAndConnection>( - it->first, pExpCtx->ns, it->second)); - verify(_cursors.back()->connection->lazySupported()); - _cursors.back()->cursor.initLazy(); // shouldn't block - } - - // wait for all cursors to return a batch - // TODO need a way to keep cursors alive if some take longer than 10 minutes. - for (Cursors::const_iterator it = _cursors.begin(); it !=_cursors.end(); ++it) { - bool retry = false; - bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch - - uassert(17028, - "error reading response from " + _cursors.back()->connection->toString(), - ok); - verify(!retry); - } + // wait for all cursors to return a batch + // TODO need a way to keep cursors alive if some take longer than 10 minutes. + for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) { + bool retry = false; + bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch - _currentCursor = _cursors.begin(); + uassert( + 17028, "error reading response from " + _cursors.back()->connection->toString(), ok); + verify(!retry); } - Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) { - const BSONObj next = cursor->next(); - if (next.hasField("$err")) { - const int code = next.hasField("code") ? next["code"].numberInt() : 17029; - uasserted(code, str::stream() << "Received error in response from " - << cursor->originalHost() - << ": " << next); - } - return Document::fromBsonWithMetaData(next); + _currentCursor = _cursors.begin(); +} + +Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) { + const BSONObj next = cursor->next(); + if (next.hasField("$err")) { + const int code = next.hasField("code") ? next["code"].numberInt() : 17029; + uasserted(code, + str::stream() << "Received error in response from " << cursor->originalHost() + << ": " << next); } + return Document::fromBsonWithMetaData(next); +} - boost::optional<Document> DocumentSourceMergeCursors::getNext() { - if (_unstarted) - start(); +boost::optional<Document> DocumentSourceMergeCursors::getNext() { + if (_unstarted) + start(); - // purge eof cursors and release their connections - while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) { - (*_currentCursor)->connection.done(); - _cursors.erase(_currentCursor); - _currentCursor = _cursors.begin(); - } + // purge eof cursors and release their connections + while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) { + (*_currentCursor)->connection.done(); + _cursors.erase(_currentCursor); + _currentCursor = _cursors.begin(); + } - if (_cursors.empty()) - return boost::none; + if (_cursors.empty()) + return boost::none; - const Document next = nextSafeFrom(&((*_currentCursor)->cursor)); + const Document next = nextSafeFrom(&((*_currentCursor)->cursor)); - // advance _currentCursor, wrapping if needed - if (++_currentCursor == _cursors.end()) - _currentCursor = _cursors.begin(); + // advance _currentCursor, wrapping if needed + if (++_currentCursor == _cursors.end()) + _currentCursor = _cursors.begin(); - return next; - } + return next; +} - void DocumentSourceMergeCursors::dispose() { - _cursors.clear(); - _currentCursor = _cursors.end(); - } +void DocumentSourceMergeCursors::dispose() { + _cursors.clear(); + _currentCursor = _cursors.end(); +} } |