diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-04-29 10:15:05 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-04-29 10:15:05 -0400 |
commit | 1ba7715c1f0f1d3ec878c0937e059c45a403a44c (patch) | |
tree | c1bc11c3c83a09ba7688c625d74377f713ef0a2d | |
parent | 0d7e6b49131779f396b1186860e23803569ba062 (diff) | |
download | mongo-1ba7715c1f0f1d3ec878c0937e059c45a403a44c.tar.gz |
some client refactoring
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | client/connpool.h | 6 | ||||
-rw-r--r-- | client/dbclient.cpp | 145 | ||||
-rw-r--r-- | client/dbclient.h | 146 | ||||
-rw-r--r-- | client/dbclientcursor.cpp | 174 | ||||
-rw-r--r-- | client/dbclientcursor.h | 174 |
6 files changed, 356 insertions, 291 deletions
diff --git a/SConstruct b/SConstruct index 9aa8d85d4fd..996814bbf84 100644 --- a/SConstruct +++ b/SConstruct @@ -364,7 +364,7 @@ commonFiles += [ "util/background.cpp" , "util/mmap.cpp" , "util/ramstore.cpp", "util/assert_util.cpp" , "util/httpclient.cpp" , "util/md5main.cpp" , "util/base64.cpp", "util/debug_util.cpp", "util/thread_pool.cpp" ] commonFiles += Glob( "util/*.c" ) -commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/model.cpp client/syncclusterconnection.cpp" ) +commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/dbclientcursor.cpp client/model.cpp client/syncclusterconnection.cpp" ) commonFiles += [ "scripting/engine.cpp" , "scripting/utils.cpp" ] #mmap stuff diff --git a/client/connpool.h b/client/connpool.h index 70ad46a9651..f39fc8da8ae 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -102,18 +102,20 @@ namespace mongo { } ScopedDbConnection() - : _host( "" ) , _conn(0 ){ + : _host( "" ) , _conn(0) { } /** throws UserException if can't connect */ ScopedDbConnection(const string& host) : _host(host), _conn( pool.get(host) ) { } - + ScopedDbConnection(const string& host, DBClientBase* conn ) : _host( host ) , _conn( conn ){ } + string getHost() const { return _host; } + /** Force closure of the connection. You should call this if you leave it in a bad state. Destructor will do this too, but it is verbose. */ diff --git a/client/dbclient.cpp b/client/dbclient.cpp index 310b7a11870..d7ff9cb1d82 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -764,151 +764,6 @@ namespace mongo { } } - int DBClientCursor::nextBatchSize(){ - if ( nToReturn == 0 ) - return batchSize; - if ( batchSize == 0 ) - return nToReturn; - - return batchSize < nToReturn ? batchSize : nToReturn; - } - - bool DBClientCursor::init() { - Message toSend; - if ( !cursorId ) { - assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } else { - BufBuilder b; - b.append( opts ); - b.append( ns.c_str() ); - b.append( nToReturn ); - b.append( cursorId ); - toSend.setData( dbGetMore, b.buf(), b.len() ); - } - if ( !connector->call( toSend, *m, false ) ) - return false; - if ( ! m->data ) - return false; - dataReceived(); - return true; - } - - void DBClientCursor::requestMore() { - assert( cursorId && pos == nReturned ); - - if (haveLimit){ - nToReturn -= nReturned; - assert(nToReturn > 0); - } - BufBuilder b; - b.append(opts); - b.append(ns.c_str()); - b.append(nextBatchSize()); - b.append(cursorId); - - Message toSend; - toSend.setData(dbGetMore, b.buf(), b.len()); - auto_ptr<Message> response(new Message()); - connector->call( toSend, *response ); - - m = response; - dataReceived(); - } - - void DBClientCursor::dataReceived() { - QueryResult *qr = (QueryResult *) m->data; - resultFlags = qr->resultFlags(); - - if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) { - // cursor id no longer valid at the server. - assert( qr->cursorId == 0 ); - cursorId = 0; // 0 indicates no longer valid (dead) - if ( ! ( opts & QueryOption_CursorTailable ) ) - throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); - } - - if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { - // only set initially: we don't want to kill it on end of data - // if it's a tailable cursor - cursorId = qr->cursorId; - } - - nReturned = qr->nReturned; - pos = 0; - data = qr->data(); - - connector->checkResponse( data, nReturned ); - /* this assert would fire the way we currently work: - assert( nReturned || cursorId == 0 ); - */ - } - - /** If true, safe to call next(). Requests more from server if necessary. */ - bool DBClientCursor::more() { - if ( !_putBack.empty() ) - return true; - - if (haveLimit && pos >= nToReturn) - return false; - - if ( pos < nReturned ) - return true; - - if ( cursorId == 0 ) - return false; - - requestMore(); - return pos < nReturned; - } - - BSONObj DBClientCursor::next() { - assert( more() ); - if ( !_putBack.empty() ) { - BSONObj ret = _putBack.top(); - _putBack.pop(); - return ret; - } - pos++; - BSONObj o(data); - data += o.objsize(); - return o; - } - - void DBClientCursor::attach( ScopedDbConnection * conn ){ - assert( ! _scopedConn ); - _scopedConn = conn->steal(); - } - - - - DBClientCursor::~DBClientCursor() { - DESTRUCTOR_GUARD ( - - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.append( (int)0 ); // reserved - b.append( (int)1 ); // number - b.append( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); - - connector->sayPiggyBack( m ); - } - - if ( _scopedConn ){ - if ( moreInCurrentBatch() ){ - log() << "warning: cursor deleted, but moreInCurrentBatch and scoped conn." << endl; - } - else { - _scopedConn->done(); - } - delete _scopedConn; - } - - ); - } - /* --- class dbclientpaired --- */ string DBClientPaired::toString() { diff --git a/client/dbclient.h b/client/dbclient.h index 4e8e2aa5314..0c69a4f0cec 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -84,6 +84,7 @@ namespace mongo { class BSONObj; class ScopedDbConnection; + class DBClientCursor; /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. Examples: @@ -209,149 +210,6 @@ namespace mongo { virtual void checkResponse( const string &data, int nReturned ) {} }; - /** Queries return a cursor object */ - class DBClientCursor : boost::noncopyable { - public: - /** If true, safe to call next(). Requests more from server if necessary. */ - bool more(); - - /** If true, there is more in our local buffers to be fetched via next(). Returns - false when a getMore request back to server would be required. You can use this - if you want to exhaust whatever data has been fetched to the client already but - then perhaps stop. - */ - bool moreInCurrentBatch() { return !_putBack.empty() || pos < nReturned; } - - /** next - @return next object in the result cursor. - on an error at the remote server, you will get back: - { $err: <string> } - if you do not want to handle that yourself, call nextSafe(). - */ - BSONObj next(); - - /** - restore an object previously returned by next() to the cursor - */ - void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } - - /** throws AssertionException if get back { $err : ... } */ - BSONObj nextSafe() { - BSONObj o = next(); - BSONElement e = o.firstElement(); - if( strcmp(e.fieldName(), "$err") == 0 ) { - if( logLevel >= 5 ) - log() << "nextSafe() error " << o.toString() << endl; - uassert(13106, "nextSafe() returns $err", false); - } - return o; - } - - /** - iterate the rest of the cursor and return the number if items - */ - int itcount(){ - int c = 0; - while ( more() ){ - next(); - c++; - } - return c; - } - - /** cursor no longer valid -- use with tailable cursors. - note you should only rely on this once more() returns false; - 'dead' may be preset yet some data still queued and locally - available from the dbclientcursor. - */ - bool isDead() const { - return cursorId == 0; - } - - bool tailable() const { - return (opts & QueryOption_CursorTailable) != 0; - } - - /** see QueryResult::ResultFlagType (db/dbmessage.h) for flag values - mostly these flags are for internal purposes - - ResultFlag_ErrSet is the possible exception to that - */ - bool hasResultFlag( int flag ){ - return (resultFlags & flag) != 0; - } - - DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, - int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : - connector(_connector), - ns(_ns), - query(_query), - nToReturn(_nToReturn), - haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), - nToSkip(_nToSkip), - fieldsToReturn(_fieldsToReturn), - opts(queryOptions), - batchSize(bs), - m(new Message()), - cursorId(), - nReturned(), - pos(), - data(), - _ownCursor( true ), - _scopedConn(0){ - } - - DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : - connector(_connector), - ns(_ns), - nToReturn( _nToReturn ), - haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), - opts( options ), - m(new Message()), - cursorId( _cursorId ), - nReturned(), - pos(), - data(), - _ownCursor( true ), - _scopedConn(0){ - } - - virtual ~DBClientCursor(); - - long long getCursorId() const { return cursorId; } - - /** by default we "own" the cursor and will send the server a KillCursor - message when ~DBClientCursor() is called. This function overrides that. - */ - void decouple() { _ownCursor = false; } - - void attach( ScopedDbConnection * conn ); - - private: - friend class DBClientBase; - bool init(); - int nextBatchSize(); - DBConnector *connector; - string ns; - BSONObj query; - int nToReturn; - bool haveLimit; - int nToSkip; - const BSONObj *fieldsToReturn; - int opts; - int batchSize; - auto_ptr<Message> m; - stack< BSONObj > _putBack; - int resultFlags; - long long cursorId; - int nReturned; - int pos; - const char *data; - void dataReceived(); - void requestMore(); - bool _ownCursor; // see decouple() - ScopedDbConnection * _scopedConn; - }; - /** The interface that any db connection should implement */ @@ -967,4 +825,6 @@ namespace mongo { } // namespace mongo +#include "dbclientcursor.h" + #include "undef_macros.h" diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp new file mode 100644 index 00000000000..a50b1cb14b6 --- /dev/null +++ b/client/dbclientcursor.cpp @@ -0,0 +1,174 @@ +// dbclient.cpp - connect to a Mongo database as a database, from C++ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "dbclient.h" +#include "../db/dbmessage.h" +#include "../db/cmdline.h" +#include "connpool.h" + +namespace mongo { + + void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ); + + int DBClientCursor::nextBatchSize(){ + if ( nToReturn == 0 ) + return batchSize; + if ( batchSize == 0 ) + return nToReturn; + + return batchSize < nToReturn ? batchSize : nToReturn; + } + + bool DBClientCursor::init() { + Message toSend; + if ( !cursorId ) { + assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); + } else { + BufBuilder b; + b.append( opts ); + b.append( ns.c_str() ); + b.append( nToReturn ); + b.append( cursorId ); + toSend.setData( dbGetMore, b.buf(), b.len() ); + } + if ( !connector->call( toSend, *m, false ) ) + return false; + if ( ! m->data ) + return false; + dataReceived(); + return true; + } + + void DBClientCursor::requestMore() { + assert( cursorId && pos == nReturned ); + + if (haveLimit){ + nToReturn -= nReturned; + assert(nToReturn > 0); + } + BufBuilder b; + b.append(opts); + b.append(ns.c_str()); + b.append(nextBatchSize()); + b.append(cursorId); + + Message toSend; + toSend.setData(dbGetMore, b.buf(), b.len()); + auto_ptr<Message> response(new Message()); + connector->call( toSend, *response ); + + m = response; + dataReceived(); + } + + void DBClientCursor::dataReceived() { + QueryResult *qr = (QueryResult *) m->data; + resultFlags = qr->resultFlags(); + + if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) { + // cursor id no longer valid at the server. + assert( qr->cursorId == 0 ); + cursorId = 0; // 0 indicates no longer valid (dead) + if ( ! ( opts & QueryOption_CursorTailable ) ) + throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); + } + + if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr->cursorId; + } + + nReturned = qr->nReturned; + pos = 0; + data = qr->data(); + + connector->checkResponse( data, nReturned ); + /* this assert would fire the way we currently work: + assert( nReturned || cursorId == 0 ); + */ + } + + /** If true, safe to call next(). Requests more from server if necessary. */ + bool DBClientCursor::more() { + if ( !_putBack.empty() ) + return true; + + if (haveLimit && pos >= nToReturn) + return false; + + if ( pos < nReturned ) + return true; + + if ( cursorId == 0 ) + return false; + + requestMore(); + return pos < nReturned; + } + + BSONObj DBClientCursor::next() { + assert( more() ); + if ( !_putBack.empty() ) { + BSONObj ret = _putBack.top(); + _putBack.pop(); + return ret; + } + pos++; + BSONObj o(data); + data += o.objsize(); + return o; + } + + void DBClientCursor::attach( ScopedDbConnection * conn ){ + assert( ! _scopedConn ); + _scopedConn = conn->steal(); + } + + + + DBClientCursor::~DBClientCursor() { + DESTRUCTOR_GUARD ( + + if ( cursorId && _ownCursor ) { + BufBuilder b; + b.append( (int)0 ); // reserved + b.append( (int)1 ); // number + b.append( cursorId ); + + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + connector->sayPiggyBack( m ); + } + + if ( _scopedConn ){ + if ( moreInCurrentBatch() ){ + log() << "warning: cursor deleted, but moreInCurrentBatch and scoped conn." << endl; + } + else { + _scopedConn->done(); + } + delete _scopedConn; + } + + ); + } + + +} // namespace mongo diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h new file mode 100644 index 00000000000..8755a11ee0f --- /dev/null +++ b/client/dbclientcursor.h @@ -0,0 +1,174 @@ +// file dbclientcursor.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../pch.h" +#include "../util/message.h" +#include "../db/jsobj.h" +#include "../db/json.h" +#include <stack> + +namespace mongo { + + /** Queries return a cursor object */ + class DBClientCursor : boost::noncopyable { + public: + /** If true, safe to call next(). Requests more from server if necessary. */ + bool more(); + + /** If true, there is more in our local buffers to be fetched via next(). Returns + false when a getMore request back to server would be required. You can use this + if you want to exhaust whatever data has been fetched to the client already but + then perhaps stop. + */ + bool moreInCurrentBatch() { return !_putBack.empty() || pos < nReturned; } + + /** next + @return next object in the result cursor. + on an error at the remote server, you will get back: + { $err: <string> } + if you do not want to handle that yourself, call nextSafe(). + */ + BSONObj next(); + + /** + restore an object previously returned by next() to the cursor + */ + void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } + + /** throws AssertionException if get back { $err : ... } */ + BSONObj nextSafe() { + BSONObj o = next(); + BSONElement e = o.firstElement(); + if( strcmp(e.fieldName(), "$err") == 0 ) { + if( logLevel >= 5 ) + log() << "nextSafe() error " << o.toString() << endl; + uassert(13106, "nextSafe() returns $err", false); + } + return o; + } + + /** + iterate the rest of the cursor and return the number if items + */ + int itcount(){ + int c = 0; + while ( more() ){ + next(); + c++; + } + return c; + } + + /** cursor no longer valid -- use with tailable cursors. + note you should only rely on this once more() returns false; + 'dead' may be preset yet some data still queued and locally + available from the dbclientcursor. + */ + bool isDead() const { + return cursorId == 0; + } + + bool tailable() const { + return (opts & QueryOption_CursorTailable) != 0; + } + + /** see QueryResult::ResultFlagType (db/dbmessage.h) for flag values + mostly these flags are for internal purposes - + ResultFlag_ErrSet is the possible exception to that + */ + bool hasResultFlag( int flag ){ + return (resultFlags & flag) != 0; + } + + DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, + int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : + connector(_connector), + ns(_ns), + query(_query), + nToReturn(_nToReturn), + haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(_nToSkip), + fieldsToReturn(_fieldsToReturn), + opts(queryOptions), + batchSize(bs), + m(new Message()), + cursorId(), + nReturned(), + pos(), + data(), + _ownCursor( true ), + _scopedConn(0){ + } + + DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : + connector(_connector), + ns(_ns), + nToReturn( _nToReturn ), + haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), + opts( options ), + m(new Message()), + cursorId( _cursorId ), + nReturned(), + pos(), + data(), + _ownCursor( true ), + _scopedConn(0){ + } + + virtual ~DBClientCursor(); + + long long getCursorId() const { return cursorId; } + + /** by default we "own" the cursor and will send the server a KillCursor + message when ~DBClientCursor() is called. This function overrides that. + */ + void decouple() { _ownCursor = false; } + + void attach( ScopedDbConnection * conn ); + + private: + friend class DBClientBase; + bool init(); + int nextBatchSize(); + DBConnector *connector; + string ns; + BSONObj query; + int nToReturn; + bool haveLimit; + int nToSkip; + const BSONObj *fieldsToReturn; + int opts; + int batchSize; + auto_ptr<Message> m; + stack< BSONObj > _putBack; + int resultFlags; + long long cursorId; + int nReturned; + int pos; + const char *data; + void dataReceived(); + void requestMore(); + bool _ownCursor; // see decouple() + ScopedDbConnection * _scopedConn; + }; + + +} // namespace mongo + +#include "undef_macros.h" |