diff options
Diffstat (limited to 'src/mongo/client/dbclientcursor.cpp')
-rw-r--r-- | src/mongo/client/dbclientcursor.cpp | 578 |
1 files changed, 292 insertions, 286 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 47152ccbe89..d3ec7160ca3 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -44,354 +44,360 @@ namespace mongo { - using std::auto_ptr; - using std::endl; - using std::string; - using std::vector; - - void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ); - - void DBClientCursor::_finishConsInit() { - _originalHost = _client->getServerAddress(); +using std::auto_ptr; +using std::endl; +using std::string; +using std::vector; + +void assembleRequest(const string& ns, + BSONObj query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + Message& toSend); + +void DBClientCursor::_finishConsInit() { + _originalHost = _client->getServerAddress(); +} + +int DBClientCursor::nextBatchSize() { + if (nToReturn == 0) + return batchSize; + + if (batchSize == 0) + return nToReturn; + + return batchSize < nToReturn ? batchSize : nToReturn; +} + +void DBClientCursor::_assembleInit(Message& toSend) { + if (!cursorId) { + assembleRequest(ns, query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend); + } else { + BufBuilder b; + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nToReturn); + b.appendNum(cursorId); + toSend.setData(dbGetMore, b.buf(), b.len()); } - - int DBClientCursor::nextBatchSize() { - - if ( nToReturn == 0 ) - return batchSize; - - if ( batchSize == 0 ) - return nToReturn; - - return batchSize < nToReturn ? batchSize : nToReturn; +} + +bool DBClientCursor::init() { + Message toSend; + _assembleInit(toSend); + verify(_client); + if (!_client->call(toSend, *batch.m, false, &_originalHost)) { + // log msg temp? + log() << "DBClientCursor::init call() failed" << endl; + return false; } - - void DBClientCursor::_assembleInit( Message& toSend ) { - if ( !cursorId ) { - assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } - else { - BufBuilder b; - b.appendNum( opts ); - b.appendStr( ns ); - b.appendNum( nToReturn ); - b.appendNum( cursorId ); - toSend.setData( dbGetMore, b.buf(), b.len() ); - } + if (batch.m->empty()) { + // log msg temp? + log() << "DBClientCursor::init message from call() was empty" << endl; + return false; } - - bool DBClientCursor::init() { - Message toSend; - _assembleInit( toSend ); - verify( _client ); - if ( !_client->call( toSend, *batch.m, false, &_originalHost ) ) { - // log msg temp? - log() << "DBClientCursor::init call() failed" << endl; - return false; - } - if ( batch.m->empty() ) { - // log msg temp? - log() << "DBClientCursor::init message from call() was empty" << endl; - return false; + dataReceived(); + return true; +} + +void DBClientCursor::initLazy(bool isRetry) { + massert(15875, + "DBClientCursor::initLazy called on a client that doesn't support lazy", + _client->lazySupported()); + if (DBClientWithCommands::RunCommandHookFunc hook = _client->getRunCommandHook()) { + if (NamespaceString(ns).isCommand()) { + BSONObjBuilder bob; + bob.appendElements(query); + hook(&bob); + query = bob.obj(); } - dataReceived(); - return true; } - - void DBClientCursor::initLazy( bool isRetry ) { - massert( 15875 , "DBClientCursor::initLazy called on a client that doesn't support lazy" , _client->lazySupported() ); - if (DBClientWithCommands::RunCommandHookFunc hook = _client->getRunCommandHook()) { - if (NamespaceString(ns).isCommand()) { - BSONObjBuilder bob; - bob.appendElements(query); - hook(&bob); - query = bob.obj(); - } - } - - Message toSend; - _assembleInit( toSend ); - _client->say( toSend, isRetry, &_originalHost ); - } - - bool DBClientCursor::initLazyFinish( bool& retry ) { - bool recvd = _client->recv( *batch.m ); + Message toSend; + _assembleInit(toSend); + _client->say(toSend, isRetry, &_originalHost); +} - // If we get a bad response, return false - if ( ! recvd || batch.m->empty() ) { +bool DBClientCursor::initLazyFinish(bool& retry) { + bool recvd = _client->recv(*batch.m); - if( !recvd ) - log() << "DBClientCursor::init lazy say() failed" << endl; - if( batch.m->empty() ) - log() << "DBClientCursor::init message from say() was empty" << endl; + // If we get a bad response, return false + if (!recvd || batch.m->empty()) { + if (!recvd) + log() << "DBClientCursor::init lazy say() failed" << endl; + if (batch.m->empty()) + log() << "DBClientCursor::init message from say() was empty" << endl; - _client->checkResponse( NULL, -1, &retry, &_lazyHost ); + _client->checkResponse(NULL, -1, &retry, &_lazyHost); - return false; - - } + return false; + } - dataReceived( retry, _lazyHost ); + dataReceived(retry, _lazyHost); - if (DBClientWithCommands::PostRunCommandHookFunc hook = _client->getPostRunCommandHook()) { - if (NamespaceString(ns).isCommand()) { - BSONObj cmdResponse = peekFirst(); - hook(cmdResponse, _lazyHost); - } + if (DBClientWithCommands::PostRunCommandHookFunc hook = _client->getPostRunCommandHook()) { + if (NamespaceString(ns).isCommand()) { + BSONObj cmdResponse = peekFirst(); + hook(cmdResponse, _lazyHost); } - - return ! retry; } - bool DBClientCursor::initCommand(){ - BSONObj res; + return !retry; +} - bool ok = _client->runCommand( nsGetDB( ns ), query, res, opts ); - replyToQuery( 0, *batch.m, res ); - dataReceived(); +bool DBClientCursor::initCommand() { + BSONObj res; - return ok; - } + bool ok = _client->runCommand(nsGetDB(ns), query, res, opts); + replyToQuery(0, *batch.m, res); + dataReceived(); - void DBClientCursor::requestMore() { - verify( cursorId && batch.pos == batch.nReturned ); + return ok; +} - if (haveLimit) { - nToReturn -= batch.nReturned; - verify(nToReturn > 0); - } - BufBuilder b; - b.appendNum(opts); - b.appendStr(ns); - b.appendNum(nextBatchSize()); - b.appendNum(cursorId); +void DBClientCursor::requestMore() { + verify(cursorId && batch.pos == batch.nReturned); - Message toSend; - toSend.setData(dbGetMore, b.buf(), b.len()); - auto_ptr<Message> response(new Message()); - - if ( _client ) { - _client->call( toSend, *response ); - this->batch.m = response; - dataReceived(); - } - else { - verify( _scopedHost.size() ); - ScopedDbConnection conn(_scopedHost); - conn->call( toSend , *response ); - _client = conn.get(); - this->batch.m = response; - dataReceived(); - _client = 0; - conn.done(); - } + if (haveLimit) { + nToReturn -= batch.nReturned; + verify(nToReturn > 0); } - - /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ - void DBClientCursor::exhaustReceiveMore() { - verify( cursorId && batch.pos == batch.nReturned ); - verify( !haveLimit ); - auto_ptr<Message> response(new Message()); - verify( _client ); - if (!_client->recv(*response)) { - uasserted(16465, "recv failed while exhausting cursor"); - } - batch.m = response; + BufBuilder b; + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nextBatchSize()); + b.appendNum(cursorId); + + Message toSend; + toSend.setData(dbGetMore, b.buf(), b.len()); + auto_ptr<Message> response(new Message()); + + if (_client) { + _client->call(toSend, *response); + this->batch.m = response; + dataReceived(); + } else { + verify(_scopedHost.size()); + ScopedDbConnection conn(_scopedHost); + conn->call(toSend, *response); + _client = conn.get(); + this->batch.m = response; dataReceived(); + _client = 0; + conn.done(); } +} + +/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ +void DBClientCursor::exhaustReceiveMore() { + verify(cursorId && batch.pos == batch.nReturned); + verify(!haveLimit); + auto_ptr<Message> response(new Message()); + verify(_client); + if (!_client->recv(*response)) { + uasserted(16465, "recv failed while exhausting cursor"); + } + batch.m = response; + dataReceived(); +} - void DBClientCursor::dataReceived( bool& retry, string& host ) { - - QueryResult::View qr = batch.m->singleData().view2ptr(); - resultFlags = qr.getResultFlags(); - - if ( qr.getResultFlags() & ResultFlag_ErrSet ) { - wasError = true; - } +void DBClientCursor::dataReceived(bool& retry, string& host) { + QueryResult::View qr = batch.m->singleData().view2ptr(); + resultFlags = qr.getResultFlags(); - if ( qr.getResultFlags() & ResultFlag_CursorNotFound ) { - // cursor id no longer valid at the server. - verify( qr.getCursorId() == 0 ); - cursorId = 0; // 0 indicates no longer valid (dead) - if ( ! ( opts & QueryOption_CursorTailable ) ) - throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); - } + if (qr.getResultFlags() & ResultFlag_ErrSet) { + wasError = true; + } - if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { - // only set initially: we don't want to kill it on end of data - // if it's a tailable cursor - cursorId = qr.getCursorId(); - } + if (qr.getResultFlags() & ResultFlag_CursorNotFound) { + // cursor id no longer valid at the server. + verify(qr.getCursorId() == 0); + cursorId = 0; // 0 indicates no longer valid (dead) + if (!(opts & QueryOption_CursorTailable)) + throw UserException( + 13127, "getMore: cursor didn't exist on server, possible restart or timeout?"); + } - batch.nReturned = qr.getNReturned(); - batch.pos = 0; - batch.data = qr.data(); + if (cursorId == 0 || !(opts & QueryOption_CursorTailable)) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr.getCursorId(); + } - _client->checkResponse( batch.data, batch.nReturned, &retry, &host ); // watches for "not master" + batch.nReturned = qr.getNReturned(); + batch.pos = 0; + batch.data = qr.data(); - if( qr.getResultFlags() & ResultFlag_ShardConfigStale ) { - BSONObj error; - verify( peekError( &error ) ); - throw RecvStaleConfigException( (string)"stale config on lazy receive" + causedBy( getErrField( error ) ), error ); - } + _client->checkResponse(batch.data, batch.nReturned, &retry, &host); // watches for "not master" - /* this assert would fire the way we currently work: - verify( nReturned || cursorId == 0 ); - */ + if (qr.getResultFlags() & ResultFlag_ShardConfigStale) { + BSONObj error; + verify(peekError(&error)); + throw RecvStaleConfigException( + (string) "stale config on lazy receive" + causedBy(getErrField(error)), error); } - /** If true, safe to call next(). Requests more from server if necessary. */ - bool DBClientCursor::more() { - _assertIfNull(); - - if ( !_putBack.empty() ) - return true; + /* this assert would fire the way we currently work: + verify( nReturned || cursorId == 0 ); + */ +} - if (haveLimit && batch.pos >= nToReturn) - return false; +/** If true, safe to call next(). Requests more from server if necessary. */ +bool DBClientCursor::more() { + _assertIfNull(); - if ( batch.pos < batch.nReturned ) - return true; + if (!_putBack.empty()) + return true; - if ( cursorId == 0 ) - return false; + if (haveLimit && batch.pos >= nToReturn) + return false; - requestMore(); - return batch.pos < batch.nReturned; - } + if (batch.pos < batch.nReturned) + return true; - BSONObj DBClientCursor::next() { - DEV _assertIfNull(); - if ( !_putBack.empty() ) { - BSONObj ret = _putBack.top(); - _putBack.pop(); - return ret; - } + if (cursorId == 0) + return false; - uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned); + requestMore(); + return batch.pos < batch.nReturned; +} - batch.pos++; - BSONObj o(batch.data); - batch.data += o.objsize(); - /* todo would be good to make data null at end of batch for safety */ - return o; +BSONObj DBClientCursor::next() { + DEV _assertIfNull(); + if (!_putBack.empty()) { + BSONObj ret = _putBack.top(); + _putBack.pop(); + return ret; } - BSONObj DBClientCursor::nextSafe() { - BSONObj o = next(); - if( this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0 ) { - std::string s = "nextSafe(): " + o.toString(); - LOG(5) << s; - uasserted(13106, s); - } - return o; + uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned); + + batch.pos++; + BSONObj o(batch.data); + batch.data += o.objsize(); + /* todo would be good to make data null at end of batch for safety */ + return o; +} + +BSONObj DBClientCursor::nextSafe() { + BSONObj o = next(); + if (this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0) { + std::string s = "nextSafe(): " + o.toString(); + LOG(5) << s; + uasserted(13106, s); } - - void DBClientCursor::peek(vector<BSONObj>& v, int atMost) { - int m = atMost; - - /* - for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { - if( m == 0 ) - return; - v.push_back(*i); - m--; - n++; - } - */ - - int p = batch.pos; - const char *d = batch.data; - while( m && p < batch.nReturned ) { - BSONObj o(d); - d += o.objsize(); - p++; - m--; - v.push_back(o); - } + return o; +} + +void DBClientCursor::peek(vector<BSONObj>& v, int atMost) { + int m = atMost; + + /* + for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { + if( m == 0 ) + return; + v.push_back(*i); + m--; + n++; } - - BSONObj DBClientCursor::peekFirst(){ - vector<BSONObj> v; - peek( v, 1 ); - - if( v.size() > 0 ) return v[0]; - else return BSONObj(); + */ + + int p = batch.pos; + const char* d = batch.data; + while (m && p < batch.nReturned) { + BSONObj o(d); + d += o.objsize(); + p++; + m--; + v.push_back(o); } - - bool DBClientCursor::peekError(BSONObj* error){ - if( ! wasError ) return false; - - vector<BSONObj> v; - peek(v, 1); - - verify( v.size() == 1 ); - verify( hasErrField( v[0] ) ); - - if( error ) *error = v[0].getOwned(); - return true; +} + +BSONObj DBClientCursor::peekFirst() { + vector<BSONObj> v; + peek(v, 1); + + if (v.size() > 0) + return v[0]; + else + return BSONObj(); +} + +bool DBClientCursor::peekError(BSONObj* error) { + if (!wasError) + return false; + + vector<BSONObj> v; + peek(v, 1); + + verify(v.size() == 1); + verify(hasErrField(v[0])); + + if (error) + *error = v[0].getOwned(); + return true; +} + +void DBClientCursor::attach(AScopedConnection* conn) { + verify(_scopedHost.size() == 0); + verify(conn); + verify(conn->get()); + + if (conn->get()->type() == ConnectionString::SET || + conn->get()->type() == ConnectionString::SYNC) { + if (_lazyHost.size() > 0) + _scopedHost = _lazyHost; + else if (_client) + _scopedHost = _client->getServerAddress(); + else + massert(14821, + "No client or lazy client specified, cannot store multi-host connection.", + false); + } else { + _scopedHost = conn->getHost(); } - void DBClientCursor::attach( AScopedConnection * conn ) { - verify( _scopedHost.size() == 0 ); - verify( conn ); - verify( conn->get() ); - - if ( conn->get()->type() == ConnectionString::SET || - conn->get()->type() == ConnectionString::SYNC ) { - if( _lazyHost.size() > 0 ) - _scopedHost = _lazyHost; - else if( _client ) - _scopedHost = _client->getServerAddress(); - else - massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false); - } - else { - _scopedHost = conn->getHost(); - } - - conn->done(); - _client = 0; - _lazyHost = ""; - } + conn->done(); + _client = 0; + _lazyHost = ""; +} - DBClientCursor::~DBClientCursor() { - DESTRUCTOR_GUARD ( +DBClientCursor::~DBClientCursor() { + DESTRUCTOR_GUARD( - if ( cursorId && _ownCursor && ! inShutdown() ) { + if (cursorId && _ownCursor && !inShutdown()) { BufBuilder b; - b.appendNum( (int)0 ); // reserved - b.appendNum( (int)1 ); // number - b.appendNum( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); + b.appendNum((int)0); // reserved + b.appendNum((int)1); // number + b.appendNum(cursorId); - if ( _client ) { + Message m; + m.setData(dbKillCursors, b.buf(), b.len()); + if (_client) { // Kill the cursor the same way the connection itself would. Usually, non-lazily - if( DBClientConnection::getLazyKillCursor() ) - _client->sayPiggyBack( m ); + if (DBClientConnection::getLazyKillCursor()) + _client->sayPiggyBack(m); else - _client->say( m ); + _client->say(m); - } - else { - verify( _scopedHost.size() ); + } else { + verify(_scopedHost.size()); ScopedDbConnection conn(_scopedHost); - if( DBClientConnection::getLazyKillCursor() ) - conn->sayPiggyBack( m ); + if (DBClientConnection::getLazyKillCursor()) + conn->sayPiggyBack(m); else - conn->say( m ); + conn->say(m); conn.done(); } } ); - } +} -} // namespace mongo +} // namespace mongo |