summaryrefslogtreecommitdiff
path: root/src/mongo/client/dbclientcursor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/dbclientcursor.cpp')
-rw-r--r--src/mongo/client/dbclientcursor.cpp578
1 files changed, 292 insertions, 286 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp
index 47152ccbe89..d3ec7160ca3 100644
--- a/src/mongo/client/dbclientcursor.cpp
+++ b/src/mongo/client/dbclientcursor.cpp
@@ -44,354 +44,360 @@
namespace mongo {
- using std::auto_ptr;
- using std::endl;
- using std::string;
- using std::vector;
-
- void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend );
-
- void DBClientCursor::_finishConsInit() {
- _originalHost = _client->getServerAddress();
+using std::auto_ptr;
+using std::endl;
+using std::string;
+using std::vector;
+
+void assembleRequest(const string& ns,
+ BSONObj query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj* fieldsToReturn,
+ int queryOptions,
+ Message& toSend);
+
+void DBClientCursor::_finishConsInit() {
+ _originalHost = _client->getServerAddress();
+}
+
+int DBClientCursor::nextBatchSize() {
+ if (nToReturn == 0)
+ return batchSize;
+
+ if (batchSize == 0)
+ return nToReturn;
+
+ return batchSize < nToReturn ? batchSize : nToReturn;
+}
+
+void DBClientCursor::_assembleInit(Message& toSend) {
+ if (!cursorId) {
+ assembleRequest(ns, query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend);
+ } else {
+ BufBuilder b;
+ b.appendNum(opts);
+ b.appendStr(ns);
+ b.appendNum(nToReturn);
+ b.appendNum(cursorId);
+ toSend.setData(dbGetMore, b.buf(), b.len());
}
-
- int DBClientCursor::nextBatchSize() {
-
- if ( nToReturn == 0 )
- return batchSize;
-
- if ( batchSize == 0 )
- return nToReturn;
-
- return batchSize < nToReturn ? batchSize : nToReturn;
+}
+
+bool DBClientCursor::init() {
+ Message toSend;
+ _assembleInit(toSend);
+ verify(_client);
+ if (!_client->call(toSend, *batch.m, false, &_originalHost)) {
+ // log msg temp?
+ log() << "DBClientCursor::init call() failed" << endl;
+ return false;
}
-
- void DBClientCursor::_assembleInit( Message& toSend ) {
- if ( !cursorId ) {
- assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
- }
- else {
- BufBuilder b;
- b.appendNum( opts );
- b.appendStr( ns );
- b.appendNum( nToReturn );
- b.appendNum( cursorId );
- toSend.setData( dbGetMore, b.buf(), b.len() );
- }
+ if (batch.m->empty()) {
+ // log msg temp?
+ log() << "DBClientCursor::init message from call() was empty" << endl;
+ return false;
}
-
- bool DBClientCursor::init() {
- Message toSend;
- _assembleInit( toSend );
- verify( _client );
- if ( !_client->call( toSend, *batch.m, false, &_originalHost ) ) {
- // log msg temp?
- log() << "DBClientCursor::init call() failed" << endl;
- return false;
- }
- if ( batch.m->empty() ) {
- // log msg temp?
- log() << "DBClientCursor::init message from call() was empty" << endl;
- return false;
+ dataReceived();
+ return true;
+}
+
+void DBClientCursor::initLazy(bool isRetry) {
+ massert(15875,
+ "DBClientCursor::initLazy called on a client that doesn't support lazy",
+ _client->lazySupported());
+ if (DBClientWithCommands::RunCommandHookFunc hook = _client->getRunCommandHook()) {
+ if (NamespaceString(ns).isCommand()) {
+ BSONObjBuilder bob;
+ bob.appendElements(query);
+ hook(&bob);
+ query = bob.obj();
}
- dataReceived();
- return true;
}
-
- void DBClientCursor::initLazy( bool isRetry ) {
- massert( 15875 , "DBClientCursor::initLazy called on a client that doesn't support lazy" , _client->lazySupported() );
- if (DBClientWithCommands::RunCommandHookFunc hook = _client->getRunCommandHook()) {
- if (NamespaceString(ns).isCommand()) {
- BSONObjBuilder bob;
- bob.appendElements(query);
- hook(&bob);
- query = bob.obj();
- }
- }
-
- Message toSend;
- _assembleInit( toSend );
- _client->say( toSend, isRetry, &_originalHost );
- }
-
- bool DBClientCursor::initLazyFinish( bool& retry ) {
- bool recvd = _client->recv( *batch.m );
+ Message toSend;
+ _assembleInit(toSend);
+ _client->say(toSend, isRetry, &_originalHost);
+}
- // If we get a bad response, return false
- if ( ! recvd || batch.m->empty() ) {
+bool DBClientCursor::initLazyFinish(bool& retry) {
+ bool recvd = _client->recv(*batch.m);
- if( !recvd )
- log() << "DBClientCursor::init lazy say() failed" << endl;
- if( batch.m->empty() )
- log() << "DBClientCursor::init message from say() was empty" << endl;
+ // If we get a bad response, return false
+ if (!recvd || batch.m->empty()) {
+ if (!recvd)
+ log() << "DBClientCursor::init lazy say() failed" << endl;
+ if (batch.m->empty())
+ log() << "DBClientCursor::init message from say() was empty" << endl;
- _client->checkResponse( NULL, -1, &retry, &_lazyHost );
+ _client->checkResponse(NULL, -1, &retry, &_lazyHost);
- return false;
-
- }
+ return false;
+ }
- dataReceived( retry, _lazyHost );
+ dataReceived(retry, _lazyHost);
- if (DBClientWithCommands::PostRunCommandHookFunc hook = _client->getPostRunCommandHook()) {
- if (NamespaceString(ns).isCommand()) {
- BSONObj cmdResponse = peekFirst();
- hook(cmdResponse, _lazyHost);
- }
+ if (DBClientWithCommands::PostRunCommandHookFunc hook = _client->getPostRunCommandHook()) {
+ if (NamespaceString(ns).isCommand()) {
+ BSONObj cmdResponse = peekFirst();
+ hook(cmdResponse, _lazyHost);
}
-
- return ! retry;
}
- bool DBClientCursor::initCommand(){
- BSONObj res;
+ return !retry;
+}
- bool ok = _client->runCommand( nsGetDB( ns ), query, res, opts );
- replyToQuery( 0, *batch.m, res );
- dataReceived();
+bool DBClientCursor::initCommand() {
+ BSONObj res;
- return ok;
- }
+ bool ok = _client->runCommand(nsGetDB(ns), query, res, opts);
+ replyToQuery(0, *batch.m, res);
+ dataReceived();
- void DBClientCursor::requestMore() {
- verify( cursorId && batch.pos == batch.nReturned );
+ return ok;
+}
- if (haveLimit) {
- nToReturn -= batch.nReturned;
- verify(nToReturn > 0);
- }
- BufBuilder b;
- b.appendNum(opts);
- b.appendStr(ns);
- b.appendNum(nextBatchSize());
- b.appendNum(cursorId);
+void DBClientCursor::requestMore() {
+ verify(cursorId && batch.pos == batch.nReturned);
- Message toSend;
- toSend.setData(dbGetMore, b.buf(), b.len());
- auto_ptr<Message> response(new Message());
-
- if ( _client ) {
- _client->call( toSend, *response );
- this->batch.m = response;
- dataReceived();
- }
- else {
- verify( _scopedHost.size() );
- ScopedDbConnection conn(_scopedHost);
- conn->call( toSend , *response );
- _client = conn.get();
- this->batch.m = response;
- dataReceived();
- _client = 0;
- conn.done();
- }
+ if (haveLimit) {
+ nToReturn -= batch.nReturned;
+ verify(nToReturn > 0);
}
-
- /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
- void DBClientCursor::exhaustReceiveMore() {
- verify( cursorId && batch.pos == batch.nReturned );
- verify( !haveLimit );
- auto_ptr<Message> response(new Message());
- verify( _client );
- if (!_client->recv(*response)) {
- uasserted(16465, "recv failed while exhausting cursor");
- }
- batch.m = response;
+ BufBuilder b;
+ b.appendNum(opts);
+ b.appendStr(ns);
+ b.appendNum(nextBatchSize());
+ b.appendNum(cursorId);
+
+ Message toSend;
+ toSend.setData(dbGetMore, b.buf(), b.len());
+ auto_ptr<Message> response(new Message());
+
+ if (_client) {
+ _client->call(toSend, *response);
+ this->batch.m = response;
+ dataReceived();
+ } else {
+ verify(_scopedHost.size());
+ ScopedDbConnection conn(_scopedHost);
+ conn->call(toSend, *response);
+ _client = conn.get();
+ this->batch.m = response;
dataReceived();
+ _client = 0;
+ conn.done();
}
+}
+
+/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
+void DBClientCursor::exhaustReceiveMore() {
+ verify(cursorId && batch.pos == batch.nReturned);
+ verify(!haveLimit);
+ auto_ptr<Message> response(new Message());
+ verify(_client);
+ if (!_client->recv(*response)) {
+ uasserted(16465, "recv failed while exhausting cursor");
+ }
+ batch.m = response;
+ dataReceived();
+}
- void DBClientCursor::dataReceived( bool& retry, string& host ) {
-
- QueryResult::View qr = batch.m->singleData().view2ptr();
- resultFlags = qr.getResultFlags();
-
- if ( qr.getResultFlags() & ResultFlag_ErrSet ) {
- wasError = true;
- }
+void DBClientCursor::dataReceived(bool& retry, string& host) {
+ QueryResult::View qr = batch.m->singleData().view2ptr();
+ resultFlags = qr.getResultFlags();
- if ( qr.getResultFlags() & ResultFlag_CursorNotFound ) {
- // cursor id no longer valid at the server.
- verify( qr.getCursorId() == 0 );
- cursorId = 0; // 0 indicates no longer valid (dead)
- if ( ! ( opts & QueryOption_CursorTailable ) )
- throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" );
- }
+ if (qr.getResultFlags() & ResultFlag_ErrSet) {
+ wasError = true;
+ }
- if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
- // only set initially: we don't want to kill it on end of data
- // if it's a tailable cursor
- cursorId = qr.getCursorId();
- }
+ if (qr.getResultFlags() & ResultFlag_CursorNotFound) {
+ // cursor id no longer valid at the server.
+ verify(qr.getCursorId() == 0);
+ cursorId = 0; // 0 indicates no longer valid (dead)
+ if (!(opts & QueryOption_CursorTailable))
+ throw UserException(
+ 13127, "getMore: cursor didn't exist on server, possible restart or timeout?");
+ }
- batch.nReturned = qr.getNReturned();
- batch.pos = 0;
- batch.data = qr.data();
+ if (cursorId == 0 || !(opts & QueryOption_CursorTailable)) {
+ // only set initially: we don't want to kill it on end of data
+ // if it's a tailable cursor
+ cursorId = qr.getCursorId();
+ }
- _client->checkResponse( batch.data, batch.nReturned, &retry, &host ); // watches for "not master"
+ batch.nReturned = qr.getNReturned();
+ batch.pos = 0;
+ batch.data = qr.data();
- if( qr.getResultFlags() & ResultFlag_ShardConfigStale ) {
- BSONObj error;
- verify( peekError( &error ) );
- throw RecvStaleConfigException( (string)"stale config on lazy receive" + causedBy( getErrField( error ) ), error );
- }
+ _client->checkResponse(batch.data, batch.nReturned, &retry, &host); // watches for "not master"
- /* this assert would fire the way we currently work:
- verify( nReturned || cursorId == 0 );
- */
+ if (qr.getResultFlags() & ResultFlag_ShardConfigStale) {
+ BSONObj error;
+ verify(peekError(&error));
+ throw RecvStaleConfigException(
+ (string) "stale config on lazy receive" + causedBy(getErrField(error)), error);
}
- /** If true, safe to call next(). Requests more from server if necessary. */
- bool DBClientCursor::more() {
- _assertIfNull();
-
- if ( !_putBack.empty() )
- return true;
+ /* this assert would fire the way we currently work:
+ verify( nReturned || cursorId == 0 );
+ */
+}
- if (haveLimit && batch.pos >= nToReturn)
- return false;
+/** If true, safe to call next(). Requests more from server if necessary. */
+bool DBClientCursor::more() {
+ _assertIfNull();
- if ( batch.pos < batch.nReturned )
- return true;
+ if (!_putBack.empty())
+ return true;
- if ( cursorId == 0 )
- return false;
+ if (haveLimit && batch.pos >= nToReturn)
+ return false;
- requestMore();
- return batch.pos < batch.nReturned;
- }
+ if (batch.pos < batch.nReturned)
+ return true;
- BSONObj DBClientCursor::next() {
- DEV _assertIfNull();
- if ( !_putBack.empty() ) {
- BSONObj ret = _putBack.top();
- _putBack.pop();
- return ret;
- }
+ if (cursorId == 0)
+ return false;
- uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned);
+ requestMore();
+ return batch.pos < batch.nReturned;
+}
- batch.pos++;
- BSONObj o(batch.data);
- batch.data += o.objsize();
- /* todo would be good to make data null at end of batch for safety */
- return o;
+BSONObj DBClientCursor::next() {
+ DEV _assertIfNull();
+ if (!_putBack.empty()) {
+ BSONObj ret = _putBack.top();
+ _putBack.pop();
+ return ret;
}
- BSONObj DBClientCursor::nextSafe() {
- BSONObj o = next();
- if( this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0 ) {
- std::string s = "nextSafe(): " + o.toString();
- LOG(5) << s;
- uasserted(13106, s);
- }
- return o;
+ uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned);
+
+ batch.pos++;
+ BSONObj o(batch.data);
+ batch.data += o.objsize();
+ /* todo would be good to make data null at end of batch for safety */
+ return o;
+}
+
+BSONObj DBClientCursor::nextSafe() {
+ BSONObj o = next();
+ if (this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0) {
+ std::string s = "nextSafe(): " + o.toString();
+ LOG(5) << s;
+ uasserted(13106, s);
}
-
- void DBClientCursor::peek(vector<BSONObj>& v, int atMost) {
- int m = atMost;
-
- /*
- for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) {
- if( m == 0 )
- return;
- v.push_back(*i);
- m--;
- n++;
- }
- */
-
- int p = batch.pos;
- const char *d = batch.data;
- while( m && p < batch.nReturned ) {
- BSONObj o(d);
- d += o.objsize();
- p++;
- m--;
- v.push_back(o);
- }
+ return o;
+}
+
+void DBClientCursor::peek(vector<BSONObj>& v, int atMost) {
+ int m = atMost;
+
+ /*
+ for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) {
+ if( m == 0 )
+ return;
+ v.push_back(*i);
+ m--;
+ n++;
}
-
- BSONObj DBClientCursor::peekFirst(){
- vector<BSONObj> v;
- peek( v, 1 );
-
- if( v.size() > 0 ) return v[0];
- else return BSONObj();
+ */
+
+ int p = batch.pos;
+ const char* d = batch.data;
+ while (m && p < batch.nReturned) {
+ BSONObj o(d);
+ d += o.objsize();
+ p++;
+ m--;
+ v.push_back(o);
}
-
- bool DBClientCursor::peekError(BSONObj* error){
- if( ! wasError ) return false;
-
- vector<BSONObj> v;
- peek(v, 1);
-
- verify( v.size() == 1 );
- verify( hasErrField( v[0] ) );
-
- if( error ) *error = v[0].getOwned();
- return true;
+}
+
+BSONObj DBClientCursor::peekFirst() {
+ vector<BSONObj> v;
+ peek(v, 1);
+
+ if (v.size() > 0)
+ return v[0];
+ else
+ return BSONObj();
+}
+
+bool DBClientCursor::peekError(BSONObj* error) {
+ if (!wasError)
+ return false;
+
+ vector<BSONObj> v;
+ peek(v, 1);
+
+ verify(v.size() == 1);
+ verify(hasErrField(v[0]));
+
+ if (error)
+ *error = v[0].getOwned();
+ return true;
+}
+
+void DBClientCursor::attach(AScopedConnection* conn) {
+ verify(_scopedHost.size() == 0);
+ verify(conn);
+ verify(conn->get());
+
+ if (conn->get()->type() == ConnectionString::SET ||
+ conn->get()->type() == ConnectionString::SYNC) {
+ if (_lazyHost.size() > 0)
+ _scopedHost = _lazyHost;
+ else if (_client)
+ _scopedHost = _client->getServerAddress();
+ else
+ massert(14821,
+ "No client or lazy client specified, cannot store multi-host connection.",
+ false);
+ } else {
+ _scopedHost = conn->getHost();
}
- void DBClientCursor::attach( AScopedConnection * conn ) {
- verify( _scopedHost.size() == 0 );
- verify( conn );
- verify( conn->get() );
-
- if ( conn->get()->type() == ConnectionString::SET ||
- conn->get()->type() == ConnectionString::SYNC ) {
- if( _lazyHost.size() > 0 )
- _scopedHost = _lazyHost;
- else if( _client )
- _scopedHost = _client->getServerAddress();
- else
- massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false);
- }
- else {
- _scopedHost = conn->getHost();
- }
-
- conn->done();
- _client = 0;
- _lazyHost = "";
- }
+ conn->done();
+ _client = 0;
+ _lazyHost = "";
+}
- DBClientCursor::~DBClientCursor() {
- DESTRUCTOR_GUARD (
+DBClientCursor::~DBClientCursor() {
+ DESTRUCTOR_GUARD(
- if ( cursorId && _ownCursor && ! inShutdown() ) {
+ if (cursorId && _ownCursor && !inShutdown()) {
BufBuilder b;
- b.appendNum( (int)0 ); // reserved
- b.appendNum( (int)1 ); // number
- b.appendNum( cursorId );
-
- Message m;
- m.setData( dbKillCursors , b.buf() , b.len() );
+ b.appendNum((int)0); // reserved
+ b.appendNum((int)1); // number
+ b.appendNum(cursorId);
- if ( _client ) {
+ Message m;
+ m.setData(dbKillCursors, b.buf(), b.len());
+ if (_client) {
// Kill the cursor the same way the connection itself would. Usually, non-lazily
- if( DBClientConnection::getLazyKillCursor() )
- _client->sayPiggyBack( m );
+ if (DBClientConnection::getLazyKillCursor())
+ _client->sayPiggyBack(m);
else
- _client->say( m );
+ _client->say(m);
- }
- else {
- verify( _scopedHost.size() );
+ } else {
+ verify(_scopedHost.size());
ScopedDbConnection conn(_scopedHost);
- if( DBClientConnection::getLazyKillCursor() )
- conn->sayPiggyBack( m );
+ if (DBClientConnection::getLazyKillCursor())
+ conn->sayPiggyBack(m);
else
- conn->say( m );
+ conn->say(m);
conn.done();
}
}
);
- }
+}
-} // namespace mongo
+} // namespace mongo