// dbclient.cpp - connect to a Mongo database as a database, from C++
/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/connpool.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/shard.h"
#include "mongo/s/stale_exception.h" // for RecvStaleConfigException
namespace mongo {
void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend );
void DBClientCursor::_finishConsInit() {
_originalHost = _client->getServerAddress();
}
int DBClientCursor::nextBatchSize() {
if ( nToReturn == 0 )
return batchSize;
if ( batchSize == 0 )
return nToReturn;
return batchSize < nToReturn ? batchSize : nToReturn;
}
void DBClientCursor::_assembleInit( Message& toSend ) {
if ( !cursorId ) {
assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
}
else {
BufBuilder b;
b.appendNum( opts );
b.appendStr( ns );
b.appendNum( nToReturn );
b.appendNum( cursorId );
toSend.setData( dbGetMore, b.buf(), b.len() );
}
}
bool DBClientCursor::init() {
Message toSend;
_assembleInit( toSend );
verify( _client );
if ( !_client->call( toSend, *batch.m, false, &_originalHost ) ) {
// log msg temp?
log() << "DBClientCursor::init call() failed" << endl;
return false;
}
if ( batch.m->empty() ) {
// log msg temp?
log() << "DBClientCursor::init message from call() was empty" << endl;
return false;
}
dataReceived();
return true;
}
void DBClientCursor::initLazy( bool isRetry ) {
massert( 15875 , "DBClientCursor::initLazy called on a client that doesn't support lazy" , _client->lazySupported() );
if (DBClientWithCommands::RunCommandHookFunc hook = _client->getRunCommandHook()) {
if (NamespaceString(ns).isCommand()) {
BSONObjBuilder bob;
bob.appendElements(query);
hook(&bob);
query = bob.obj();
}
}
Message toSend;
_assembleInit( toSend );
_client->say( toSend, isRetry, &_originalHost );
}
bool DBClientCursor::initLazyFinish( bool& retry ) {
bool recvd = _client->recv( *batch.m );
// If we get a bad response, return false
if ( ! recvd || batch.m->empty() ) {
if( !recvd )
log() << "DBClientCursor::init lazy say() failed" << endl;
if( batch.m->empty() )
log() << "DBClientCursor::init message from say() was empty" << endl;
_client->checkResponse( NULL, -1, &retry, &_lazyHost );
return false;
}
dataReceived( retry, _lazyHost );
if (DBClientWithCommands::PostRunCommandHookFunc hook = _client->getPostRunCommandHook()) {
if (NamespaceString(ns).isCommand()) {
BSONObj cmdResponse = peekFirst();
hook(cmdResponse, _lazyHost);
}
}
return ! retry;
}
bool DBClientCursor::initCommand(){
BSONObj res;
bool ok = _client->runCommand( nsGetDB( ns ), query, res, opts );
replyToQuery( 0, *batch.m, res );
dataReceived();
return ok;
}
void DBClientCursor::requestMore() {
verify( cursorId && batch.pos == batch.nReturned );
if (haveLimit) {
nToReturn -= batch.nReturned;
verify(nToReturn > 0);
}
BufBuilder b;
b.appendNum(opts);
b.appendStr(ns);
b.appendNum(nextBatchSize());
b.appendNum(cursorId);
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr response(new Message());
if ( _client ) {
_client->call( toSend, *response );
this->batch.m = response;
dataReceived();
}
else {
verify( _scopedHost.size() );
ScopedDbConnection conn(_scopedHost);
conn->call( toSend , *response );
_client = conn.get();
this->batch.m = response;
dataReceived();
_client = 0;
conn.done();
}
}
/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
void DBClientCursor::exhaustReceiveMore() {
verify( cursorId && batch.pos == batch.nReturned );
verify( !haveLimit );
auto_ptr response(new Message());
verify( _client );
if (!_client->recv(*response)) {
uasserted(16465, "recv failed while exhausting cursor");
}
batch.m = response;
dataReceived();
}
void DBClientCursor::dataReceived( bool& retry, string& host ) {
QueryResult *qr = (QueryResult *) batch.m->singleData();
resultFlags = qr->resultFlags();
if ( qr->resultFlags() & ResultFlag_ErrSet ) {
wasError = true;
}
if ( qr->resultFlags() & ResultFlag_CursorNotFound ) {
// cursor id no longer valid at the server.
verify( qr->cursorId == 0 );
cursorId = 0; // 0 indicates no longer valid (dead)
if ( ! ( opts & QueryOption_CursorTailable ) )
throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" );
}
if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
// only set initially: we don't want to kill it on end of data
// if it's a tailable cursor
cursorId = qr->cursorId;
}
batch.nReturned = qr->nReturned;
batch.pos = 0;
batch.data = qr->data();
_client->checkResponse( batch.data, batch.nReturned, &retry, &host ); // watches for "not master"
if( qr->resultFlags() & ResultFlag_ShardConfigStale ) {
BSONObj error;
verify( peekError( &error ) );
throw RecvStaleConfigException( (string)"stale config on lazy receive" + causedBy( getErrField( error ) ), error );
}
/* this assert would fire the way we currently work:
verify( nReturned || cursorId == 0 );
*/
}
/** If true, safe to call next(). Requests more from server if necessary. */
bool DBClientCursor::more() {
_assertIfNull();
if ( !_putBack.empty() )
return true;
if (haveLimit && batch.pos >= nToReturn)
return false;
if ( batch.pos < batch.nReturned )
return true;
if ( cursorId == 0 )
return false;
requestMore();
return batch.pos < batch.nReturned;
}
BSONObj DBClientCursor::next() {
DEV _assertIfNull();
if ( !_putBack.empty() ) {
BSONObj ret = _putBack.top();
_putBack.pop();
return ret;
}
uassert(13422, "DBClientCursor next() called but more() is false", batch.pos < batch.nReturned);
batch.pos++;
BSONObj o(batch.data);
batch.data += o.objsize();
/* todo would be good to make data null at end of batch for safety */
return o;
}
void DBClientCursor::peek(vector& v, int atMost) {
int m = atMost;
/*
for( stack::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) {
if( m == 0 )
return;
v.push_back(*i);
m--;
n++;
}
*/
int p = batch.pos;
const char *d = batch.data;
while( m && p < batch.nReturned ) {
BSONObj o(d);
d += o.objsize();
p++;
m--;
v.push_back(o);
}
}
BSONObj DBClientCursor::peekFirst(){
vector v;
peek( v, 1 );
if( v.size() > 0 ) return v[0];
else return BSONObj();
}
bool DBClientCursor::peekError(BSONObj* error){
if( ! wasError ) return false;
vector v;
peek(v, 1);
verify( v.size() == 1 );
verify( hasErrField( v[0] ) );
if( error ) *error = v[0].getOwned();
return true;
}
void DBClientCursor::attach( AScopedConnection * conn ) {
verify( _scopedHost.size() == 0 );
verify( conn );
verify( conn->get() );
if ( conn->get()->type() == ConnectionString::SET ||
conn->get()->type() == ConnectionString::SYNC ) {
if( _lazyHost.size() > 0 )
_scopedHost = _lazyHost;
else if( _client )
_scopedHost = _client->getServerAddress();
else
massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false);
}
else {
_scopedHost = conn->getHost();
}
conn->done();
_client = 0;
_lazyHost = "";
}
DBClientCursor::~DBClientCursor() {
if (!this)
return;
DESTRUCTOR_GUARD (
if ( cursorId && _ownCursor && ! inShutdown() ) {
BufBuilder b;
b.appendNum( (int)0 ); // reserved
b.appendNum( (int)1 ); // number
b.appendNum( cursorId );
Message m;
m.setData( dbKillCursors , b.buf() , b.len() );
if ( _client ) {
// Kill the cursor the same way the connection itself would. Usually, non-lazily
if( DBClientConnection::getLazyKillCursor() )
_client->sayPiggyBack( m );
else
_client->say( m );
}
else {
verify( _scopedHost.size() );
ScopedDbConnection conn(_scopedHost);
if( DBClientConnection::getLazyKillCursor() )
conn->sayPiggyBack( m );
else
conn->say( m );
conn.done();
}
}
);
}
} // namespace mongo