diff options
Diffstat (limited to 'src/mongo/s/cursors.cpp')
-rw-r--r-- | src/mongo/s/cursors.cpp | 769 |
1 files changed, 382 insertions, 387 deletions
diff --git a/src/mongo/s/cursors.cpp b/src/mongo/s/cursors.cpp index 31f62c1e2b6..6b4b4e6f2c6 100644 --- a/src/mongo/s/cursors.cpp +++ b/src/mongo/s/cursors.cpp @@ -54,466 +54,461 @@ namespace mongo { - using std::unique_ptr; - using std::endl; - using std::string; - using std::stringstream; - - const int ShardedClientCursor::INIT_REPLY_BUFFER_SIZE = 32768; - - // Note: There is no counter for shardedEver from cursorInfo since it is deprecated - static Counter64 cursorStatsMultiTarget; - static Counter64 cursorStatsSingleTarget; - - // Simple class to report the sum total open cursors = sharded + refs - class CursorStatsSum { - public: - operator long long() const { - return get(); - } - long long get() const { - return cursorStatsMultiTarget.get() + cursorStatsSingleTarget.get(); - } - }; - - static CursorStatsSum cursorStatsTotalOpen; +using std::unique_ptr; +using std::endl; +using std::string; +using std::stringstream; + +const int ShardedClientCursor::INIT_REPLY_BUFFER_SIZE = 32768; + +// Note: There is no counter for shardedEver from cursorInfo since it is deprecated +static Counter64 cursorStatsMultiTarget; +static Counter64 cursorStatsSingleTarget; + +// Simple class to report the sum total open cursors = sharded + refs +class CursorStatsSum { +public: + operator long long() const { + return get(); + } + long long get() const { + return cursorStatsMultiTarget.get() + cursorStatsSingleTarget.get(); + } +}; - static ServerStatusMetricField<Counter64> dCursorStatsMultiTarget( "cursor.open.multiTarget", - &cursorStatsMultiTarget); - static ServerStatusMetricField<Counter64> dCursorStatsSingleTarget( "cursor.open.singleTarget", - &cursorStatsSingleTarget); - static ServerStatusMetricField<CursorStatsSum> dCursorStatsTotalOpen( "cursor.open.total", - &cursorStatsTotalOpen); +static CursorStatsSum cursorStatsTotalOpen; +static ServerStatusMetricField<Counter64> dCursorStatsMultiTarget("cursor.open.multiTarget", + &cursorStatsMultiTarget); +static ServerStatusMetricField<Counter64> dCursorStatsSingleTarget("cursor.open.singleTarget", + &cursorStatsSingleTarget); +static ServerStatusMetricField<CursorStatsSum> dCursorStatsTotalOpen("cursor.open.total", + &cursorStatsTotalOpen); - // -------- ShardedCursor ----------- - ShardedClientCursor::ShardedClientCursor( QueryMessage& q, - ParallelSortClusteredCursor * cursor ) { - verify( cursor ); - _cursor = cursor; +// -------- ShardedCursor ----------- - _skip = q.ntoskip; - _ntoreturn = q.ntoreturn; +ShardedClientCursor::ShardedClientCursor(QueryMessage& q, ParallelSortClusteredCursor* cursor) { + verify(cursor); + _cursor = cursor; - _totalSent = 0; - _done = false; + _skip = q.ntoskip; + _ntoreturn = q.ntoreturn; - _id = 0; + _totalSent = 0; + _done = false; - if ( q.queryOptions & QueryOption_NoCursorTimeout ) { - _lastAccessMillis = 0; - } - else - _lastAccessMillis = Listener::getElapsedTimeMillis(); + _id = 0; - cursorStatsMultiTarget.increment(); - } + if (q.queryOptions & QueryOption_NoCursorTimeout) { + _lastAccessMillis = 0; + } else + _lastAccessMillis = Listener::getElapsedTimeMillis(); - ShardedClientCursor::~ShardedClientCursor() { - verify( _cursor ); - delete _cursor; - _cursor = 0; - cursorStatsMultiTarget.decrement(); - } + cursorStatsMultiTarget.increment(); +} - long long ShardedClientCursor::getId() { - if ( _id <= 0 ) { - _id = cursorCache.genId(); - verify( _id >= 0 ); - } - return _id; - } +ShardedClientCursor::~ShardedClientCursor() { + verify(_cursor); + delete _cursor; + _cursor = 0; + cursorStatsMultiTarget.decrement(); +} - int ShardedClientCursor::getTotalSent() const { - return _totalSent; +long long ShardedClientCursor::getId() { + if (_id <= 0) { + _id = cursorCache.genId(); + verify(_id >= 0); } + return _id; +} - void ShardedClientCursor::accessed() { - if ( _lastAccessMillis > 0 ) - _lastAccessMillis = Listener::getElapsedTimeMillis(); - } +int ShardedClientCursor::getTotalSent() const { + return _totalSent; +} - long long ShardedClientCursor::idleTime( long long now ) { - if ( _lastAccessMillis == 0 ) - return 0; - return now - _lastAccessMillis; - } +void ShardedClientCursor::accessed() { + if (_lastAccessMillis > 0) + _lastAccessMillis = Listener::getElapsedTimeMillis(); +} - bool ShardedClientCursor::sendNextBatch(int ntoreturn, BufBuilder& buffer, int& docCount) { - uassert( 10191 , "cursor already done" , ! _done ); - - int maxSize = 1024 * 1024; - if ( _totalSent > 0 ) - maxSize *= 3; - - docCount = 0; - - // If ntoreturn is negative, it means that we should send up to -ntoreturn results - // back to the client, and that we should only send a *single batch*. An ntoreturn of - // 1 is also a special case which means "return up to 1 result in a single batch" (so - // that +1 actually has the same meaning of -1). For all other values of ntoreturn, we - // may have to return multiple batches. - const bool sendMoreBatches = ntoreturn == 0 || ntoreturn > 1; - ntoreturn = abs( ntoreturn ); - - bool cursorHasMore = true; - while ( ( cursorHasMore = _cursor->more() ) ) { - BSONObj o = _cursor->next(); - - buffer.appendBuf( (void*)o.objdata() , o.objsize() ); - docCount++; - // Ensure that the next batch will never wind up requesting more docs from the shard - // than are remaining to satisfy the initial ntoreturn. - if (ntoreturn != 0) { - _cursor->setBatchSize(ntoreturn - docCount); - } +long long ShardedClientCursor::idleTime(long long now) { + if (_lastAccessMillis == 0) + return 0; + return now - _lastAccessMillis; +} - if ( buffer.len() > maxSize ) { - break; - } +bool ShardedClientCursor::sendNextBatch(int ntoreturn, BufBuilder& buffer, int& docCount) { + uassert(10191, "cursor already done", !_done); + + int maxSize = 1024 * 1024; + if (_totalSent > 0) + maxSize *= 3; + + docCount = 0; + + // If ntoreturn is negative, it means that we should send up to -ntoreturn results + // back to the client, and that we should only send a *single batch*. An ntoreturn of + // 1 is also a special case which means "return up to 1 result in a single batch" (so + // that +1 actually has the same meaning of -1). For all other values of ntoreturn, we + // may have to return multiple batches. + const bool sendMoreBatches = ntoreturn == 0 || ntoreturn > 1; + ntoreturn = abs(ntoreturn); + + bool cursorHasMore = true; + while ((cursorHasMore = _cursor->more())) { + BSONObj o = _cursor->next(); + + buffer.appendBuf((void*)o.objdata(), o.objsize()); + docCount++; + // Ensure that the next batch will never wind up requesting more docs from the shard + // than are remaining to satisfy the initial ntoreturn. + if (ntoreturn != 0) { + _cursor->setBatchSize(ntoreturn - docCount); + } - if ( docCount == ntoreturn ) { - // soft limit aka batch size - break; - } + if (buffer.len() > maxSize) { + break; + } - if ( ntoreturn == 0 && _totalSent == 0 && docCount >= 100 ) { - // first batch should be max 100 unless batch size specified - break; - } + if (docCount == ntoreturn) { + // soft limit aka batch size + break; } - // We need to request another batch if the following two conditions hold: - // - // 1. ntoreturn is positive and not equal to 1 (see the comment above). This condition - // is stored in 'sendMoreBatches'. - // - // 2. The last call to _cursor->more() was true (i.e. we never explicitly got a false - // value from _cursor->more()). This condition is stored in 'cursorHasMore'. If the server - // hits EOF while executing a query or a getmore, it will pass a cursorId of 0 in the - // query response to indicate that there are no more results. In this case, _cursor->more() - // will be explicitly false, and we know for sure that we do not have to send more batches. - // - // On the other hand, if _cursor->more() is true there may or may not be more results. - // Suppose that the mongod generates enough results to fill this batch. In this case it - // does not know whether not there are more, because doing so would require requesting an - // extra result and seeing whether we get EOF. The mongod sends a valid cursorId to - // indicate that there may be more. We do the same here: we indicate that there may be - // more results to retrieve by setting 'hasMoreBatches' to true. - bool hasMoreBatches = sendMoreBatches && cursorHasMore; - - LOG(5) << "\t hasMoreBatches: " << hasMoreBatches - << " sendMoreBatches: " << sendMoreBatches - << " cursorHasMore: " << cursorHasMore - << " ntoreturn: " << ntoreturn - << " num: " << docCount - << " id:" << getId() - << " totalSent: " << _totalSent << endl; - - _totalSent += docCount; - _done = ! hasMoreBatches; - - return hasMoreBatches; + if (ntoreturn == 0 && _totalSent == 0 && docCount >= 100) { + // first batch should be max 100 unless batch size specified + break; + } } - // ---- CursorCache ----- + // We need to request another batch if the following two conditions hold: + // + // 1. ntoreturn is positive and not equal to 1 (see the comment above). This condition + // is stored in 'sendMoreBatches'. + // + // 2. The last call to _cursor->more() was true (i.e. we never explicitly got a false + // value from _cursor->more()). This condition is stored in 'cursorHasMore'. If the server + // hits EOF while executing a query or a getmore, it will pass a cursorId of 0 in the + // query response to indicate that there are no more results. In this case, _cursor->more() + // will be explicitly false, and we know for sure that we do not have to send more batches. + // + // On the other hand, if _cursor->more() is true there may or may not be more results. + // Suppose that the mongod generates enough results to fill this batch. In this case it + // does not know whether not there are more, because doing so would require requesting an + // extra result and seeing whether we get EOF. The mongod sends a valid cursorId to + // indicate that there may be more. We do the same here: we indicate that there may be + // more results to retrieve by setting 'hasMoreBatches' to true. + bool hasMoreBatches = sendMoreBatches && cursorHasMore; + + LOG(5) << "\t hasMoreBatches: " << hasMoreBatches << " sendMoreBatches: " << sendMoreBatches + << " cursorHasMore: " << cursorHasMore << " ntoreturn: " << ntoreturn + << " num: " << docCount << " id:" << getId() << " totalSent: " << _totalSent << endl; + + _totalSent += docCount; + _done = !hasMoreBatches; + + return hasMoreBatches; +} - long long CursorCache::TIMEOUT = 10 * 60 * 1000 /* 10 minutes */; - ExportedServerParameter<long long> cursorCacheTimeoutConfig(ServerParameterSet::getGlobal(), - "cursorTimeoutMillis", - &CursorCache::TIMEOUT, - true, true); +// ---- CursorCache ----- - unsigned getCCRandomSeed() { - unique_ptr<SecureRandom> sr( SecureRandom::create() ); - return sr->nextInt64(); - } +long long CursorCache::TIMEOUT = 10 * 60 * 1000 /* 10 minutes */; +ExportedServerParameter<long long> cursorCacheTimeoutConfig( + ServerParameterSet::getGlobal(), "cursorTimeoutMillis", &CursorCache::TIMEOUT, true, true); - CursorCache::CursorCache() - : _random( getCCRandomSeed() ), - _shardedTotal(0) { - } +unsigned getCCRandomSeed() { + unique_ptr<SecureRandom> sr(SecureRandom::create()); + return sr->nextInt64(); +} - CursorCache::~CursorCache() { - // TODO: delete old cursors? - bool print = shouldLog(logger::LogSeverity::Debug(1)); - if ( _cursors.size() || _refs.size() ) - print = true; - verify(_refs.size() == _refsNS.size()); - - if ( print ) - log() << " CursorCache at shutdown - " - << " sharded: " << _cursors.size() - << " passthrough: " << _refs.size() - << endl; - } +CursorCache::CursorCache() : _random(getCCRandomSeed()), _shardedTotal(0) {} - ShardedClientCursorPtr CursorCache::get( long long id ) const { - LOG(_myLogLevel) << "CursorCache::get id: " << id << endl; - stdx::lock_guard<stdx::mutex> lk( _mutex ); - MapSharded::const_iterator i = _cursors.find( id ); - if ( i == _cursors.end() ) { - return ShardedClientCursorPtr(); - } - i->second->accessed(); - return i->second; - } +CursorCache::~CursorCache() { + // TODO: delete old cursors? + bool print = shouldLog(logger::LogSeverity::Debug(1)); + if (_cursors.size() || _refs.size()) + print = true; + verify(_refs.size() == _refsNS.size()); - int CursorCache::getMaxTimeMS( long long id ) const { - verify( id ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - MapShardedInt::const_iterator i = _cursorsMaxTimeMS.find( id ); - return ( i != _cursorsMaxTimeMS.end() ) ? i->second : 0; - } + if (print) + log() << " CursorCache at shutdown - " + << " sharded: " << _cursors.size() << " passthrough: " << _refs.size() << endl; +} - void CursorCache::store( ShardedClientCursorPtr cursor, int maxTimeMS ) { - LOG(_myLogLevel) << "CursorCache::store cursor " << " id: " << cursor->getId() - << (maxTimeMS != kMaxTimeCursorNoTimeLimit ? str::stream() << "maxTimeMS: " << maxTimeMS - : string("")) - << endl; - verify( cursor->getId() ); - verify( maxTimeMS == kMaxTimeCursorTimeLimitExpired - || maxTimeMS == kMaxTimeCursorNoTimeLimit - || maxTimeMS > 0 ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _cursorsMaxTimeMS[cursor->getId()] = maxTimeMS; - _cursors[cursor->getId()] = cursor; - _shardedTotal++; +ShardedClientCursorPtr CursorCache::get(long long id) const { + LOG(_myLogLevel) << "CursorCache::get id: " << id << endl; + stdx::lock_guard<stdx::mutex> lk(_mutex); + MapSharded::const_iterator i = _cursors.find(id); + if (i == _cursors.end()) { + return ShardedClientCursorPtr(); } + i->second->accessed(); + return i->second; +} - void CursorCache::updateMaxTimeMS( long long id, int maxTimeMS ) { - verify( id ); - verify( maxTimeMS == kMaxTimeCursorTimeLimitExpired - || maxTimeMS == kMaxTimeCursorNoTimeLimit - || maxTimeMS > 0 ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _cursorsMaxTimeMS[id] = maxTimeMS; - } +int CursorCache::getMaxTimeMS(long long id) const { + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + MapShardedInt::const_iterator i = _cursorsMaxTimeMS.find(id); + return (i != _cursorsMaxTimeMS.end()) ? i->second : 0; +} - void CursorCache::remove( long long id ) { - verify( id ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _cursorsMaxTimeMS.erase( id ); - _cursors.erase( id ); - } +void CursorCache::store(ShardedClientCursorPtr cursor, int maxTimeMS) { + LOG(_myLogLevel) << "CursorCache::store cursor " + << " id: " << cursor->getId() + << (maxTimeMS != kMaxTimeCursorNoTimeLimit + ? str::stream() << "maxTimeMS: " << maxTimeMS + : string("")) << endl; + verify(cursor->getId()); + verify(maxTimeMS == kMaxTimeCursorTimeLimitExpired || maxTimeMS == kMaxTimeCursorNoTimeLimit || + maxTimeMS > 0); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _cursorsMaxTimeMS[cursor->getId()] = maxTimeMS; + _cursors[cursor->getId()] = cursor; + _shardedTotal++; +} - void CursorCache::removeRef( long long id ) { - verify( id ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _refs.erase( id ); - _refsNS.erase( id ); - cursorStatsSingleTarget.decrement(); - } +void CursorCache::updateMaxTimeMS(long long id, int maxTimeMS) { + verify(id); + verify(maxTimeMS == kMaxTimeCursorTimeLimitExpired || maxTimeMS == kMaxTimeCursorNoTimeLimit || + maxTimeMS > 0); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _cursorsMaxTimeMS[id] = maxTimeMS; +} - void CursorCache::storeRef(const std::string& server, long long id, const std::string& ns) { - LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl; - verify( id ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _refs[id] = server; - _refsNS[id] = ns; - cursorStatsSingleTarget.increment(); - } +void CursorCache::remove(long long id) { + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _cursorsMaxTimeMS.erase(id); + _cursors.erase(id); +} - string CursorCache::getRef( long long id ) const { - verify( id ); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - MapNormal::const_iterator i = _refs.find( id ); +void CursorCache::removeRef(long long id) { + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _refs.erase(id); + _refsNS.erase(id); + cursorStatsSingleTarget.decrement(); +} - LOG(_myLogLevel) << "CursorCache::getRef id: " << id << " out: " << ( i == _refs.end() ? " NONE " : i->second ) << endl; +void CursorCache::storeRef(const std::string& server, long long id, const std::string& ns) { + LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl; + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _refs[id] = server; + _refsNS[id] = ns; + cursorStatsSingleTarget.increment(); +} - if ( i == _refs.end() ) - return ""; - return i->second; - } +string CursorCache::getRef(long long id) const { + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + MapNormal::const_iterator i = _refs.find(id); - std::string CursorCache::getRefNS(long long id) const { - verify(id); - stdx::lock_guard<stdx::mutex> lk(_mutex); - MapNormal::const_iterator i = _refsNS.find(id); + LOG(_myLogLevel) << "CursorCache::getRef id: " << id + << " out: " << (i == _refs.end() ? " NONE " : i->second) << endl; - LOG(_myLogLevel) << "CursorCache::getRefNs id: " << id - << " out: " << ( i == _refsNS.end() ? " NONE " : i->second ) << std::endl; + if (i == _refs.end()) + return ""; + return i->second; +} - if ( i == _refsNS.end() ) - return ""; - return i->second; - } +std::string CursorCache::getRefNS(long long id) const { + verify(id); + stdx::lock_guard<stdx::mutex> lk(_mutex); + MapNormal::const_iterator i = _refsNS.find(id); + LOG(_myLogLevel) << "CursorCache::getRefNs id: " << id + << " out: " << (i == _refsNS.end() ? " NONE " : i->second) << std::endl; - long long CursorCache::genId() { - while ( true ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); + if (i == _refsNS.end()) + return ""; + return i->second; +} - long long x = Listener::getElapsedTimeMillis() << 32; - x |= _random.nextInt32(); - if ( x == 0 ) - continue; +long long CursorCache::genId() { + while (true) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - if ( x < 0 ) - x *= -1; + long long x = Listener::getElapsedTimeMillis() << 32; + x |= _random.nextInt32(); - MapSharded::iterator i = _cursors.find( x ); - if ( i != _cursors.end() ) - continue; + if (x == 0) + continue; - MapNormal::iterator j = _refs.find( x ); - if ( j != _refs.end() ) - continue; + if (x < 0) + x *= -1; - return x; - } + MapSharded::iterator i = _cursors.find(x); + if (i != _cursors.end()) + continue; + + MapNormal::iterator j = _refs.find(x); + if (j != _refs.end()) + continue; + + return x; } +} - void CursorCache::gotKillCursors(Message& m ) { - LastError::get(cc()).disable(); - DbMessage dbmessage(m); - int n = dbmessage.pullInt(); +void CursorCache::gotKillCursors(Message& m) { + LastError::get(cc()).disable(); + DbMessage dbmessage(m); + int n = dbmessage.pullInt(); - if ( n > 2000 ) { - ( n < 30000 ? warning() : error() ) << "receivedKillCursors, n=" << n << endl; - } + if (n > 2000) { + (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl; + } - uassert( 13286 , "sent 0 cursors to kill" , n >= 1 ); - uassert( 13287 , "too many cursors to kill" , n < 30000 ); - massert( 18632 , str::stream() << "bad kill cursors size: " << m.dataSize(), - m.dataSize() == 8 + ( 8 * n ) ); + uassert(13286, "sent 0 cursors to kill", n >= 1); + uassert(13287, "too many cursors to kill", n < 30000); + massert(18632, + str::stream() << "bad kill cursors size: " << m.dataSize(), + m.dataSize() == 8 + (8 * n)); - ConstDataCursor cursors(dbmessage.getArray(n)); + ConstDataCursor cursors(dbmessage.getArray(n)); - ClientBasic* client = ClientBasic::getCurrent(); - AuthorizationSession* authSession = AuthorizationSession::get(client); - for ( int i=0; i<n; i++ ) { - long long id = cursors.readAndAdvance<LittleEndian<int64_t>>(); - LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl; + ClientBasic* client = ClientBasic::getCurrent(); + AuthorizationSession* authSession = AuthorizationSession::get(client); + for (int i = 0; i < n; i++) { + long long id = cursors.readAndAdvance<LittleEndian<int64_t>>(); + LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl; - if ( ! id ) { - warning() << " got cursor id of 0 to kill" << endl; - continue; - } + if (!id) { + warning() << " got cursor id of 0 to kill" << endl; + continue; + } - string server; - { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - - MapSharded::iterator i = _cursors.find( id ); - if ( i != _cursors.end() ) { - Status authorizationStatus = authSession->checkAuthForKillCursors( - NamespaceString(i->second->getNS()), id); - audit::logKillCursorsAuthzCheck( - client, - NamespaceString(i->second->getNS()), - id, - authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized); - if (authorizationStatus.isOK()) { - _cursorsMaxTimeMS.erase( i->second->getId() ); - _cursors.erase( i ); - } - continue; - } + string server; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); - MapNormal::iterator refsIt = _refs.find(id); - MapNormal::iterator refsNSIt = _refsNS.find(id); - if (refsIt == _refs.end()) { - warning() << "can't find cursor: " << id << endl; - continue; - } - verify(refsNSIt != _refsNS.end()); - Status authorizationStatus = authSession->checkAuthForKillCursors( - NamespaceString(refsNSIt->second), id); + MapSharded::iterator i = _cursors.find(id); + if (i != _cursors.end()) { + Status authorizationStatus = + authSession->checkAuthForKillCursors(NamespaceString(i->second->getNS()), id); audit::logKillCursorsAuthzCheck( - client, - NamespaceString(refsNSIt->second), - id, - authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized); - if (!authorizationStatus.isOK()) { - continue; + client, + NamespaceString(i->second->getNS()), + id, + authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized); + if (authorizationStatus.isOK()) { + _cursorsMaxTimeMS.erase(i->second->getId()); + _cursors.erase(i); } - server = refsIt->second; - _refs.erase(refsIt); - _refsNS.erase(refsNSIt); - cursorStatsSingleTarget.decrement(); + continue; } - LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server << endl; - - verify( server.size() ); - ScopedDbConnection conn(server); - conn->killCursor( id ); - conn.done(); + MapNormal::iterator refsIt = _refs.find(id); + MapNormal::iterator refsNSIt = _refsNS.find(id); + if (refsIt == _refs.end()) { + warning() << "can't find cursor: " << id << endl; + continue; + } + verify(refsNSIt != _refsNS.end()); + Status authorizationStatus = + authSession->checkAuthForKillCursors(NamespaceString(refsNSIt->second), id); + audit::logKillCursorsAuthzCheck(client, + NamespaceString(refsNSIt->second), + id, + authorizationStatus.isOK() ? ErrorCodes::OK + : ErrorCodes::Unauthorized); + if (!authorizationStatus.isOK()) { + continue; + } + server = refsIt->second; + _refs.erase(refsIt); + _refsNS.erase(refsNSIt); + cursorStatsSingleTarget.decrement(); } - } - void CursorCache::appendInfo( BSONObjBuilder& result ) const { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - result.append( "sharded", static_cast<int>(cursorStatsMultiTarget.get())); - result.appendNumber( "shardedEver" , _shardedTotal ); - result.append( "refs", static_cast<int>(cursorStatsSingleTarget.get())); - result.append( "totalOpen", static_cast<int>(cursorStatsTotalOpen.get())); + LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server + << endl; + + verify(server.size()); + ScopedDbConnection conn(server); + conn->killCursor(id); + conn.done(); } +} - void CursorCache::doTimeouts() { - long long now = Listener::getElapsedTimeMillis(); - stdx::lock_guard<stdx::mutex> lk( _mutex ); - for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ) { - // Note: cursors with no timeout will always have an idleTime of 0 - long long idleFor = i->second->idleTime( now ); - if ( idleFor < TIMEOUT ) { - continue; - } - log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" << endl; // TODO: make LOG(1) - _cursorsMaxTimeMS.erase( i->second->getId() ); - _cursors.erase( i ); - i = _cursors.begin(); // possible 2nd entry will get skipped, will get on next pass - if ( i == _cursors.end() ) - break; +void CursorCache::appendInfo(BSONObjBuilder& result) const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + result.append("sharded", static_cast<int>(cursorStatsMultiTarget.get())); + result.appendNumber("shardedEver", _shardedTotal); + result.append("refs", static_cast<int>(cursorStatsSingleTarget.get())); + result.append("totalOpen", static_cast<int>(cursorStatsTotalOpen.get())); +} + +void CursorCache::doTimeouts() { + long long now = Listener::getElapsedTimeMillis(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + for (MapSharded::iterator i = _cursors.begin(); i != _cursors.end(); ++i) { + // Note: cursors with no timeout will always have an idleTime of 0 + long long idleFor = i->second->idleTime(now); + if (idleFor < TIMEOUT) { + continue; } + log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" + << endl; // TODO: make LOG(1) + _cursorsMaxTimeMS.erase(i->second->getId()); + _cursors.erase(i); + i = _cursors.begin(); // possible 2nd entry will get skipped, will get on next pass + if (i == _cursors.end()) + break; } +} - CursorCache cursorCache; - - const int CursorCache::_myLogLevel = 3; +CursorCache cursorCache; - class CursorTimeoutTask : public task::Task { - public: - virtual string name() const { return "cursorTimeout"; } - virtual void doWork() { - cursorCache.doTimeouts(); - } - }; +const int CursorCache::_myLogLevel = 3; - void CursorCache::startTimeoutThread() { - task::repeat( new CursorTimeoutTask , 4000 ); +class CursorTimeoutTask : public task::Task { +public: + virtual string name() const { + return "cursorTimeout"; + } + virtual void doWork() { + cursorCache.doTimeouts(); } +}; - class CmdCursorInfo : public Command { - public: - CmdCursorInfo() : Command( "cursorInfo" ) {} - virtual bool slaveOk() const { return true; } - virtual void help( stringstream& help ) const { - help << " example: { cursorInfo : 1 }"; - } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::cursorInfo); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - virtual bool isWriteCommandForConfigServer() const { return false; } - bool run(OperationContext* txn, - const string&, - BSONObj& jsobj, - int, - string& errmsg, - BSONObjBuilder& result) { - cursorCache.appendInfo( result ); - if ( jsobj["setTimeout"].isNumber() ) - CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong(); - return true; - } - } cmdCursorInfo; +void CursorCache::startTimeoutThread() { + task::repeat(new CursorTimeoutTask, 4000); +} +class CmdCursorInfo : public Command { +public: + CmdCursorInfo() : Command("cursorInfo") {} + virtual bool slaveOk() const { + return true; + } + virtual void help(stringstream& help) const { + help << " example: { cursorInfo : 1 }"; + } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::cursorInfo); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } + bool run(OperationContext* txn, + const string&, + BSONObj& jsobj, + int, + string& errmsg, + BSONObjBuilder& result) { + cursorCache.appendInfo(result); + if (jsobj["setTimeout"].isNumber()) + CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong(); + return true; + } +} cmdCursorInfo; } |