summaryrefslogtreecommitdiff
path: root/src/mongo/s/cursors.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/cursors.cpp')
-rw-r--r--src/mongo/s/cursors.cpp769
1 files changed, 382 insertions, 387 deletions
diff --git a/src/mongo/s/cursors.cpp b/src/mongo/s/cursors.cpp
index 31f62c1e2b6..6b4b4e6f2c6 100644
--- a/src/mongo/s/cursors.cpp
+++ b/src/mongo/s/cursors.cpp
@@ -54,466 +54,461 @@
namespace mongo {
- using std::unique_ptr;
- using std::endl;
- using std::string;
- using std::stringstream;
-
- const int ShardedClientCursor::INIT_REPLY_BUFFER_SIZE = 32768;
-
- // Note: There is no counter for shardedEver from cursorInfo since it is deprecated
- static Counter64 cursorStatsMultiTarget;
- static Counter64 cursorStatsSingleTarget;
-
- // Simple class to report the sum total open cursors = sharded + refs
- class CursorStatsSum {
- public:
- operator long long() const {
- return get();
- }
- long long get() const {
- return cursorStatsMultiTarget.get() + cursorStatsSingleTarget.get();
- }
- };
-
- static CursorStatsSum cursorStatsTotalOpen;
+using std::unique_ptr;
+using std::endl;
+using std::string;
+using std::stringstream;
+
+const int ShardedClientCursor::INIT_REPLY_BUFFER_SIZE = 32768;
+
+// Note: There is no counter for shardedEver from cursorInfo since it is deprecated
+static Counter64 cursorStatsMultiTarget;
+static Counter64 cursorStatsSingleTarget;
+
+// Simple class to report the sum total open cursors = sharded + refs
+class CursorStatsSum {
+public:
+ operator long long() const {
+ return get();
+ }
+ long long get() const {
+ return cursorStatsMultiTarget.get() + cursorStatsSingleTarget.get();
+ }
+};
- static ServerStatusMetricField<Counter64> dCursorStatsMultiTarget( "cursor.open.multiTarget",
- &cursorStatsMultiTarget);
- static ServerStatusMetricField<Counter64> dCursorStatsSingleTarget( "cursor.open.singleTarget",
- &cursorStatsSingleTarget);
- static ServerStatusMetricField<CursorStatsSum> dCursorStatsTotalOpen( "cursor.open.total",
- &cursorStatsTotalOpen);
+static CursorStatsSum cursorStatsTotalOpen;
+static ServerStatusMetricField<Counter64> dCursorStatsMultiTarget("cursor.open.multiTarget",
+ &cursorStatsMultiTarget);
+static ServerStatusMetricField<Counter64> dCursorStatsSingleTarget("cursor.open.singleTarget",
+ &cursorStatsSingleTarget);
+static ServerStatusMetricField<CursorStatsSum> dCursorStatsTotalOpen("cursor.open.total",
+ &cursorStatsTotalOpen);
- // -------- ShardedCursor -----------
- ShardedClientCursor::ShardedClientCursor( QueryMessage& q,
- ParallelSortClusteredCursor * cursor ) {
- verify( cursor );
- _cursor = cursor;
+// -------- ShardedCursor -----------
- _skip = q.ntoskip;
- _ntoreturn = q.ntoreturn;
+ShardedClientCursor::ShardedClientCursor(QueryMessage& q, ParallelSortClusteredCursor* cursor) {
+ verify(cursor);
+ _cursor = cursor;
- _totalSent = 0;
- _done = false;
+ _skip = q.ntoskip;
+ _ntoreturn = q.ntoreturn;
- _id = 0;
+ _totalSent = 0;
+ _done = false;
- if ( q.queryOptions & QueryOption_NoCursorTimeout ) {
- _lastAccessMillis = 0;
- }
- else
- _lastAccessMillis = Listener::getElapsedTimeMillis();
+ _id = 0;
- cursorStatsMultiTarget.increment();
- }
+ if (q.queryOptions & QueryOption_NoCursorTimeout) {
+ _lastAccessMillis = 0;
+ } else
+ _lastAccessMillis = Listener::getElapsedTimeMillis();
- ShardedClientCursor::~ShardedClientCursor() {
- verify( _cursor );
- delete _cursor;
- _cursor = 0;
- cursorStatsMultiTarget.decrement();
- }
+ cursorStatsMultiTarget.increment();
+}
- long long ShardedClientCursor::getId() {
- if ( _id <= 0 ) {
- _id = cursorCache.genId();
- verify( _id >= 0 );
- }
- return _id;
- }
+ShardedClientCursor::~ShardedClientCursor() {
+ verify(_cursor);
+ delete _cursor;
+ _cursor = 0;
+ cursorStatsMultiTarget.decrement();
+}
- int ShardedClientCursor::getTotalSent() const {
- return _totalSent;
+long long ShardedClientCursor::getId() {
+ if (_id <= 0) {
+ _id = cursorCache.genId();
+ verify(_id >= 0);
}
+ return _id;
+}
- void ShardedClientCursor::accessed() {
- if ( _lastAccessMillis > 0 )
- _lastAccessMillis = Listener::getElapsedTimeMillis();
- }
+int ShardedClientCursor::getTotalSent() const {
+ return _totalSent;
+}
- long long ShardedClientCursor::idleTime( long long now ) {
- if ( _lastAccessMillis == 0 )
- return 0;
- return now - _lastAccessMillis;
- }
+void ShardedClientCursor::accessed() {
+ if (_lastAccessMillis > 0)
+ _lastAccessMillis = Listener::getElapsedTimeMillis();
+}
- bool ShardedClientCursor::sendNextBatch(int ntoreturn, BufBuilder& buffer, int& docCount) {
- uassert( 10191 , "cursor already done" , ! _done );
-
- int maxSize = 1024 * 1024;
- if ( _totalSent > 0 )
- maxSize *= 3;
-
- docCount = 0;
-
- // If ntoreturn is negative, it means that we should send up to -ntoreturn results
- // back to the client, and that we should only send a *single batch*. An ntoreturn of
- // 1 is also a special case which means "return up to 1 result in a single batch" (so
- // that +1 actually has the same meaning of -1). For all other values of ntoreturn, we
- // may have to return multiple batches.
- const bool sendMoreBatches = ntoreturn == 0 || ntoreturn > 1;
- ntoreturn = abs( ntoreturn );
-
- bool cursorHasMore = true;
- while ( ( cursorHasMore = _cursor->more() ) ) {
- BSONObj o = _cursor->next();
-
- buffer.appendBuf( (void*)o.objdata() , o.objsize() );
- docCount++;
- // Ensure that the next batch will never wind up requesting more docs from the shard
- // than are remaining to satisfy the initial ntoreturn.
- if (ntoreturn != 0) {
- _cursor->setBatchSize(ntoreturn - docCount);
- }
+long long ShardedClientCursor::idleTime(long long now) {
+ if (_lastAccessMillis == 0)
+ return 0;
+ return now - _lastAccessMillis;
+}
- if ( buffer.len() > maxSize ) {
- break;
- }
+bool ShardedClientCursor::sendNextBatch(int ntoreturn, BufBuilder& buffer, int& docCount) {
+ uassert(10191, "cursor already done", !_done);
+
+ int maxSize = 1024 * 1024;
+ if (_totalSent > 0)
+ maxSize *= 3;
+
+ docCount = 0;
+
+ // If ntoreturn is negative, it means that we should send up to -ntoreturn results
+ // back to the client, and that we should only send a *single batch*. An ntoreturn of
+ // 1 is also a special case which means "return up to 1 result in a single batch" (so
+ // that +1 actually has the same meaning of -1). For all other values of ntoreturn, we
+ // may have to return multiple batches.
+ const bool sendMoreBatches = ntoreturn == 0 || ntoreturn > 1;
+ ntoreturn = abs(ntoreturn);
+
+ bool cursorHasMore = true;
+ while ((cursorHasMore = _cursor->more())) {
+ BSONObj o = _cursor->next();
+
+ buffer.appendBuf((void*)o.objdata(), o.objsize());
+ docCount++;
+ // Ensure that the next batch will never wind up requesting more docs from the shard
+ // than are remaining to satisfy the initial ntoreturn.
+ if (ntoreturn != 0) {
+ _cursor->setBatchSize(ntoreturn - docCount);
+ }
- if ( docCount == ntoreturn ) {
- // soft limit aka batch size
- break;
- }
+ if (buffer.len() > maxSize) {
+ break;
+ }
- if ( ntoreturn == 0 && _totalSent == 0 && docCount >= 100 ) {
- // first batch should be max 100 unless batch size specified
- break;
- }
+ if (docCount == ntoreturn) {
+ // soft limit aka batch size
+ break;
}
- // We need to request another batch if the following two conditions hold:
- //
- // 1. ntoreturn is positive and not equal to 1 (see the comment above). This condition
- // is stored in 'sendMoreBatches'.
- //
- // 2. The last call to _cursor->more() was true (i.e. we never explicitly got a false
- // value from _cursor->more()). This condition is stored in 'cursorHasMore'. If the server
- // hits EOF while executing a query or a getmore, it will pass a cursorId of 0 in the
- // query response to indicate that there are no more results. In this case, _cursor->more()
- // will be explicitly false, and we know for sure that we do not have to send more batches.
- //
- // On the other hand, if _cursor->more() is true there may or may not be more results.
- // Suppose that the mongod generates enough results to fill this batch. In this case it
- // does not know whether not there are more, because doing so would require requesting an
- // extra result and seeing whether we get EOF. The mongod sends a valid cursorId to
- // indicate that there may be more. We do the same here: we indicate that there may be
- // more results to retrieve by setting 'hasMoreBatches' to true.
- bool hasMoreBatches = sendMoreBatches && cursorHasMore;
-
- LOG(5) << "\t hasMoreBatches: " << hasMoreBatches
- << " sendMoreBatches: " << sendMoreBatches
- << " cursorHasMore: " << cursorHasMore
- << " ntoreturn: " << ntoreturn
- << " num: " << docCount
- << " id:" << getId()
- << " totalSent: " << _totalSent << endl;
-
- _totalSent += docCount;
- _done = ! hasMoreBatches;
-
- return hasMoreBatches;
+ if (ntoreturn == 0 && _totalSent == 0 && docCount >= 100) {
+ // first batch should be max 100 unless batch size specified
+ break;
+ }
}
- // ---- CursorCache -----
+ // We need to request another batch if the following two conditions hold:
+ //
+ // 1. ntoreturn is positive and not equal to 1 (see the comment above). This condition
+ // is stored in 'sendMoreBatches'.
+ //
+ // 2. The last call to _cursor->more() was true (i.e. we never explicitly got a false
+ // value from _cursor->more()). This condition is stored in 'cursorHasMore'. If the server
+ // hits EOF while executing a query or a getmore, it will pass a cursorId of 0 in the
+ // query response to indicate that there are no more results. In this case, _cursor->more()
+ // will be explicitly false, and we know for sure that we do not have to send more batches.
+ //
+ // On the other hand, if _cursor->more() is true there may or may not be more results.
+ // Suppose that the mongod generates enough results to fill this batch. In this case it
+ // does not know whether not there are more, because doing so would require requesting an
+ // extra result and seeing whether we get EOF. The mongod sends a valid cursorId to
+ // indicate that there may be more. We do the same here: we indicate that there may be
+ // more results to retrieve by setting 'hasMoreBatches' to true.
+ bool hasMoreBatches = sendMoreBatches && cursorHasMore;
+
+ LOG(5) << "\t hasMoreBatches: " << hasMoreBatches << " sendMoreBatches: " << sendMoreBatches
+ << " cursorHasMore: " << cursorHasMore << " ntoreturn: " << ntoreturn
+ << " num: " << docCount << " id:" << getId() << " totalSent: " << _totalSent << endl;
+
+ _totalSent += docCount;
+ _done = !hasMoreBatches;
+
+ return hasMoreBatches;
+}
- long long CursorCache::TIMEOUT = 10 * 60 * 1000 /* 10 minutes */;
- ExportedServerParameter<long long> cursorCacheTimeoutConfig(ServerParameterSet::getGlobal(),
- "cursorTimeoutMillis",
- &CursorCache::TIMEOUT,
- true, true);
+// ---- CursorCache -----
- unsigned getCCRandomSeed() {
- unique_ptr<SecureRandom> sr( SecureRandom::create() );
- return sr->nextInt64();
- }
+long long CursorCache::TIMEOUT = 10 * 60 * 1000 /* 10 minutes */;
+ExportedServerParameter<long long> cursorCacheTimeoutConfig(
+ ServerParameterSet::getGlobal(), "cursorTimeoutMillis", &CursorCache::TIMEOUT, true, true);
- CursorCache::CursorCache()
- : _random( getCCRandomSeed() ),
- _shardedTotal(0) {
- }
+unsigned getCCRandomSeed() {
+ unique_ptr<SecureRandom> sr(SecureRandom::create());
+ return sr->nextInt64();
+}
- CursorCache::~CursorCache() {
- // TODO: delete old cursors?
- bool print = shouldLog(logger::LogSeverity::Debug(1));
- if ( _cursors.size() || _refs.size() )
- print = true;
- verify(_refs.size() == _refsNS.size());
-
- if ( print )
- log() << " CursorCache at shutdown - "
- << " sharded: " << _cursors.size()
- << " passthrough: " << _refs.size()
- << endl;
- }
+CursorCache::CursorCache() : _random(getCCRandomSeed()), _shardedTotal(0) {}
- ShardedClientCursorPtr CursorCache::get( long long id ) const {
- LOG(_myLogLevel) << "CursorCache::get id: " << id << endl;
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- MapSharded::const_iterator i = _cursors.find( id );
- if ( i == _cursors.end() ) {
- return ShardedClientCursorPtr();
- }
- i->second->accessed();
- return i->second;
- }
+CursorCache::~CursorCache() {
+ // TODO: delete old cursors?
+ bool print = shouldLog(logger::LogSeverity::Debug(1));
+ if (_cursors.size() || _refs.size())
+ print = true;
+ verify(_refs.size() == _refsNS.size());
- int CursorCache::getMaxTimeMS( long long id ) const {
- verify( id );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- MapShardedInt::const_iterator i = _cursorsMaxTimeMS.find( id );
- return ( i != _cursorsMaxTimeMS.end() ) ? i->second : 0;
- }
+ if (print)
+ log() << " CursorCache at shutdown - "
+ << " sharded: " << _cursors.size() << " passthrough: " << _refs.size() << endl;
+}
- void CursorCache::store( ShardedClientCursorPtr cursor, int maxTimeMS ) {
- LOG(_myLogLevel) << "CursorCache::store cursor " << " id: " << cursor->getId()
- << (maxTimeMS != kMaxTimeCursorNoTimeLimit ? str::stream() << "maxTimeMS: " << maxTimeMS
- : string(""))
- << endl;
- verify( cursor->getId() );
- verify( maxTimeMS == kMaxTimeCursorTimeLimitExpired
- || maxTimeMS == kMaxTimeCursorNoTimeLimit
- || maxTimeMS > 0 );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- _cursorsMaxTimeMS[cursor->getId()] = maxTimeMS;
- _cursors[cursor->getId()] = cursor;
- _shardedTotal++;
+ShardedClientCursorPtr CursorCache::get(long long id) const {
+ LOG(_myLogLevel) << "CursorCache::get id: " << id << endl;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ MapSharded::const_iterator i = _cursors.find(id);
+ if (i == _cursors.end()) {
+ return ShardedClientCursorPtr();
}
+ i->second->accessed();
+ return i->second;
+}
- void CursorCache::updateMaxTimeMS( long long id, int maxTimeMS ) {
- verify( id );
- verify( maxTimeMS == kMaxTimeCursorTimeLimitExpired
- || maxTimeMS == kMaxTimeCursorNoTimeLimit
- || maxTimeMS > 0 );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- _cursorsMaxTimeMS[id] = maxTimeMS;
- }
+int CursorCache::getMaxTimeMS(long long id) const {
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ MapShardedInt::const_iterator i = _cursorsMaxTimeMS.find(id);
+ return (i != _cursorsMaxTimeMS.end()) ? i->second : 0;
+}
- void CursorCache::remove( long long id ) {
- verify( id );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- _cursorsMaxTimeMS.erase( id );
- _cursors.erase( id );
- }
+void CursorCache::store(ShardedClientCursorPtr cursor, int maxTimeMS) {
+ LOG(_myLogLevel) << "CursorCache::store cursor "
+ << " id: " << cursor->getId()
+ << (maxTimeMS != kMaxTimeCursorNoTimeLimit
+ ? str::stream() << "maxTimeMS: " << maxTimeMS
+ : string("")) << endl;
+ verify(cursor->getId());
+ verify(maxTimeMS == kMaxTimeCursorTimeLimitExpired || maxTimeMS == kMaxTimeCursorNoTimeLimit ||
+ maxTimeMS > 0);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _cursorsMaxTimeMS[cursor->getId()] = maxTimeMS;
+ _cursors[cursor->getId()] = cursor;
+ _shardedTotal++;
+}
- void CursorCache::removeRef( long long id ) {
- verify( id );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- _refs.erase( id );
- _refsNS.erase( id );
- cursorStatsSingleTarget.decrement();
- }
+void CursorCache::updateMaxTimeMS(long long id, int maxTimeMS) {
+ verify(id);
+ verify(maxTimeMS == kMaxTimeCursorTimeLimitExpired || maxTimeMS == kMaxTimeCursorNoTimeLimit ||
+ maxTimeMS > 0);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _cursorsMaxTimeMS[id] = maxTimeMS;
+}
- void CursorCache::storeRef(const std::string& server, long long id, const std::string& ns) {
- LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl;
- verify( id );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- _refs[id] = server;
- _refsNS[id] = ns;
- cursorStatsSingleTarget.increment();
- }
+void CursorCache::remove(long long id) {
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _cursorsMaxTimeMS.erase(id);
+ _cursors.erase(id);
+}
- string CursorCache::getRef( long long id ) const {
- verify( id );
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- MapNormal::const_iterator i = _refs.find( id );
+void CursorCache::removeRef(long long id) {
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _refs.erase(id);
+ _refsNS.erase(id);
+ cursorStatsSingleTarget.decrement();
+}
- LOG(_myLogLevel) << "CursorCache::getRef id: " << id << " out: " << ( i == _refs.end() ? " NONE " : i->second ) << endl;
+void CursorCache::storeRef(const std::string& server, long long id, const std::string& ns) {
+ LOG(_myLogLevel) << "CursorCache::storeRef server: " << server << " id: " << id << endl;
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _refs[id] = server;
+ _refsNS[id] = ns;
+ cursorStatsSingleTarget.increment();
+}
- if ( i == _refs.end() )
- return "";
- return i->second;
- }
+string CursorCache::getRef(long long id) const {
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ MapNormal::const_iterator i = _refs.find(id);
- std::string CursorCache::getRefNS(long long id) const {
- verify(id);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- MapNormal::const_iterator i = _refsNS.find(id);
+ LOG(_myLogLevel) << "CursorCache::getRef id: " << id
+ << " out: " << (i == _refs.end() ? " NONE " : i->second) << endl;
- LOG(_myLogLevel) << "CursorCache::getRefNs id: " << id
- << " out: " << ( i == _refsNS.end() ? " NONE " : i->second ) << std::endl;
+ if (i == _refs.end())
+ return "";
+ return i->second;
+}
- if ( i == _refsNS.end() )
- return "";
- return i->second;
- }
+std::string CursorCache::getRefNS(long long id) const {
+ verify(id);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ MapNormal::const_iterator i = _refsNS.find(id);
+ LOG(_myLogLevel) << "CursorCache::getRefNs id: " << id
+ << " out: " << (i == _refsNS.end() ? " NONE " : i->second) << std::endl;
- long long CursorCache::genId() {
- while ( true ) {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
+ if (i == _refsNS.end())
+ return "";
+ return i->second;
+}
- long long x = Listener::getElapsedTimeMillis() << 32;
- x |= _random.nextInt32();
- if ( x == 0 )
- continue;
+long long CursorCache::genId() {
+ while (true) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- if ( x < 0 )
- x *= -1;
+ long long x = Listener::getElapsedTimeMillis() << 32;
+ x |= _random.nextInt32();
- MapSharded::iterator i = _cursors.find( x );
- if ( i != _cursors.end() )
- continue;
+ if (x == 0)
+ continue;
- MapNormal::iterator j = _refs.find( x );
- if ( j != _refs.end() )
- continue;
+ if (x < 0)
+ x *= -1;
- return x;
- }
+ MapSharded::iterator i = _cursors.find(x);
+ if (i != _cursors.end())
+ continue;
+
+ MapNormal::iterator j = _refs.find(x);
+ if (j != _refs.end())
+ continue;
+
+ return x;
}
+}
- void CursorCache::gotKillCursors(Message& m ) {
- LastError::get(cc()).disable();
- DbMessage dbmessage(m);
- int n = dbmessage.pullInt();
+void CursorCache::gotKillCursors(Message& m) {
+ LastError::get(cc()).disable();
+ DbMessage dbmessage(m);
+ int n = dbmessage.pullInt();
- if ( n > 2000 ) {
- ( n < 30000 ? warning() : error() ) << "receivedKillCursors, n=" << n << endl;
- }
+ if (n > 2000) {
+ (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl;
+ }
- uassert( 13286 , "sent 0 cursors to kill" , n >= 1 );
- uassert( 13287 , "too many cursors to kill" , n < 30000 );
- massert( 18632 , str::stream() << "bad kill cursors size: " << m.dataSize(),
- m.dataSize() == 8 + ( 8 * n ) );
+ uassert(13286, "sent 0 cursors to kill", n >= 1);
+ uassert(13287, "too many cursors to kill", n < 30000);
+ massert(18632,
+ str::stream() << "bad kill cursors size: " << m.dataSize(),
+ m.dataSize() == 8 + (8 * n));
- ConstDataCursor cursors(dbmessage.getArray(n));
+ ConstDataCursor cursors(dbmessage.getArray(n));
- ClientBasic* client = ClientBasic::getCurrent();
- AuthorizationSession* authSession = AuthorizationSession::get(client);
- for ( int i=0; i<n; i++ ) {
- long long id = cursors.readAndAdvance<LittleEndian<int64_t>>();
- LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl;
+ ClientBasic* client = ClientBasic::getCurrent();
+ AuthorizationSession* authSession = AuthorizationSession::get(client);
+ for (int i = 0; i < n; i++) {
+ long long id = cursors.readAndAdvance<LittleEndian<int64_t>>();
+ LOG(_myLogLevel) << "CursorCache::gotKillCursors id: " << id << endl;
- if ( ! id ) {
- warning() << " got cursor id of 0 to kill" << endl;
- continue;
- }
+ if (!id) {
+ warning() << " got cursor id of 0 to kill" << endl;
+ continue;
+ }
- string server;
- {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
-
- MapSharded::iterator i = _cursors.find( id );
- if ( i != _cursors.end() ) {
- Status authorizationStatus = authSession->checkAuthForKillCursors(
- NamespaceString(i->second->getNS()), id);
- audit::logKillCursorsAuthzCheck(
- client,
- NamespaceString(i->second->getNS()),
- id,
- authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized);
- if (authorizationStatus.isOK()) {
- _cursorsMaxTimeMS.erase( i->second->getId() );
- _cursors.erase( i );
- }
- continue;
- }
+ string server;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- MapNormal::iterator refsIt = _refs.find(id);
- MapNormal::iterator refsNSIt = _refsNS.find(id);
- if (refsIt == _refs.end()) {
- warning() << "can't find cursor: " << id << endl;
- continue;
- }
- verify(refsNSIt != _refsNS.end());
- Status authorizationStatus = authSession->checkAuthForKillCursors(
- NamespaceString(refsNSIt->second), id);
+ MapSharded::iterator i = _cursors.find(id);
+ if (i != _cursors.end()) {
+ Status authorizationStatus =
+ authSession->checkAuthForKillCursors(NamespaceString(i->second->getNS()), id);
audit::logKillCursorsAuthzCheck(
- client,
- NamespaceString(refsNSIt->second),
- id,
- authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized);
- if (!authorizationStatus.isOK()) {
- continue;
+ client,
+ NamespaceString(i->second->getNS()),
+ id,
+ authorizationStatus.isOK() ? ErrorCodes::OK : ErrorCodes::Unauthorized);
+ if (authorizationStatus.isOK()) {
+ _cursorsMaxTimeMS.erase(i->second->getId());
+ _cursors.erase(i);
}
- server = refsIt->second;
- _refs.erase(refsIt);
- _refsNS.erase(refsNSIt);
- cursorStatsSingleTarget.decrement();
+ continue;
}
- LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server << endl;
-
- verify( server.size() );
- ScopedDbConnection conn(server);
- conn->killCursor( id );
- conn.done();
+ MapNormal::iterator refsIt = _refs.find(id);
+ MapNormal::iterator refsNSIt = _refsNS.find(id);
+ if (refsIt == _refs.end()) {
+ warning() << "can't find cursor: " << id << endl;
+ continue;
+ }
+ verify(refsNSIt != _refsNS.end());
+ Status authorizationStatus =
+ authSession->checkAuthForKillCursors(NamespaceString(refsNSIt->second), id);
+ audit::logKillCursorsAuthzCheck(client,
+ NamespaceString(refsNSIt->second),
+ id,
+ authorizationStatus.isOK() ? ErrorCodes::OK
+ : ErrorCodes::Unauthorized);
+ if (!authorizationStatus.isOK()) {
+ continue;
+ }
+ server = refsIt->second;
+ _refs.erase(refsIt);
+ _refsNS.erase(refsNSIt);
+ cursorStatsSingleTarget.decrement();
}
- }
- void CursorCache::appendInfo( BSONObjBuilder& result ) const {
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- result.append( "sharded", static_cast<int>(cursorStatsMultiTarget.get()));
- result.appendNumber( "shardedEver" , _shardedTotal );
- result.append( "refs", static_cast<int>(cursorStatsSingleTarget.get()));
- result.append( "totalOpen", static_cast<int>(cursorStatsTotalOpen.get()));
+ LOG(_myLogLevel) << "CursorCache::found gotKillCursors id: " << id << " server: " << server
+ << endl;
+
+ verify(server.size());
+ ScopedDbConnection conn(server);
+ conn->killCursor(id);
+ conn.done();
}
+}
- void CursorCache::doTimeouts() {
- long long now = Listener::getElapsedTimeMillis();
- stdx::lock_guard<stdx::mutex> lk( _mutex );
- for ( MapSharded::iterator i=_cursors.begin(); i!=_cursors.end(); ++i ) {
- // Note: cursors with no timeout will always have an idleTime of 0
- long long idleFor = i->second->idleTime( now );
- if ( idleFor < TIMEOUT ) {
- continue;
- }
- log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms" << endl; // TODO: make LOG(1)
- _cursorsMaxTimeMS.erase( i->second->getId() );
- _cursors.erase( i );
- i = _cursors.begin(); // possible 2nd entry will get skipped, will get on next pass
- if ( i == _cursors.end() )
- break;
+void CursorCache::appendInfo(BSONObjBuilder& result) const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ result.append("sharded", static_cast<int>(cursorStatsMultiTarget.get()));
+ result.appendNumber("shardedEver", _shardedTotal);
+ result.append("refs", static_cast<int>(cursorStatsSingleTarget.get()));
+ result.append("totalOpen", static_cast<int>(cursorStatsTotalOpen.get()));
+}
+
+void CursorCache::doTimeouts() {
+ long long now = Listener::getElapsedTimeMillis();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ for (MapSharded::iterator i = _cursors.begin(); i != _cursors.end(); ++i) {
+ // Note: cursors with no timeout will always have an idleTime of 0
+ long long idleFor = i->second->idleTime(now);
+ if (idleFor < TIMEOUT) {
+ continue;
}
+ log() << "killing old cursor " << i->second->getId() << " idle for: " << idleFor << "ms"
+ << endl; // TODO: make LOG(1)
+ _cursorsMaxTimeMS.erase(i->second->getId());
+ _cursors.erase(i);
+ i = _cursors.begin(); // possible 2nd entry will get skipped, will get on next pass
+ if (i == _cursors.end())
+ break;
}
+}
- CursorCache cursorCache;
-
- const int CursorCache::_myLogLevel = 3;
+CursorCache cursorCache;
- class CursorTimeoutTask : public task::Task {
- public:
- virtual string name() const { return "cursorTimeout"; }
- virtual void doWork() {
- cursorCache.doTimeouts();
- }
- };
+const int CursorCache::_myLogLevel = 3;
- void CursorCache::startTimeoutThread() {
- task::repeat( new CursorTimeoutTask , 4000 );
+class CursorTimeoutTask : public task::Task {
+public:
+ virtual string name() const {
+ return "cursorTimeout";
+ }
+ virtual void doWork() {
+ cursorCache.doTimeouts();
}
+};
- class CmdCursorInfo : public Command {
- public:
- CmdCursorInfo() : Command( "cursorInfo" ) {}
- virtual bool slaveOk() const { return true; }
- virtual void help( stringstream& help ) const {
- help << " example: { cursorInfo : 1 }";
- }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::cursorInfo);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- virtual bool isWriteCommandForConfigServer() const { return false; }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& jsobj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- cursorCache.appendInfo( result );
- if ( jsobj["setTimeout"].isNumber() )
- CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong();
- return true;
- }
- } cmdCursorInfo;
+void CursorCache::startTimeoutThread() {
+ task::repeat(new CursorTimeoutTask, 4000);
+}
+class CmdCursorInfo : public Command {
+public:
+ CmdCursorInfo() : Command("cursorInfo") {}
+ virtual bool slaveOk() const {
+ return true;
+ }
+ virtual void help(stringstream& help) const {
+ help << " example: { cursorInfo : 1 }";
+ }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::cursorInfo);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& jsobj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ cursorCache.appendInfo(result);
+ if (jsobj["setTimeout"].isNumber())
+ CursorCache::TIMEOUT = jsobj["setTimeout"].numberLong();
+ return true;
+ }
+} cmdCursorInfo;
}