diff options
-rw-r--r-- | jstests/replsets/server_status_metrics.js | 60 | ||||
-rw-r--r-- | src/mongo/base/counter.h | 15 | ||||
-rw-r--r-- | src/mongo/db/namespacestring.h | 13 | ||||
-rw-r--r-- | src/mongo/db/oplogreader.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pdfile.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/prefetch.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/stats/timer_stats.h | 3 | ||||
-rw-r--r-- | src/mongo/util/queue.h | 24 |
12 files changed, 223 insertions, 62 deletions
diff --git a/jstests/replsets/server_status_metrics.js b/jstests/replsets/server_status_metrics.js new file mode 100644 index 00000000000..335b1d400a4 --- /dev/null +++ b/jstests/replsets/server_status_metrics.js @@ -0,0 +1,60 @@ +/** + * Test replication metrics + */ +function testSecondaryMetrics(secondary, opCount) { + var ss = secondary.getDB("test").serverStatus() + printjson(ss.metrics) + + assert(ss.metrics.repl.network.readersCreated > 0, "no (oplog) readers created") + assert(ss.metrics.repl.network.getmores.num > 0, "no getmores") + assert(ss.metrics.repl.network.getmores.totalMillis > 0, "no getmores time") + assert(ss.metrics.repl.network.ops == opCount, "wrong number of ops retrieved") + assert(ss.metrics.repl.network.bytes > 0, "zero or missing network bytes") + + assert(ss.metrics.repl.buffer.count >= 0, "buffer count missing") + assert(ss.metrics.repl.buffer.sizeBytes >= 0, "size (bytes)] missing") + assert(ss.metrics.repl.buffer.maxSizeBytes >= 0, "maxSize (bytes) missing") + + assert(ss.metrics.repl.preload.docs.num >= 0, "preload.docs num missing") + assert(ss.metrics.repl.preload.docs.totalMillis >= 0, "preload.docs time missing") + assert(ss.metrics.repl.preload.docs.num >= 0, "preload.indexes num missing") + assert(ss.metrics.repl.preload.indexes.totalMillis >= 0, "preload.indexes time missing") + + assert(ss.metrics.repl.apply.batches.num > 0, "no batches") + assert(ss.metrics.repl.apply.batches.totalMillis > 0, "no batch time") + assert(ss.metrics.repl.apply.ops == opCount, "wrong number of applied ops") +} + +function testPrimaryMetrics(primary, opCount) { + var ss = primary.getDB("test").serverStatus() + printjson(ss.metrics) + + assert(ss.metrics.repl.oplog.insert.num === opCount + 1, "wrong oplog insert count") + assert(ss.metrics.repl.oplog.insert.totalMillis > 0, "no oplog inserts time") + assert(ss.metrics.repl.oplog.insertBytes > 0, "no oplog inserted bytes") +} + +var rt = new ReplSetTest( { name : "server_status_metrics" , nodes: 2, oplogSize: 100 } ); +rt.startSet() +rt.initiate() + +rt.awaitSecondaryNodes(); + +var secondary = rt.getSecondary(); +var primary = rt.getPrimary(); +var testDB = primary.getDB("test"); + +//add test docs +for(x=0;x<10000;x++){ testDB.a.insert({}) } + +testPrimaryMetrics(primary, 10000); +testDB.getLastError(2); + +testSecondaryMetrics(secondary, 10000); + +testDB.a.update({}, {$set:{d:new Date()}},true, true) +testDB.getLastError(2); + +testSecondaryMetrics(secondary, 20000); + +rt.stopSet(); diff --git a/src/mongo/base/counter.h b/src/mongo/base/counter.h index c1c69b0cdb6..8fb2f4d2d51 100644 --- a/src/mongo/base/counter.h +++ b/src/mongo/base/counter.h @@ -22,12 +22,23 @@ #include "mongo/platform/cstdint.h" namespace mongo { - + /** + * A 64bit (atomic) counter. + * + * The constructor allows setting the start value, and increment([int]) is used to change it. + * + * The value can be returned using get() or the (long long) function operator. + */ class Counter64 { public: - + + /** Atomically increment (or decrement via negative value). */ void increment( uint64_t n = 1 ) { _counter.addAndFetch(n); } + + /** Atomically set value. */ + void set( uint64_t n ) { _counter.store(n); } + /** Return the current value */ long long get() const { return _counter.load(); } operator long long() const { return get(); } diff --git a/src/mongo/db/namespacestring.h b/src/mongo/db/namespacestring.h index cf2c0e64fb3..d108bbaac72 100644 --- a/src/mongo/db/namespacestring.h +++ b/src/mongo/db/namespacestring.h @@ -69,14 +69,21 @@ namespace mongo { string toString() const { return ns(); } /** - * @return true if ns is 'normal'. $ used for collections holding index data, which do not contain BSON objects in their records. - * special case for the local.oplog.$main ns -- naming it as such was a mistake. + * @return true if ns is 'normal'. A "$" is used for namespaces holding index data, + * which do not contain BSON objects in their records. ("oplog.$main" is the exception) */ static bool normal(const char* ns) { const char *p = strchr(ns, '$'); if( p == 0 ) return true; - return strcmp( ns, "local.oplog.$main" ) == 0; + return oplog(ns); + } + + /** + * @return true if the ns is an oplog one, otherwise false. + */ + static bool oplog(const char* ns) { + return StringData(ns) == StringData("local.oplog.rs") || StringData(ns) == StringData("local.oplog.$main"); } static bool special(const char *ns) { diff --git a/src/mongo/db/oplogreader.h b/src/mongo/db/oplogreader.h index 9087cb49503..d5fd5169550 100644 --- a/src/mongo/db/oplogreader.h +++ b/src/mongo/db/oplogreader.h @@ -106,6 +106,12 @@ namespace mongo { return cursor->moreInCurrentBatch(); } + int currentBatchMessageSize() { + if( NULL == cursor->getMessage() ) + return 0; + return cursor->getMessage()->size(); + } + /* old mongod's can't do the await flag... */ bool awaitCapable() { return cursor->hasResultFlag(ResultFlag_AwaitCapable); diff --git a/src/mongo/db/pdfile.cpp b/src/mongo/db/pdfile.cpp index 086ca11c15d..de0358094fc 100644 --- a/src/mongo/db/pdfile.cpp +++ b/src/mongo/db/pdfile.cpp @@ -57,12 +57,24 @@ _ disallow system* manipulations from the database. #include "mongo/util/hashtab.h" #include "mongo/util/mmap.h" #include "mongo/util/processinfo.h" +#include "mongo/db/stats/timer_stats.h" +#include "mongo/db/stats/counters.h" namespace mongo { BOOST_STATIC_ASSERT( sizeof(Extent)-4 == 48+128 ); BOOST_STATIC_ASSERT( sizeof(DataFileHeader)-4 == 8192 ); + //The oplog entries inserted + static TimerStats oplogInsertStats; + static ServerStatusMetricField<TimerStats> displayInsertedOplogEntries( + "repl.oplog.insert", + &oplogInsertStats ); + static Counter64 oplogInsertBytesStats; + static ServerStatusMetricField<Counter64> displayInsertedOplogEntryBytes( + "repl.oplog.insertBytes", + &oplogInsertBytesStats ); + bool isValidNS( const StringData& ns ) { // TODO: should check for invalid characters @@ -1769,6 +1781,14 @@ namespace mongo { << " but " << ns << " is not capped", d->isCapped() ); + //record timing on oplog inserts + boost::optional<TimerHolder> insertTimer; + //skip non-oplog collections + if (NamespaceString::oplog(ns)) { + insertTimer = boost::in_place(&oplogInsertStats); + oplogInsertBytesStats.increment(len); //record len of inserted records for oplog + } + int lenWHdr = len + Record::HeaderSize; DiskLoc loc = d->alloc(ns, lenWHdr); verify( !loc.isNull() ); diff --git a/src/mongo/db/prefetch.cpp b/src/mongo/db/prefetch.cpp index f3fc4ec2d69..73bc3047757 100644 --- a/src/mongo/db/prefetch.cpp +++ b/src/mongo/db/prefetch.cpp @@ -25,12 +25,25 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_details.h" #include "mongo/db/repl/rs.h" +#include "mongo/db/stats/timer_stats.h" +#include "mongo/db/commands/server_status.h" namespace mongo { // todo / idea: the prefetcher, when it fetches _id, on an upsert, will see if the record exists. if it does not, // at write time, we can just do an insert, which will be faster. + //The count (of batches) and time spent fetching pages before application + // -- meaning depends on the prefetch behavior: all, _id index, none, etc.) + static TimerStats prefetchIndexStats; + static ServerStatusMetricField<TimerStats> displayPrefetchIndexPages( + "repl.preload.indexes", + &prefetchIndexStats ); + static TimerStats prefetchDocStats; + static ServerStatusMetricField<TimerStats> displayPrefetchDocPages( + "repl.preload.docs", + &prefetchDocStats ); + // prefetch for an oplog operation void prefetchPagesForReplicatedOp(const BSONObj& op) { const char *opField; @@ -102,6 +115,7 @@ namespace mongo { return; case ReplSetImpl::PREFETCH_ID_ONLY: { + TimerHolder timer( &prefetchIndexStats); // on the update op case, the call to prefetchRecordPages will touch the _id index. // thus perhaps this option isn't very useful? int indexNo = nsd->findIdIndex(); @@ -126,6 +140,7 @@ namespace mongo { // in the process of being built int indexCount = nsd->getTotalIndexCount(); for ( int indexNo = 0; indexNo < indexCount; indexNo++ ) { + TimerHolder timer( &prefetchIndexStats); // This will page in all index pages for the given object. try { fetchIndexInserters(/*out*/unusedKeys, @@ -152,6 +167,7 @@ namespace mongo { void prefetchRecordPages(const char* ns, const BSONObj& obj) { BSONElement _id; if( obj.getObjectID(_id) ) { + TimerHolder timer(&prefetchDocStats); BSONObjBuilder builder; builder.append(_id); BSONObj result; diff --git a/src/mongo/db/repl.cpp b/src/mongo/db/repl.cpp index 02d3b2e5e60..55df1adf0d2 100644 --- a/src/mongo/db/repl.cpp +++ b/src/mongo/db/repl.cpp @@ -56,6 +56,7 @@ #include "mongo/db/instance.h" #include "mongo/db/server_parameters.h" #include "mongo/db/queryutil.h" +#include "mongo/base/counter.h" namespace mongo { @@ -257,7 +258,6 @@ namespace mongo { } } replicationInfoServerStatus; - class CmdIsMaster : public Command { public: virtual bool requiresAuth() { return false; } @@ -1207,6 +1207,13 @@ namespace mongo { return true; } + //number of readers created; + // this happens when the source source changes, a reconfig/network-error or the cursor dies + static Counter64 readersCreatedStats; + static ServerStatusMetricField<Counter64> displayReadersCreated( + "repl.network.readersCreated", + &readersCreatedStats ); + OplogReader::OplogReader( bool doHandshake ) : _doHandshake( doHandshake ) { @@ -1215,6 +1222,8 @@ namespace mongo { /* TODO: slaveOk maybe shouldn't use? */ _tailingQueryOptions |= QueryOption_AwaitData; + + readersCreatedStats.increment(); } bool OplogReader::commonConnect(const string& hostName) { diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 90c0c6942ed..17531db3cb1 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -22,6 +22,8 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/util/fail_point_service.h" +#include "mongo/base/counter.h" +#include "mongo/db/stats/timer_stats.h" namespace mongo { namespace replset { @@ -30,13 +32,41 @@ namespace replset { BackgroundSync* BackgroundSync::s_instance = 0; boost::mutex BackgroundSync::s_mutex; + //The number and time spent reading batches off the network + static TimerStats getmoreReplStats; + static ServerStatusMetricField<TimerStats> displayBatchesRecieved( + "repl.network.getmores", + &getmoreReplStats ); + //The oplog entries read via the oplog reader + static Counter64 opsReadStats; + static ServerStatusMetricField<Counter64> displayOpsRead( "repl.network.ops", + &opsReadStats ); + //The bytes read via the oplog reader + static Counter64 networkByteStats; + static ServerStatusMetricField<Counter64> displayBytesRead( "repl.network.bytes", + &networkByteStats ); + + //The count of items in the buffer + static Counter64 bufferCountGauge; + static ServerStatusMetricField<Counter64> displayBufferCount( "repl.buffer.count", + &bufferCountGauge ); + //The size (bytes) of items in the buffer + static Counter64 bufferSizeGauge; + static ServerStatusMetricField<Counter64> displayBufferSize( "repl.buffer.sizeBytes", + &bufferSizeGauge ); + //The max size (bytes) of the buffer + static int bufferMaxSizeGauge = 256*1024*1024; + static ServerStatusMetricField<int> displayBufferMaxSize( "repl.buffer.maxSizeBytes", + &bufferMaxSizeGauge ); + + BackgroundSyncInterface::~BackgroundSyncInterface() {} size_t getSize(const BSONObj& o) { return o.objsize(); } - BackgroundSync::BackgroundSync() : _buffer(256*1024*1024, &getSize), + BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), _lastOpTimeFetched(0, 0), _lastH(0), _pause(true), @@ -48,9 +78,6 @@ namespace replset { _consumedOpTime(0, 0) { } - BackgroundSync::QueueCounter::QueueCounter() : waitTime(0), numElems(0) { - } - BackgroundSync* BackgroundSync::get() { boost::unique_lock<boost::mutex> lock(s_mutex); if (s_instance == NULL && !inShutdown()) { @@ -59,18 +86,6 @@ namespace replset { return s_instance; } - BSONObj BackgroundSync::getCounters() { - BSONObjBuilder counters; - { - boost::unique_lock<boost::mutex> lock(_mutex); - counters.appendIntOrLL("waitTimeMs", _queueCounter.waitTime); - counters.append("numElems", _queueCounter.numElems); - } - // _buffer is protected by its own mutex - counters.appendNumber("numBytes", _buffer.size()); - return counters.obj(); - } - void BackgroundSync::shutdown() { notify(); } @@ -310,6 +325,7 @@ namespace replset { while (!inShutdown()) { while (!inShutdown()) { + if (!r.moreInCurrentBatch()) { if (theReplSet->gotForceSync()) { return; @@ -323,33 +339,37 @@ namespace replset { if (shouldChangeSyncTarget()) { return; } + //record time for each getmore + { + TimerHolder batchTimer(&getmoreReplStats); + r.more(); + } + //increment + networkByteStats.increment(r.currentBatchMessageSize()); - r.more(); } if (!r.more()) break; BSONObj o = r.nextSafe().getOwned(); + opsReadStats.increment(); { boost::unique_lock<boost::mutex> lock(_mutex); _appliedBuffer = false; } - Timer timer; - // the blocking queue will wait (forever) until there's room for us to push OCCASIONALLY { LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog; } + // the blocking queue will wait (forever) until there's room for us to push _buffer.push(o); + bufferCountGauge.increment(); + bufferSizeGauge.increment(getSize(o)); { boost::unique_lock<boost::mutex> lock(_mutex); - - // update counters - _queueCounter.waitTime += timer.millis(); - _queueCounter.numElems++; _lastH = o["h"].numberLong(); _lastOpTimeFetched = o["ts"]._opTime(); } @@ -407,14 +427,11 @@ namespace replset { } void BackgroundSync::consume() { - // this is just to get the op off the queue, it's been peeked at + // this is just to get the op off the queue, it's been peeked at // and queued for application already - _buffer.blockingPop(); - - { - boost::unique_lock<boost::mutex> lock(_mutex); - _queueCounter.numElems--; - } + BSONObj op = _buffer.blockingPop(); + bufferCountGauge.increment(-1); + bufferSizeGauge.increment(-getSize(op)); } bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) { @@ -549,7 +566,6 @@ namespace replset { _currentSyncTarget = NULL; _lastOpTimeFetched = OpTime(0,0); _lastH = 0; - _queueCounter.numElems = 0; _condvar.notify_all(); } @@ -586,19 +602,5 @@ namespace replset { _assumingPrimary = false; } - class ReplNetworkQueueSSS : public ServerStatusSection { - public: - ReplNetworkQueueSSS() : ServerStatusSection( "replNetworkQueue" ){} - virtual bool includeByDefault() const { return true; } - - BSONObj generateSection(const BSONElement& configElement) const { - if ( ! theReplSet ) - return BSONObj(); - - return replset::BackgroundSync::get()->getCounters(); - } - - } replNetworkQueueSSS; - } // namespace replset } // namespace mongo diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 7dafc8df56d..8459f835b4c 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -89,17 +89,10 @@ namespace replset { OplogReader _oplogMarker; // not locked, only used by notifier thread OpTime _consumedOpTime; // not locked, only used by notifier thread - struct QueueCounter { - QueueCounter(); - unsigned long long waitTime; - unsigned int numElems; - } _queueCounter; - BackgroundSync(); BackgroundSync(const BackgroundSync& s); BackgroundSync operator=(const BackgroundSync& s); - // Production thread void _producerThread(); // Adds elements to the list, up to maxSize. diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index d02d1ba9405..8eb88d34ed1 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -31,6 +31,11 @@ #include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/util/fail_point_service.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/stats/timer_stats.h" +#include "mongo/base/counter.h" + + namespace mongo { @@ -41,6 +46,17 @@ namespace replset { MONGO_FP_DECLARE(rsSyncApplyStop); + // Number and time of each ApplyOps worker pool round + static TimerStats applyBatchStats; + static ServerStatusMetricField<TimerStats> displayOpBatchesApplied( + "repl.apply.batches", + &applyBatchStats ); + //The oplog entries applied + static Counter64 opsAppliedStats; + static ServerStatusMetricField<Counter64> displayOpsApplied( "repl.apply.ops", + &opsAppliedStats ); + + SyncTail::SyncTail(BackgroundSyncInterface *q) : Sync(""), oplogVersion(0), _networkQueue(q) {} @@ -85,6 +101,7 @@ namespace replset { // For non-initial-sync, we convert updates to upserts // to suppress errors when replaying oplog entries. bool ok = !applyOperation_inlock(op, true, convertUpdateToUpsert); + opsAppliedStats.increment(); getDur().commitIfNeeded(); return ok; @@ -197,6 +214,7 @@ namespace replset { void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors, MultiSyncApplyFunc applyFunc) { ThreadPool& writerPool = theReplSet->getWriterPool(); + TimerHolder timer(&applyBatchStats); for (std::vector< std::vector<BSONObj> >::const_iterator it = writerVectors.begin(); it != writerVectors.end(); ++it) { diff --git a/src/mongo/db/stats/timer_stats.h b/src/mongo/db/stats/timer_stats.h index 13b814f89f9..1f4d89e455f 100644 --- a/src/mongo/db/stats/timer_stats.h +++ b/src/mongo/db/stats/timer_stats.h @@ -53,7 +53,9 @@ namespace mongo { */ class TimerHolder { public: + /** Destructor will record to TimerStats */ TimerHolder( TimerStats* stats ); + /** Will record stats if recordMillis hasn't (based on _recorded) */ ~TimerHolder(); /** @@ -61,7 +63,6 @@ namespace mongo { */ int millis() const { return _t.millis(); } - /** * records the time in the TimerStats and marks that we've * already recorded so the destructor doesn't diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 49ff3d27d32..0b0bd9403ad 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -34,9 +34,9 @@ namespace mongo { } /** - * Simple blocking queue with optional max size. - * A custom sizing function can optionally be given. By default, size is calculated as - * _queue.size(). + * Simple blocking queue with optional max size (by count or custom sizing function). + * A custom sizing function can optionally be given. By default the getSize function + * returns 1 for each item, resulting in size equaling the number of items queued. */ template<typename T> class BlockingQueue : boost::noncopyable { @@ -74,11 +74,29 @@ namespace mongo { return _queue.empty(); } + /** + * The size as measured by the size function. Default to counting each item + */ size_t size() const { scoped_lock l( _lock ); return _currentSize; } + /** + * The max size for this queue + */ + size_t maxSize() const { + return _maxSize; + } + + /** + * The number/count of items in the queue ( _queue.size() ) + */ + int count() const { + scoped_lock l( _lock ); + return _queue.size(); + } + void clear() { scoped_lock l(_lock); _queue = std::queue<T>(); |