author     Mark Benvenuto <mark.benvenuto@mongodb.com>    2015-06-20 00:22:50 -0400
committer  Mark Benvenuto <mark.benvenuto@mongodb.com>    2015-06-20 10:56:02 -0400
commit     9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree       3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/pipeline/document_source_merge_cursors.cpp
parent     01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
download   mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge_cursors.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.cpp  | 221
1 file changed, 106 insertions(+), 115 deletions(-)
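
A mechanical reformat like the one below is normally produced by running clang-format over the tree with comment reflowing turned off, matching the "no comment reflow" note in the commit message. As a rough sketch only -- the options shown are illustrative assumptions, not the repository's actual .clang-format file:

    # .clang-format (illustrative; the real MongoDB settings may differ)
    BasedOnStyle: Google
    IndentWidth: 4
    ColumnLimit: 100
    ReflowComments: false

    # apply in place to this file
    clang-format -i src/mongo/db/pipeline/document_source_merge_cursors.cpp

The visible effect in the diff is purely stylistic: the namespace-level indentation is dropped, long argument lists are wrapped to the column limit, and constructor initializer lists are packed onto fewer lines, with no behavioral change to the code.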
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index afe924d6a2f..d1e618f35bf 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -33,146 +33,137 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::make_pair;
- using std::string;
- using std::vector;
+using boost::intrusive_ptr;
+using std::make_pair;
+using std::string;
+using std::vector;
- const char DocumentSourceMergeCursors::name[] = "$mergeCursors";
+const char DocumentSourceMergeCursors::name[] = "$mergeCursors";
- const char* DocumentSourceMergeCursors::getSourceName() const {
- return name;
- }
+const char* DocumentSourceMergeCursors::getSourceName() const {
+ return name;
+}
- void DocumentSourceMergeCursors::setSource(DocumentSource *pSource) {
- /* this doesn't take a source */
- verify(false);
- }
+void DocumentSourceMergeCursors::setSource(DocumentSource* pSource) {
+ /* this doesn't take a source */
+ verify(false);
+}
- DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- const CursorIds& cursorIds,
- const intrusive_ptr<ExpressionContext> &pExpCtx)
- : DocumentSource(pExpCtx)
- , _cursorIds(cursorIds)
- , _unstarted(true)
- {}
-
- intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
- const CursorIds& cursorIds,
- const intrusive_ptr<ExpressionContext> &pExpCtx) {
- return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
- }
+DocumentSourceMergeCursors::DocumentSourceMergeCursors(
+ const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {}
- intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
- BSONElement elem,
- const intrusive_ptr<ExpressionContext>& pExpCtx) {
+intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
+ const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
+}
- massert(17026, string("Expected an Array, but got a ") + typeName(elem.type()),
- elem.type() == Array);
+intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ massert(17026,
+ string("Expected an Array, but got a ") + typeName(elem.type()),
+ elem.type() == Array);
+
+ CursorIds cursorIds;
+ BSONObj array = elem.embeddedObject();
+ BSONForEach(cursor, array) {
+ massert(17027,
+ string("Expected an Object, but got a ") + typeName(cursor.type()),
+ cursor.type() == Object);
+
+ cursorIds.push_back(
+ make_pair(ConnectionString(HostAndPort(cursor["host"].String())), cursor["id"].Long()));
+ }
- CursorIds cursorIds;
- BSONObj array = elem.embeddedObject();
- BSONForEach(cursor, array) {
- massert(17027, string("Expected an Object, but got a ") + typeName(cursor.type()),
- cursor.type() == Object);
+ return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
+}
- cursorIds.push_back(make_pair(ConnectionString(HostAndPort(cursor["host"].String())),
- cursor["id"].Long()));
- }
-
- return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
+Value DocumentSourceMergeCursors::serialize(bool explain) const {
+ vector<Value> cursors;
+ for (size_t i = 0; i < _cursorIds.size(); i++) {
+ cursors.push_back(Value(
+ DOC("host" << Value(_cursorIds[i].first.toString()) << "id" << _cursorIds[i].second)));
}
+ return Value(DOC(getSourceName() << Value(cursors)));
+}
- Value DocumentSourceMergeCursors::serialize(bool explain) const {
- vector<Value> cursors;
- for (size_t i = 0; i < _cursorIds.size(); i++) {
- cursors.push_back(Value(DOC("host" << Value(_cursorIds[i].first.toString())
- << "id" << _cursorIds[i].second)));
- }
- return Value(DOC(getSourceName() << Value(cursors)));
+DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(ConnectionString host,
+ NamespaceString ns,
+ CursorId id)
+ : connection(host), cursor(connection.get(), ns, id, 0, 0) {}
+
+vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() {
+ verify(_unstarted);
+ start();
+ vector<DBClientCursor*> out;
+ for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) {
+ out.push_back(&((*it)->cursor));
}
- DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(
- ConnectionString host,
- NamespaceString ns,
- CursorId id)
- : connection(host)
- , cursor(connection.get(), ns, id, 0, 0)
- {}
+ return out;
+}
- vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() {
- verify(_unstarted);
- start();
- vector<DBClientCursor*> out;
- for (Cursors::const_iterator it = _cursors.begin(); it !=_cursors.end(); ++it) {
- out.push_back(&((*it)->cursor));
- }
+void DocumentSourceMergeCursors::start() {
+ _unstarted = false;
- return out;
+ // open each cursor and send message asking for a batch
+ for (CursorIds::const_iterator it = _cursorIds.begin(); it != _cursorIds.end(); ++it) {
+ _cursors.push_back(
+ std::make_shared<CursorAndConnection>(it->first, pExpCtx->ns, it->second));
+ verify(_cursors.back()->connection->lazySupported());
+ _cursors.back()->cursor.initLazy(); // shouldn't block
}
- void DocumentSourceMergeCursors::start() {
- _unstarted = false;
-
- // open each cursor and send message asking for a batch
- for (CursorIds::const_iterator it = _cursorIds.begin(); it !=_cursorIds.end(); ++it) {
- _cursors.push_back(std::make_shared<CursorAndConnection>(
- it->first, pExpCtx->ns, it->second));
- verify(_cursors.back()->connection->lazySupported());
- _cursors.back()->cursor.initLazy(); // shouldn't block
- }
-
- // wait for all cursors to return a batch
- // TODO need a way to keep cursors alive if some take longer than 10 minutes.
- for (Cursors::const_iterator it = _cursors.begin(); it !=_cursors.end(); ++it) {
- bool retry = false;
- bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch
-
- uassert(17028,
- "error reading response from " + _cursors.back()->connection->toString(),
- ok);
- verify(!retry);
- }
+ // wait for all cursors to return a batch
+ // TODO need a way to keep cursors alive if some take longer than 10 minutes.
+ for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) {
+ bool retry = false;
+ bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch
- _currentCursor = _cursors.begin();
+ uassert(
+ 17028, "error reading response from " + _cursors.back()->connection->toString(), ok);
+ verify(!retry);
}
- Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) {
- const BSONObj next = cursor->next();
- if (next.hasField("$err")) {
- const int code = next.hasField("code") ? next["code"].numberInt() : 17029;
- uasserted(code, str::stream() << "Received error in response from "
- << cursor->originalHost()
- << ": " << next);
- }
- return Document::fromBsonWithMetaData(next);
+ _currentCursor = _cursors.begin();
+}
+
+Document DocumentSourceMergeCursors::nextSafeFrom(DBClientCursor* cursor) {
+ const BSONObj next = cursor->next();
+ if (next.hasField("$err")) {
+ const int code = next.hasField("code") ? next["code"].numberInt() : 17029;
+ uasserted(code,
+ str::stream() << "Received error in response from " << cursor->originalHost()
+ << ": " << next);
}
+ return Document::fromBsonWithMetaData(next);
+}
- boost::optional<Document> DocumentSourceMergeCursors::getNext() {
- if (_unstarted)
- start();
+boost::optional<Document> DocumentSourceMergeCursors::getNext() {
+ if (_unstarted)
+ start();
- // purge eof cursors and release their connections
- while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) {
- (*_currentCursor)->connection.done();
- _cursors.erase(_currentCursor);
- _currentCursor = _cursors.begin();
- }
+ // purge eof cursors and release their connections
+ while (!_cursors.empty() && !(*_currentCursor)->cursor.more()) {
+ (*_currentCursor)->connection.done();
+ _cursors.erase(_currentCursor);
+ _currentCursor = _cursors.begin();
+ }
- if (_cursors.empty())
- return boost::none;
+ if (_cursors.empty())
+ return boost::none;
- const Document next = nextSafeFrom(&((*_currentCursor)->cursor));
+ const Document next = nextSafeFrom(&((*_currentCursor)->cursor));
- // advance _currentCursor, wrapping if needed
- if (++_currentCursor == _cursors.end())
- _currentCursor = _cursors.begin();
+ // advance _currentCursor, wrapping if needed
+ if (++_currentCursor == _cursors.end())
+ _currentCursor = _cursors.begin();
- return next;
- }
+ return next;
+}
- void DocumentSourceMergeCursors::dispose() {
- _cursors.clear();
- _currentCursor = _cursors.end();
- }
+void DocumentSourceMergeCursors::dispose() {
+ _cursors.clear();
+ _currentCursor = _cursors.end();
+}
}