// cursors.cpp /* * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "cursors.h" #include "../client/connpool.h" #include "../db/commands.h" #include "../util/concurrency/task.h" #include "../util/net/listen.h" namespace mongo { const int ShardedClientCursor::INIT_REPLY_BUFFER_SIZE = 32768; // -------- ShardedCursor ----------- ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ) { verify( cursor ); _cursor = cursor; _skip = q.ntoskip; _ntoreturn = q.ntoreturn; _totalSent = 0; _done = false; _id = 0; if ( q.queryOptions & QueryOption_NoCursorTimeout ) { _lastAccessMillis = 0; } else _lastAccessMillis = Listener::getElapsedTimeMillis(); } ShardedClientCursor::~ShardedClientCursor() { verify( _cursor ); delete _cursor; _cursor = 0; } long long ShardedClientCursor::getId() { if ( _id <= 0 ) { _id = cursorCache.genId(); verify( _id >= 0 ); } return _id; } int ShardedClientCursor::getTotalSent() const { return _totalSent; } void ShardedClientCursor::accessed() { if ( _lastAccessMillis > 0 ) _lastAccessMillis = Listener::getElapsedTimeMillis(); } long long ShardedClientCursor::idleTime( long long now ) { if ( _lastAccessMillis == 0 ) return 0; return now - _lastAccessMillis; } bool ShardedClientCursor::sendNextBatchAndReply( Request& r ){ BufBuilder buffer( INIT_REPLY_BUFFER_SIZE ); int docCount = 0; bool hasMore = sendNextBatch( r, _ntoreturn, buffer, docCount ); replyToQuery( 0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount, _totalSent, hasMore ? getId() : 0 ); return hasMore; } bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn , BufBuilder& buffer, int& docCount ) { uassert( 10191 , "cursor already done" , ! _done ); int maxSize = 1024 * 1024; if ( _totalSent > 0 ) maxSize *= 3; docCount = 0; // Send more if ntoreturn is 0, or any value > 1 // (one is assumed to be a single doc return, with no cursor) bool sendMore = ntoreturn == 0 || ntoreturn > 1; ntoreturn = abs( ntoreturn ); while ( _cursor->more() ) { BSONObj o = _cursor->next(); buffer.appendBuf( (void*)o.objdata() , o.objsize() ); docCount++; if ( buffer.len() > maxSize ) { break; } if ( docCount == ntoreturn ) { // soft limit aka batch size break; } if ( ntoreturn == 0 && _totalSent == 0 && docCount >= 100 ) { // first batch should be max 100 unless batch size specified break; } } bool hasMore = sendMore && _cursor->more(); LOG(5) << "\t hasMore: " << hasMore << " sendMore: " << sendMore << " cursorMore: " << _cursor->more() << " ntoreturn: " << ntoreturn << " num: " << docCount << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl; _totalSent += docCount; _done = ! hasMore; return hasMore; } // ---- CursorCache ----- long long CursorCache::TIMEOUT = 600000; CursorCache::CursorCache() :_mutex( "CursorCache" ), _shardedTotal(0) { } CursorCache::~CursorCache() { // TODO: delete old cursors? bool print = logLevel > 0; if ( _cursors.size() || _refs.size() ) print = true; if ( print ) cout << " CursorCache at shutdown - " << " sharded: " << _cursors.size() << " passthrough: " << _refs.size() << endl; } ShardedClientCursorPtr CursorCache::get( long long id ) const { LOG(_myLogLevel) << "CursorCache::get id: " << id << endl; scoped_lock lk( _mutex ); MapSharded::const_iterator i = _cursors.find( id ); if ( i == _cursors.end() ) { OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl; return ShardedClientCursorPtr(); } i->second->accessed(); return i->second; } void CursorCache::store( ShardedClientCursorPtr cursor ) { LOG(_myLogLevel) << "CursorCache::store cursor " << " id: " << cursor->getId() << endl; verify( cursor->getId() ); scoped_lock lk( _mutex ); _cursors[cursor->getId()] = cursor; _shardedTotal++; } void CursorCache::remove( long long id ) { verify( id ); scoped_lock lk( _mutex ); _cursors.erase( id ); } void CursorCache::storeRef( const string& server , long long id ) { LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl; verify( id ); scoped_lock lk( _mutex ); _refs[id] = server; } string CursorCache::getRef( long long id ) const { verify( id ); scoped_lock lk( _mutex ); MapNormal::const_iterator i = _refs.find( id ); LOG(_myLogLevel) << "CursorCache::getRef id: " << id << " out: " << ( i == _refs.end() ? " NONE " : i->second ) << endl; if ( i == _refs.end() ) return ""; return i->second; } long long CursorCache::genId() { while ( true ) { long long x = Security::getNonce(); if ( x == 0 ) continue; if ( x < 0 ) x *= -1; scoped_lock lk( _mutex ); MapSharded::iterator i = _cursors.find( x ); if ( i != _cursors.end() ) continue; MapNormal::iterator j = _refs.find( x ); if ( j != _refs.end() ) continue; return x; } } void CursorCache::gotKillCursors(Message& m ) { int *x = (int *) m.singleData()->_data; x++; // reserved int n = *x++; if ( n > 2000 ) { log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl; } uassert( 13286 , "sent 0 cursors to kill" , n >= 1 ); uassert( 13287 , "too many cursors to kill" , n < 30000 ); long long * cursors = (long long *)x; for ( int i=0; isecond; _refs.erase( j ); } LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server << endl; verify( server.size() ); scoped_ptr conn( ScopedDbConnection::getScopedDbConnection( server ) ); conn->get()->killCursor( id ); conn->done(); } } void CursorCache::appendInfo( BSONObjBuilder& result ) const { scoped_lock lk( _mutex ); result.append( "sharded" , (int)_cursors.size() ); result.appendNumber( "shardedEver" , _shardedTotal ); result.append( "refs" , (int)_refs.size() ); result.append( "totalOpen" , (int)(_cursors.size() + _refs.size() ) ); } void CursorCache::doTimeouts() { long long now = Listener::getElapsedTimeMillis(); scoped_lock lk( _mutex ); for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ) { // Note: cursors with no timeout will always have an idleTime of 0 long long idleFor = i->second->idleTime( now ); if ( idleFor < TIMEOUT ) { continue; } log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" << endl; // TODO: make log(1) _cursors.erase( i ); i = _cursors.begin(); // possible 2nd entry will get skipped, will get on next pass if ( i == _cursors.end() ) break; } } CursorCache cursorCache; const int CursorCache::_myLogLevel = 3; class CursorTimeoutTask : public task::Task { public: virtual string name() const { return "cursorTimeout"; } virtual void doWork() { cursorCache.doTimeouts(); } }; void CursorCache::startTimeoutThread() { task::repeat( new CursorTimeoutTask , 400 ); } class CmdCursorInfo : public Command { public: CmdCursorInfo() : Command( "cursorInfo", true ) {} virtual bool slaveOk() const { return true; } virtual void help( stringstream& help ) const { help << " example: { cursorInfo : 1 }"; } virtual LockType locktype() const { return NONE; } bool run(const string&, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { cursorCache.appendInfo( result ); if ( jsobj["setTimeout"].isNumber() ) CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong(); return true; } } cmdCursorInfo; }