diff options
Diffstat (limited to 'src/mongo/client/dbclientcursor.cpp')
-rw-r--r-- | src/mongo/client/dbclientcursor.cpp | 825 |
1 files changed, 406 insertions, 419 deletions
diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index 058c74ed74e..a0cfb89d1f4 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -48,504 +48,491 @@ namespace mongo { - using std::unique_ptr; - using std::endl; - using std::string; - using std::vector; +using std::unique_ptr; +using std::endl; +using std::string; +using std::vector; namespace { - /** - * This code is mostly duplicated from DBClientWithCommands::runCommand. It may not - * be worth de-duplicating as this codepath will eventually be removed anyway. - */ - std::unique_ptr<Message> assembleCommandRequest(DBClientWithCommands* cli, - StringData database, - int legacyQueryOptions, - BSONObj legacyQuery) { - - // TODO: Rewrite this to a common utility shared between this and DBClientMultiCommand. - - // Can be an OP_COMMAND or OP_QUERY message. - auto requestBuilder = rpc::makeRequestBuilder(cli->getClientRPCProtocols(), - cli->getServerRPCProtocols()); - - BSONObj upconvertedCommand; - BSONObj upconvertedMetadata; - - std::tie(upconvertedCommand, upconvertedMetadata) = uassertStatusOK( - rpc::upconvertRequestMetadata(std::move(legacyQuery), legacyQueryOptions) - ); - - BSONObjBuilder metadataBob; - metadataBob.appendElements(upconvertedMetadata); - if (cli->getRequestMetadataWriter()) { - uassertStatusOK(cli->getRequestMetadataWriter()(&metadataBob)); - } +/** + * This code is mostly duplicated from DBClientWithCommands::runCommand. It may not + * be worth de-duplicating as this codepath will eventually be removed anyway. + */ +std::unique_ptr<Message> assembleCommandRequest(DBClientWithCommands* cli, + StringData database, + int legacyQueryOptions, + BSONObj legacyQuery) { + // TODO: Rewrite this to a common utility shared between this and DBClientMultiCommand. + + // Can be an OP_COMMAND or OP_QUERY message. + auto requestBuilder = + rpc::makeRequestBuilder(cli->getClientRPCProtocols(), cli->getServerRPCProtocols()); + + BSONObj upconvertedCommand; + BSONObj upconvertedMetadata; + + std::tie(upconvertedCommand, upconvertedMetadata) = + uassertStatusOK(rpc::upconvertRequestMetadata(std::move(legacyQuery), legacyQueryOptions)); + + BSONObjBuilder metadataBob; + metadataBob.appendElements(upconvertedMetadata); + if (cli->getRequestMetadataWriter()) { + uassertStatusOK(cli->getRequestMetadataWriter()(&metadataBob)); + } - requestBuilder->setDatabase(database); - // We need to get the command name from the upconverted command as it may have originally - // been wrapped. - requestBuilder->setCommandName(upconvertedCommand.firstElementFieldName()); - requestBuilder->setMetadata(metadataBob.done()); - requestBuilder->setCommandArgs(std::move(upconvertedCommand)); + requestBuilder->setDatabase(database); + // We need to get the command name from the upconverted command as it may have originally + // been wrapped. + requestBuilder->setCommandName(upconvertedCommand.firstElementFieldName()); + requestBuilder->setMetadata(metadataBob.done()); + requestBuilder->setCommandArgs(std::move(upconvertedCommand)); - return requestBuilder->done(); - } + return requestBuilder->done(); +} } // namespace - int DBClientCursor::nextBatchSize() { +int DBClientCursor::nextBatchSize() { + if (nToReturn == 0) + return batchSize; - if ( nToReturn == 0 ) - return batchSize; + if (batchSize == 0) + return nToReturn; - if ( batchSize == 0 ) - return nToReturn; + return batchSize < nToReturn ? batchSize : nToReturn; +} - return batchSize < nToReturn ? batchSize : nToReturn; - } +void DBClientCursor::_assembleInit(Message& toSend) { + // If we haven't gotten a cursorId yet, we need to issue a new query or command. + if (!cursorId) { + // HACK: + // Unfortunately, this code is used by the shell to run commands, + // so we need to allow the shell to send invalid options so that we can + // test that the server rejects them. Thus, to allow generating commands with + // invalid options, we validate them here, and fall back to generating an OP_QUERY + // through assembleQueryRequest if the options are invalid. - void DBClientCursor::_assembleInit( Message& toSend ) { - // If we haven't gotten a cursorId yet, we need to issue a new query or command. - if ( !cursorId ) { - // HACK: - // Unfortunately, this code is used by the shell to run commands, - // so we need to allow the shell to send invalid options so that we can - // test that the server rejects them. Thus, to allow generating commands with - // invalid options, we validate them here, and fall back to generating an OP_QUERY - // through assembleQueryRequest if the options are invalid. - - bool hasValidNToReturnForCommand = (nToReturn == 1 || nToReturn == -1); - bool hasValidFlagsForCommand = !(opts & mongo::QueryOption_Exhaust); - - if (_isCommand && hasValidNToReturnForCommand && hasValidFlagsForCommand) { - toSend = *assembleCommandRequest(_client, - nsToDatabaseSubstring(ns), - opts, - query); - return; - } - assembleQueryRequest(ns, - query, - nextBatchSize(), - nToSkip, - fieldsToReturn, - opts, - toSend); + bool hasValidNToReturnForCommand = (nToReturn == 1 || nToReturn == -1); + bool hasValidFlagsForCommand = !(opts & mongo::QueryOption_Exhaust); + + if (_isCommand && hasValidNToReturnForCommand && hasValidFlagsForCommand) { + toSend = *assembleCommandRequest(_client, nsToDatabaseSubstring(ns), opts, query); return; } - // Assemble a legacy getMore request. - BufBuilder b; - b.appendNum( opts ); - b.appendStr( ns ); - b.appendNum( nToReturn ); - b.appendNum( cursorId ); - toSend.setData( dbGetMore, b.buf(), b.len() ); + assembleQueryRequest(ns, query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend); + return; } - - bool DBClientCursor::init() { - Message toSend; - _assembleInit( toSend ); - verify( _client ); - if ( !_client->call( toSend, *batch.m, false, &_originalHost ) ) { - // log msg temp? - log() << "DBClientCursor::init call() failed" << endl; - return false; - } - if ( batch.m->empty() ) { - // log msg temp? - log() << "DBClientCursor::init message from call() was empty" << endl; - return false; - } - dataReceived(); - return true; + // Assemble a legacy getMore request. + BufBuilder b; + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nToReturn); + b.appendNum(cursorId); + toSend.setData(dbGetMore, b.buf(), b.len()); +} + +bool DBClientCursor::init() { + Message toSend; + _assembleInit(toSend); + verify(_client); + if (!_client->call(toSend, *batch.m, false, &_originalHost)) { + // log msg temp? + log() << "DBClientCursor::init call() failed" << endl; + return false; } - - void DBClientCursor::initLazy( bool isRetry ) { - massert( 15875 , "DBClientCursor::initLazy called on a client that doesn't support lazy" , _client->lazySupported() ); - Message toSend; - _assembleInit( toSend ); - _client->say( toSend, isRetry, &_originalHost ); + if (batch.m->empty()) { + // log msg temp? + log() << "DBClientCursor::init message from call() was empty" << endl; + return false; + } + dataReceived(); + return true; +} + +void DBClientCursor::initLazy(bool isRetry) { + massert(15875, + "DBClientCursor::initLazy called on a client that doesn't support lazy", + _client->lazySupported()); + Message toSend; + _assembleInit(toSend); + _client->say(toSend, isRetry, &_originalHost); +} + +bool DBClientCursor::initLazyFinish(bool& retry) { + bool recvd = _client->recv(*batch.m); + + // If we get a bad response, return false + if (!recvd || batch.m->empty()) { + if (!recvd) + log() << "DBClientCursor::init lazy say() failed" << endl; + if (batch.m->empty()) + log() << "DBClientCursor::init message from say() was empty" << endl; + + _client->checkResponse(NULL, -1, &retry, &_lazyHost); + + return false; } - bool DBClientCursor::initLazyFinish( bool& retry ) { - - bool recvd = _client->recv( *batch.m ); - - // If we get a bad response, return false - if ( ! recvd || batch.m->empty() ) { + dataReceived(retry, _lazyHost); - if( !recvd ) - log() << "DBClientCursor::init lazy say() failed" << endl; - if( batch.m->empty() ) - log() << "DBClientCursor::init message from say() was empty" << endl; + return !retry; +} - _client->checkResponse( NULL, -1, &retry, &_lazyHost ); +bool DBClientCursor::initCommand() { + BSONObj res; - return false; + bool ok = _client->runCommand(nsGetDB(ns), query, res, opts); + replyToQuery(0, *batch.m, res); + dataReceived(); - } + return ok; +} - dataReceived( retry, _lazyHost ); +void DBClientCursor::requestMore() { + verify(cursorId && batch.pos == batch.nReturned); - return ! retry; + if (haveLimit) { + nToReturn -= batch.nReturned; + verify(nToReturn > 0); } - - bool DBClientCursor::initCommand(){ - BSONObj res; - - bool ok = _client->runCommand( nsGetDB( ns ), query, res, opts ); - replyToQuery( 0, *batch.m, res ); + BufBuilder b; + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nextBatchSize()); + b.appendNum(cursorId); + + Message toSend; + toSend.setData(dbGetMore, b.buf(), b.len()); + unique_ptr<Message> response(new Message()); + + if (_client) { + _client->call(toSend, *response); + this->batch.m = std::move(response); dataReceived(); - - return ok; - } - - void DBClientCursor::requestMore() { - verify( cursorId && batch.pos == batch.nReturned ); - - if (haveLimit) { - nToReturn -= batch.nReturned; - verify(nToReturn > 0); - } - BufBuilder b; - b.appendNum(opts); - b.appendStr(ns); - b.appendNum(nextBatchSize()); - b.appendNum(cursorId); - - Message toSend; - toSend.setData(dbGetMore, b.buf(), b.len()); - unique_ptr<Message> response(new Message()); - - if ( _client ) { - _client->call( toSend, *response ); - this->batch.m = std::move(response); - dataReceived(); - } - else { - verify( _scopedHost.size() ); - ScopedDbConnection conn(_scopedHost); - conn->call( toSend , *response ); - _client = conn.get(); - this->batch.m = std::move(response); - dataReceived(); - _client = 0; - conn.done(); - } - } - - /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ - void DBClientCursor::exhaustReceiveMore() { - verify( cursorId && batch.pos == batch.nReturned ); - verify( !haveLimit ); - unique_ptr<Message> response(new Message()); - verify( _client ); - if (!_client->recv(*response)) { - uasserted(16465, "recv failed while exhausting cursor"); - } - batch.m = std::move(response); + } else { + verify(_scopedHost.size()); + ScopedDbConnection conn(_scopedHost); + conn->call(toSend, *response); + _client = conn.get(); + this->batch.m = std::move(response); dataReceived(); + _client = 0; + conn.done(); } +} + +/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ +void DBClientCursor::exhaustReceiveMore() { + verify(cursorId && batch.pos == batch.nReturned); + verify(!haveLimit); + unique_ptr<Message> response(new Message()); + verify(_client); + if (!_client->recv(*response)) { + uasserted(16465, "recv failed while exhausting cursor"); + } + batch.m = std::move(response); + dataReceived(); +} - void DBClientCursor::commandDataReceived() { - int op = batch.m->operation(); - invariant(op == opReply || op == dbCommandReply); +void DBClientCursor::commandDataReceived() { + int op = batch.m->operation(); + invariant(op == opReply || op == dbCommandReply); - batch.nReturned = 1; - batch.pos = 0; + batch.nReturned = 1; + batch.pos = 0; - auto commandReply = rpc::makeReply(batch.m.get()); + auto commandReply = rpc::makeReply(batch.m.get()); - auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply()); + auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply()); - if (ErrorCodes::SendStaleConfig == commandStatus) { - throw RecvStaleConfigException("stale config in DBClientCursor::dataReceived()", - commandReply->getCommandReply()); - } - else if (!commandStatus.isOK()) { - wasError = true; - } - - if (_client->getReplyMetadataReader()) { - uassertStatusOK( - _client->getReplyMetadataReader()(commandReply->getMetadata(), - _client->getServerAddress()) - ); - } + if (ErrorCodes::SendStaleConfig == commandStatus) { + throw RecvStaleConfigException("stale config in DBClientCursor::dataReceived()", + commandReply->getCommandReply()); + } else if (!commandStatus.isOK()) { + wasError = true; + } - // HACK: If we got an OP_COMMANDREPLY, take the reply object - // and shove it in to an OP_REPLY message. - if (op == dbCommandReply) { - // Need to take ownership here as we destroy the underlying message. - BSONObj reply = commandReply->getCommandReply().getOwned(); - batch.m = stdx::make_unique<Message>(); - replyToQuery(0, *batch.m, reply); - } + if (_client->getReplyMetadataReader()) { + uassertStatusOK(_client->getReplyMetadataReader()(commandReply->getMetadata(), + _client->getServerAddress())); + } - QueryResult::View qr = batch.m->singleData().view2ptr(); - batch.data = qr.data(); + // HACK: If we got an OP_COMMANDREPLY, take the reply object + // and shove it in to an OP_REPLY message. + if (op == dbCommandReply) { + // Need to take ownership here as we destroy the underlying message. + BSONObj reply = commandReply->getCommandReply().getOwned(); + batch.m = stdx::make_unique<Message>(); + replyToQuery(0, *batch.m, reply); } - void DBClientCursor::dataReceived( bool& retry, string& host ) { - // If this is a reply to our initial command request. - if (_isCommand && cursorId == 0) { - commandDataReceived(); - return; - } + QueryResult::View qr = batch.m->singleData().view2ptr(); + batch.data = qr.data(); +} - QueryResult::View qr = batch.m->singleData().view2ptr(); - resultFlags = qr.getResultFlags(); +void DBClientCursor::dataReceived(bool& retry, string& host) { + // If this is a reply to our initial command request. + if (_isCommand && cursorId == 0) { + commandDataReceived(); + return; + } - if ( qr.getResultFlags() & ResultFlag_ErrSet ) { - wasError = true; - } + QueryResult::View qr = batch.m->singleData().view2ptr(); + resultFlags = qr.getResultFlags(); - if ( qr.getResultFlags() & ResultFlag_CursorNotFound ) { - // cursor id no longer valid at the server. - invariant(qr.getCursorId() == 0); + if (qr.getResultFlags() & ResultFlag_ErrSet) { + wasError = true; + } - if (!(opts & QueryOption_CursorTailable)) { - uasserted(13127, - str::stream() << "cursor id " << cursorId << " didn't exist on server."); - } + if (qr.getResultFlags() & ResultFlag_CursorNotFound) { + // cursor id no longer valid at the server. + invariant(qr.getCursorId() == 0); - // 0 indicates no longer valid (dead) - cursorId = 0; + if (!(opts & QueryOption_CursorTailable)) { + uasserted(13127, + str::stream() << "cursor id " << cursorId << " didn't exist on server."); } - if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { - // only set initially: we don't want to kill it on end of data - // if it's a tailable cursor - cursorId = qr.getCursorId(); - } + // 0 indicates no longer valid (dead) + cursorId = 0; + } - batch.nReturned = qr.getNReturned(); - batch.pos = 0; - batch.data = qr.data(); + if (cursorId == 0 || !(opts & QueryOption_CursorTailable)) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr.getCursorId(); + } - _client->checkResponse( batch.data, batch.nReturned, &retry, &host ); // watches for "not master" + batch.nReturned = qr.getNReturned(); + batch.pos = 0; + batch.data = qr.data(); - if( qr.getResultFlags() & ResultFlag_ShardConfigStale ) { - BSONObj error; - verify( peekError( &error ) ); - throw RecvStaleConfigException( (string)"stale config on lazy receive" + causedBy( getErrField( error ) ), error ); - } + _client->checkResponse(batch.data, batch.nReturned, &retry, &host); // watches for "not master" - /* this assert would fire the way we currently work: - verify( nReturned || cursorId == 0 ); - */ + if (qr.getResultFlags() & ResultFlag_ShardConfigStale) { + BSONObj error; + verify(peekError(&error)); + throw RecvStaleConfigException( + (string) "stale config on lazy receive" + causedBy(getErrField(error)), error); } - /** If true, safe to call next(). Requests more from server if necessary. */ - bool DBClientCursor::more() { - _assertIfNull(); - - if ( !_putBack.empty() ) - return true; + /* this assert would fire the way we currently work: + verify( nReturned || cursorId == 0 ); + */ +} - if (haveLimit && batch.pos >= nToReturn) - return false; +/** If true, safe to call next(). Requests more from server if necessary. */ +bool DBClientCursor::more() { + _assertIfNull(); - if ( batch.pos < batch.nReturned ) - return true; + if (!_putBack.empty()) + return true; - if ( cursorId == 0 ) - return false; + if (haveLimit && batch.pos >= nToReturn) + return false; - requestMore(); - return batch.pos < batch.nReturned; - } + if (batch.pos < batch.nReturned) + return true; - BSONObj DBClientCursor::next() { - DEV _assertIfNull(); - if ( !_putBack.empty() ) { - BSONObj ret = _putBack.top(); - _putBack.pop(); - return ret; - } + if (cursorId == 0) + return false; - uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned); + requestMore(); + return batch.pos < batch.nReturned; +} - batch.pos++; - BSONObj o(batch.data); - batch.data += o.objsize(); - /* todo would be good to make data null at end of batch for safety */ - return o; +BSONObj DBClientCursor::next() { + DEV _assertIfNull(); + if (!_putBack.empty()) { + BSONObj ret = _putBack.top(); + _putBack.pop(); + return ret; } - BSONObj DBClientCursor::nextSafe() { - BSONObj o = next(); - if( this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0 ) { - std::string s = "nextSafe(): " + o.toString(); - LOG(5) << s; - uasserted(13106, s); - } - return o; + uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned); + + batch.pos++; + BSONObj o(batch.data); + batch.data += o.objsize(); + /* todo would be good to make data null at end of batch for safety */ + return o; +} + +BSONObj DBClientCursor::nextSafe() { + BSONObj o = next(); + if (this->wasError && strcmp(o.firstElementFieldName(), "$err") == 0) { + std::string s = "nextSafe(): " + o.toString(); + LOG(5) << s; + uasserted(13106, s); } + return o; +} - void DBClientCursor::peek(vector<BSONObj>& v, int atMost) { - int m = atMost; +void DBClientCursor::peek(vector<BSONObj>& v, int atMost) { + int m = atMost; - /* - for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { - if( m == 0 ) - return; - v.push_back(*i); - m--; - n++; - } - */ - - int p = batch.pos; - const char *d = batch.data; - while( m && p < batch.nReturned ) { - BSONObj o(d); - d += o.objsize(); - p++; - m--; - v.push_back(o); - } - } - - BSONObj DBClientCursor::peekFirst(){ - vector<BSONObj> v; - peek( v, 1 ); - - if( v.size() > 0 ) return v[0]; - else return BSONObj(); + /* + for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { + if( m == 0 ) + return; + v.push_back(*i); + m--; + n++; } - - bool DBClientCursor::peekError(BSONObj* error){ - if( ! wasError ) return false; - - vector<BSONObj> v; - peek(v, 1); - - verify( v.size() == 1 ); - verify( hasErrField( v[0] ) ); - - if( error ) *error = v[0].getOwned(); - return true; + */ + + int p = batch.pos; + const char* d = batch.data; + while (m && p < batch.nReturned) { + BSONObj o(d); + d += o.objsize(); + p++; + m--; + v.push_back(o); } - - void DBClientCursor::attach( AScopedConnection * conn ) { - verify( _scopedHost.size() == 0 ); - verify( conn ); - verify( conn->get() ); - - if ( conn->get()->type() == ConnectionString::SET || - conn->get()->type() == ConnectionString::SYNC ) { - if( _lazyHost.size() > 0 ) - _scopedHost = _lazyHost; - else if( _client ) - _scopedHost = _client->getServerAddress(); - else - massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false); - } - else { - _scopedHost = conn->getHost(); - } - - conn->done(); - _client = 0; - _lazyHost = ""; +} + +BSONObj DBClientCursor::peekFirst() { + vector<BSONObj> v; + peek(v, 1); + + if (v.size() > 0) + return v[0]; + else + return BSONObj(); +} + +bool DBClientCursor::peekError(BSONObj* error) { + if (!wasError) + return false; + + vector<BSONObj> v; + peek(v, 1); + + verify(v.size() == 1); + verify(hasErrField(v[0])); + + if (error) + *error = v[0].getOwned(); + return true; +} + +void DBClientCursor::attach(AScopedConnection* conn) { + verify(_scopedHost.size() == 0); + verify(conn); + verify(conn->get()); + + if (conn->get()->type() == ConnectionString::SET || + conn->get()->type() == ConnectionString::SYNC) { + if (_lazyHost.size() > 0) + _scopedHost = _lazyHost; + else if (_client) + _scopedHost = _client->getServerAddress(); + else + massert(14821, + "No client or lazy client specified, cannot store multi-host connection.", + false); + } else { + _scopedHost = conn->getHost(); } - DBClientCursor::DBClientCursor(DBClientBase* client, - const std::string& ns, - const BSONObj& query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) - : DBClientCursor(client, - ns, - query, - 0, // cursorId - nToReturn, - nToSkip, - fieldsToReturn, - queryOptions, - batchSize) {} - - DBClientCursor::DBClientCursor(DBClientBase* client, - const std::string& ns, - long long cursorId, - int nToReturn, - int queryOptions) - : DBClientCursor(client, - ns, - BSONObj(), // query - cursorId, - nToReturn, - 0, // nToSkip - nullptr, // fieldsToReturn - queryOptions, - 0) {} // batchSize - - DBClientCursor::DBClientCursor(DBClientBase* client, - const std::string& ns, - const BSONObj& query, - long long cursorId, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) - : _client(client), - _originalHost(_client->getServerAddress()), - ns(ns), - _isCommand(nsIsFull(ns) ? nsToCollectionSubstring(ns) == "$cmd" : false), - query(query), - nToReturn(nToReturn), - haveLimit(nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), - nToSkip(nToSkip), - fieldsToReturn(fieldsToReturn), - opts(queryOptions), - batchSize(batchSize == 1 ? 2 : batchSize), - resultFlags(0), - cursorId(cursorId), - _ownCursor(true), - wasError(false) {} - - DBClientCursor::~DBClientCursor() { - DESTRUCTOR_GUARD ( - - if ( cursorId && _ownCursor && ! inShutdown() ) { + conn->done(); + _client = 0; + _lazyHost = ""; +} + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + const BSONObj& query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) + : DBClientCursor(client, + ns, + query, + 0, // cursorId + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize) {} + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + long long cursorId, + int nToReturn, + int queryOptions) + : DBClientCursor(client, + ns, + BSONObj(), // query + cursorId, + nToReturn, + 0, // nToSkip + nullptr, // fieldsToReturn + queryOptions, + 0) {} // batchSize + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + const BSONObj& query, + long long cursorId, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) + : _client(client), + _originalHost(_client->getServerAddress()), + ns(ns), + _isCommand(nsIsFull(ns) ? nsToCollectionSubstring(ns) == "$cmd" : false), + query(query), + nToReturn(nToReturn), + haveLimit(nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(nToSkip), + fieldsToReturn(fieldsToReturn), + opts(queryOptions), + batchSize(batchSize == 1 ? 2 : batchSize), + resultFlags(0), + cursorId(cursorId), + _ownCursor(true), + wasError(false) {} + +DBClientCursor::~DBClientCursor() { + DESTRUCTOR_GUARD( + + if (cursorId && _ownCursor && !inShutdown()) { BufBuilder b; - b.appendNum( (int)0 ); // reserved - b.appendNum( (int)1 ); // number - b.appendNum( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); + b.appendNum((int)0); // reserved + b.appendNum((int)1); // number + b.appendNum(cursorId); - if ( _client ) { + Message m; + m.setData(dbKillCursors, b.buf(), b.len()); + if (_client) { // Kill the cursor the same way the connection itself would. Usually, non-lazily - if( DBClientConnection::getLazyKillCursor() ) - _client->sayPiggyBack( m ); + if (DBClientConnection::getLazyKillCursor()) + _client->sayPiggyBack(m); else - _client->say( m ); + _client->say(m); - } - else { - verify( _scopedHost.size() ); + } else { + verify(_scopedHost.size()); ScopedDbConnection conn(_scopedHost); - if( DBClientConnection::getLazyKillCursor() ) - conn->sayPiggyBack( m ); + if (DBClientConnection::getLazyKillCursor()) + conn->sayPiggyBack(m); else - conn->say( m ); + conn->say(m); conn.done(); } } ); - } +} -} // namespace mongo +} // namespace mongo |