summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/server_status_metrics.js60
-rw-r--r--src/mongo/base/counter.h15
-rw-r--r--src/mongo/db/namespacestring.h13
-rw-r--r--src/mongo/db/oplogreader.h6
-rw-r--r--src/mongo/db/pdfile.cpp20
-rw-r--r--src/mongo/db/prefetch.cpp16
-rw-r--r--src/mongo/db/repl.cpp11
-rw-r--r--src/mongo/db/repl/bgsync.cpp92
-rw-r--r--src/mongo/db/repl/bgsync.h7
-rw-r--r--src/mongo/db/repl/rs_sync.cpp18
-rw-r--r--src/mongo/db/stats/timer_stats.h3
-rw-r--r--src/mongo/util/queue.h24
12 files changed, 223 insertions, 62 deletions
diff --git a/jstests/replsets/server_status_metrics.js b/jstests/replsets/server_status_metrics.js
new file mode 100644
index 00000000000..335b1d400a4
--- /dev/null
+++ b/jstests/replsets/server_status_metrics.js
@@ -0,0 +1,60 @@
+/**
+ * Test replication metrics
+ */
+function testSecondaryMetrics(secondary, opCount) { // checks serverStatus().metrics.repl on a secondary
+ var ss = secondary.getDB("test").serverStatus()
+ printjson(ss.metrics)
+ // network: readers created, getmore count/time, and ops/bytes fetched (ops must equal opCount)
+ assert(ss.metrics.repl.network.readersCreated > 0, "no (oplog) readers created")
+ assert(ss.metrics.repl.network.getmores.num > 0, "no getmores")
+ assert(ss.metrics.repl.network.getmores.totalMillis > 0, "no getmores time")
+ assert(ss.metrics.repl.network.ops == opCount, "wrong number of ops retrieved")
+ assert(ss.metrics.repl.network.bytes > 0, "zero or missing network bytes")
+ // buffer: ">= 0" is false for undefined, so these only assert that the fields are present
+ assert(ss.metrics.repl.buffer.count >= 0, "buffer count missing")
+ assert(ss.metrics.repl.buffer.sizeBytes >= 0, "size (bytes) missing")
+ assert(ss.metrics.repl.buffer.maxSizeBytes >= 0, "maxSize (bytes) missing")
+ // preload: presence checks for both the docs and the indexes timers
+ assert(ss.metrics.repl.preload.docs.num >= 0, "preload.docs num missing")
+ assert(ss.metrics.repl.preload.docs.totalMillis >= 0, "preload.docs time missing")
+ assert(ss.metrics.repl.preload.indexes.num >= 0, "preload.indexes num missing")
+ assert(ss.metrics.repl.preload.indexes.totalMillis >= 0, "preload.indexes time missing")
+ // apply: at least one batch applied, and the applied-op counter must equal opCount
+ assert(ss.metrics.repl.apply.batches.num > 0, "no batches")
+ assert(ss.metrics.repl.apply.batches.totalMillis > 0, "no batch time")
+ assert(ss.metrics.repl.apply.ops == opCount, "wrong number of applied ops")
+}
+
+function testPrimaryMetrics(primary, opCount) {
+ var ss = primary.getDB("test").serverStatus()
+ printjson(ss.metrics)
+
+ assert(ss.metrics.repl.oplog.insert.num === opCount + 1, "wrong oplog insert count")
+ assert(ss.metrics.repl.oplog.insert.totalMillis > 0, "no oplog inserts time")
+ assert(ss.metrics.repl.oplog.insertBytes > 0, "no oplog inserted bytes")
+}
+
+var rt = new ReplSetTest( { name : "server_status_metrics" , nodes: 2, oplogSize: 100 } );
+rt.startSet()
+rt.initiate()
+
+rt.awaitSecondaryNodes();
+
+var secondary = rt.getSecondary();
+var primary = rt.getPrimary();
+var testDB = primary.getDB("test");
+
+//add test docs
+for(x=0;x<10000;x++){ testDB.a.insert({}) }
+
+testPrimaryMetrics(primary, 10000);
+testDB.getLastError(2);
+
+testSecondaryMetrics(secondary, 10000);
+
+testDB.a.update({}, {$set:{d:new Date()}},true, true)
+testDB.getLastError(2);
+
+testSecondaryMetrics(secondary, 20000);
+
+rt.stopSet();
diff --git a/src/mongo/base/counter.h b/src/mongo/base/counter.h
index c1c69b0cdb6..8fb2f4d2d51 100644
--- a/src/mongo/base/counter.h
+++ b/src/mongo/base/counter.h
@@ -22,12 +22,23 @@
#include "mongo/platform/cstdint.h"
namespace mongo {
-
+ /**
+ * A 64bit (atomic) counter.
+ *
+ * The constructor allows setting the start value, and increment([int]) is used to change it.
+ *
+ * The value can be returned using get() or the (long long) function operator.
+ */
class Counter64 {
public:
-
+
+ /** Atomically increment (or decrement via negative value). */
void increment( uint64_t n = 1 ) { _counter.addAndFetch(n); }
+
+ /** Atomically set value. */
+ void set( uint64_t n ) { _counter.store(n); }
+ /** Return the current value */
long long get() const { return _counter.load(); }
operator long long() const { return get(); }
diff --git a/src/mongo/db/namespacestring.h b/src/mongo/db/namespacestring.h
index cf2c0e64fb3..d108bbaac72 100644
--- a/src/mongo/db/namespacestring.h
+++ b/src/mongo/db/namespacestring.h
@@ -69,14 +69,21 @@ namespace mongo {
string toString() const { return ns(); }
/**
- * @return true if ns is 'normal'. $ used for collections holding index data, which do not contain BSON objects in their records.
- * special case for the local.oplog.$main ns -- naming it as such was a mistake.
+ * @return true if ns is 'normal'. A "$" is used for namespaces holding index data,
+ * which do not contain BSON objects in their records. ("oplog.$main" is the exception)
*/
static bool normal(const char* ns) {
const char *p = strchr(ns, '$');
if( p == 0 )
return true;
- return strcmp( ns, "local.oplog.$main" ) == 0;
+ return oplog(ns);
+ }
+
+ /**
+ * @return true if the ns is an oplog one, otherwise false.
+ */
+ static bool oplog(const char* ns) {
+ return StringData(ns) == StringData("local.oplog.rs") || StringData(ns) == StringData("local.oplog.$main");
}
static bool special(const char *ns) {
diff --git a/src/mongo/db/oplogreader.h b/src/mongo/db/oplogreader.h
index 9087cb49503..d5fd5169550 100644
--- a/src/mongo/db/oplogreader.h
+++ b/src/mongo/db/oplogreader.h
@@ -106,6 +106,12 @@ namespace mongo {
return cursor->moreInCurrentBatch();
}
+ int currentBatchMessageSize() {
+ if( NULL == cursor->getMessage() )
+ return 0;
+ return cursor->getMessage()->size();
+ }
+
/* old mongod's can't do the await flag... */
bool awaitCapable() {
return cursor->hasResultFlag(ResultFlag_AwaitCapable);
diff --git a/src/mongo/db/pdfile.cpp b/src/mongo/db/pdfile.cpp
index 086ca11c15d..de0358094fc 100644
--- a/src/mongo/db/pdfile.cpp
+++ b/src/mongo/db/pdfile.cpp
@@ -57,12 +57,24 @@ _ disallow system* manipulations from the database.
#include "mongo/util/hashtab.h"
#include "mongo/util/mmap.h"
#include "mongo/util/processinfo.h"
+#include "mongo/db/stats/timer_stats.h"
+#include "mongo/db/stats/counters.h"
namespace mongo {
BOOST_STATIC_ASSERT( sizeof(Extent)-4 == 48+128 );
BOOST_STATIC_ASSERT( sizeof(DataFileHeader)-4 == 8192 );
+ //The oplog entries inserted
+ static TimerStats oplogInsertStats;
+ static ServerStatusMetricField<TimerStats> displayInsertedOplogEntries(
+ "repl.oplog.insert",
+ &oplogInsertStats );
+ static Counter64 oplogInsertBytesStats;
+ static ServerStatusMetricField<Counter64> displayInsertedOplogEntryBytes(
+ "repl.oplog.insertBytes",
+ &oplogInsertBytesStats );
+
bool isValidNS( const StringData& ns ) {
// TODO: should check for invalid characters
@@ -1769,6 +1781,14 @@ namespace mongo {
<< " but " << ns << " is not capped",
d->isCapped() );
+ //record timing on oplog inserts
+ boost::optional<TimerHolder> insertTimer;
+ //skip non-oplog collections
+ if (NamespaceString::oplog(ns)) {
+ insertTimer = boost::in_place(&oplogInsertStats);
+ oplogInsertBytesStats.increment(len); //record len of inserted records for oplog
+ }
+
int lenWHdr = len + Record::HeaderSize;
DiskLoc loc = d->alloc(ns, lenWHdr);
verify( !loc.isNull() );
diff --git a/src/mongo/db/prefetch.cpp b/src/mongo/db/prefetch.cpp
index f3fc4ec2d69..73bc3047757 100644
--- a/src/mongo/db/prefetch.cpp
+++ b/src/mongo/db/prefetch.cpp
@@ -25,12 +25,25 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_details.h"
#include "mongo/db/repl/rs.h"
+#include "mongo/db/stats/timer_stats.h"
+#include "mongo/db/commands/server_status.h"
namespace mongo {
// todo / idea: the prefetcher, when it fetches _id, on an upsert, will see if the record exists. if it does not,
// at write time, we can just do an insert, which will be faster.
+ //The count (of batches) and time spent fetching pages before application
+ // -- meaning depends on the prefetch behavior: all, _id index, none, etc.
+ static TimerStats prefetchIndexStats;
+ static ServerStatusMetricField<TimerStats> displayPrefetchIndexPages(
+ "repl.preload.indexes",
+ &prefetchIndexStats );
+ static TimerStats prefetchDocStats;
+ static ServerStatusMetricField<TimerStats> displayPrefetchDocPages(
+ "repl.preload.docs",
+ &prefetchDocStats );
+
// prefetch for an oplog operation
void prefetchPagesForReplicatedOp(const BSONObj& op) {
const char *opField;
@@ -102,6 +115,7 @@ namespace mongo {
return;
case ReplSetImpl::PREFETCH_ID_ONLY:
{
+ TimerHolder timer( &prefetchIndexStats);
// on the update op case, the call to prefetchRecordPages will touch the _id index.
// thus perhaps this option isn't very useful?
int indexNo = nsd->findIdIndex();
@@ -126,6 +140,7 @@ namespace mongo {
// in the process of being built
int indexCount = nsd->getTotalIndexCount();
for ( int indexNo = 0; indexNo < indexCount; indexNo++ ) {
+ TimerHolder timer( &prefetchIndexStats);
// This will page in all index pages for the given object.
try {
fetchIndexInserters(/*out*/unusedKeys,
@@ -152,6 +167,7 @@ namespace mongo {
void prefetchRecordPages(const char* ns, const BSONObj& obj) {
BSONElement _id;
if( obj.getObjectID(_id) ) {
+ TimerHolder timer(&prefetchDocStats);
BSONObjBuilder builder;
builder.append(_id);
BSONObj result;
diff --git a/src/mongo/db/repl.cpp b/src/mongo/db/repl.cpp
index 02d3b2e5e60..55df1adf0d2 100644
--- a/src/mongo/db/repl.cpp
+++ b/src/mongo/db/repl.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/instance.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/queryutil.h"
+#include "mongo/base/counter.h"
namespace mongo {
@@ -257,7 +258,6 @@ namespace mongo {
}
} replicationInfoServerStatus;
-
class CmdIsMaster : public Command {
public:
virtual bool requiresAuth() { return false; }
@@ -1207,6 +1207,13 @@ namespace mongo {
return true;
}
+ //number of readers created;
+ // this happens when the sync source changes, a reconfig/network-error or the cursor dies
+ static Counter64 readersCreatedStats;
+ static ServerStatusMetricField<Counter64> displayReadersCreated(
+ "repl.network.readersCreated",
+ &readersCreatedStats );
+
OplogReader::OplogReader( bool doHandshake ) :
_doHandshake( doHandshake ) {
@@ -1215,6 +1222,8 @@ namespace mongo {
/* TODO: slaveOk maybe shouldn't use? */
_tailingQueryOptions |= QueryOption_AwaitData;
+
+ readersCreatedStats.increment();
}
bool OplogReader::commonConnect(const string& hostName) {
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 90c0c6942ed..17531db3cb1 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -22,6 +22,8 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/base/counter.h"
+#include "mongo/db/stats/timer_stats.h"
namespace mongo {
namespace replset {
@@ -30,13 +32,41 @@ namespace replset {
BackgroundSync* BackgroundSync::s_instance = 0;
boost::mutex BackgroundSync::s_mutex;
+ //The number and time spent reading batches off the network
+ static TimerStats getmoreReplStats;
+ static ServerStatusMetricField<TimerStats> displayBatchesRecieved(
+ "repl.network.getmores",
+ &getmoreReplStats );
+ //The oplog entries read via the oplog reader
+ static Counter64 opsReadStats;
+ static ServerStatusMetricField<Counter64> displayOpsRead( "repl.network.ops",
+ &opsReadStats );
+ //The bytes read via the oplog reader
+ static Counter64 networkByteStats;
+ static ServerStatusMetricField<Counter64> displayBytesRead( "repl.network.bytes",
+ &networkByteStats );
+
+ //The count of items in the buffer
+ static Counter64 bufferCountGauge;
+ static ServerStatusMetricField<Counter64> displayBufferCount( "repl.buffer.count",
+ &bufferCountGauge );
+ //The size (bytes) of items in the buffer
+ static Counter64 bufferSizeGauge;
+ static ServerStatusMetricField<Counter64> displayBufferSize( "repl.buffer.sizeBytes",
+ &bufferSizeGauge );
+ //The max size (bytes) of the buffer
+ static int bufferMaxSizeGauge = 256*1024*1024;
+ static ServerStatusMetricField<int> displayBufferMaxSize( "repl.buffer.maxSizeBytes",
+ &bufferMaxSizeGauge );
+
+
BackgroundSyncInterface::~BackgroundSyncInterface() {}
size_t getSize(const BSONObj& o) {
return o.objsize();
}
- BackgroundSync::BackgroundSync() : _buffer(256*1024*1024, &getSize),
+ BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize),
_lastOpTimeFetched(0, 0),
_lastH(0),
_pause(true),
@@ -48,9 +78,6 @@ namespace replset {
_consumedOpTime(0, 0) {
}
- BackgroundSync::QueueCounter::QueueCounter() : waitTime(0), numElems(0) {
- }
-
BackgroundSync* BackgroundSync::get() {
boost::unique_lock<boost::mutex> lock(s_mutex);
if (s_instance == NULL && !inShutdown()) {
@@ -59,18 +86,6 @@ namespace replset {
return s_instance;
}
- BSONObj BackgroundSync::getCounters() {
- BSONObjBuilder counters;
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- counters.appendIntOrLL("waitTimeMs", _queueCounter.waitTime);
- counters.append("numElems", _queueCounter.numElems);
- }
- // _buffer is protected by its own mutex
- counters.appendNumber("numBytes", _buffer.size());
- return counters.obj();
- }
-
void BackgroundSync::shutdown() {
notify();
}
@@ -310,6 +325,7 @@ namespace replset {
while (!inShutdown()) {
while (!inShutdown()) {
+
if (!r.moreInCurrentBatch()) {
if (theReplSet->gotForceSync()) {
return;
@@ -323,33 +339,37 @@ namespace replset {
if (shouldChangeSyncTarget()) {
return;
}
+ //record time for each getmore
+ {
+ TimerHolder batchTimer(&getmoreReplStats);
+ r.more();
+ }
+ //add the message size of the batch just fetched to the network byte counter
+ networkByteStats.increment(r.currentBatchMessageSize());
- r.more();
}
if (!r.more())
break;
BSONObj o = r.nextSafe().getOwned();
+ opsReadStats.increment();
{
boost::unique_lock<boost::mutex> lock(_mutex);
_appliedBuffer = false;
}
- Timer timer;
- // the blocking queue will wait (forever) until there's room for us to push
OCCASIONALLY {
LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog;
}
+ // the blocking queue will wait (forever) until there's room for us to push
_buffer.push(o);
+ bufferCountGauge.increment();
+ bufferSizeGauge.increment(getSize(o));
{
boost::unique_lock<boost::mutex> lock(_mutex);
-
- // update counters
- _queueCounter.waitTime += timer.millis();
- _queueCounter.numElems++;
_lastH = o["h"].numberLong();
_lastOpTimeFetched = o["ts"]._opTime();
}
@@ -407,14 +427,11 @@ namespace replset {
}
void BackgroundSync::consume() {
- // this is just to get the op off the queue, it's been peeked at
+ // this is just to get the op off the queue, it's been peeked at
// and queued for application already
- _buffer.blockingPop();
-
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- _queueCounter.numElems--;
- }
+ BSONObj op = _buffer.blockingPop();
+ bufferCountGauge.increment(-1);
+ bufferSizeGauge.increment(-static_cast<long long>(getSize(op))); // widen before negating: a 32-bit size_t would zero-extend to 2^32-size and grow the gauge
}
bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) {
@@ -549,7 +566,6 @@ namespace replset {
_currentSyncTarget = NULL;
_lastOpTimeFetched = OpTime(0,0);
_lastH = 0;
- _queueCounter.numElems = 0;
_condvar.notify_all();
}
@@ -586,19 +602,5 @@ namespace replset {
_assumingPrimary = false;
}
- class ReplNetworkQueueSSS : public ServerStatusSection {
- public:
- ReplNetworkQueueSSS() : ServerStatusSection( "replNetworkQueue" ){}
- virtual bool includeByDefault() const { return true; }
-
- BSONObj generateSection(const BSONElement& configElement) const {
- if ( ! theReplSet )
- return BSONObj();
-
- return replset::BackgroundSync::get()->getCounters();
- }
-
- } replNetworkQueueSSS;
-
} // namespace replset
} // namespace mongo
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 7dafc8df56d..8459f835b4c 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -89,17 +89,10 @@ namespace replset {
OplogReader _oplogMarker; // not locked, only used by notifier thread
OpTime _consumedOpTime; // not locked, only used by notifier thread
- struct QueueCounter {
- QueueCounter();
- unsigned long long waitTime;
- unsigned int numElems;
- } _queueCounter;
-
BackgroundSync();
BackgroundSync(const BackgroundSync& s);
BackgroundSync operator=(const BackgroundSync& s);
-
// Production thread
void _producerThread();
// Adds elements to the list, up to maxSize.
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index d02d1ba9405..8eb88d34ed1 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -31,6 +31,11 @@
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/stats/timer_stats.h"
+#include "mongo/base/counter.h"
+
+
namespace mongo {
@@ -41,6 +46,17 @@ namespace replset {
MONGO_FP_DECLARE(rsSyncApplyStop);
+ // Number and time of each ApplyOps worker pool round
+ static TimerStats applyBatchStats;
+ static ServerStatusMetricField<TimerStats> displayOpBatchesApplied(
+ "repl.apply.batches",
+ &applyBatchStats );
+ //The oplog entries applied
+ static Counter64 opsAppliedStats;
+ static ServerStatusMetricField<Counter64> displayOpsApplied( "repl.apply.ops",
+ &opsAppliedStats );
+
+
SyncTail::SyncTail(BackgroundSyncInterface *q) :
Sync(""), oplogVersion(0), _networkQueue(q)
{}
@@ -85,6 +101,7 @@ namespace replset {
// For non-initial-sync, we convert updates to upserts
// to suppress errors when replaying oplog entries.
bool ok = !applyOperation_inlock(op, true, convertUpdateToUpsert);
+ opsAppliedStats.increment();
getDur().commitIfNeeded();
return ok;
@@ -197,6 +214,7 @@ namespace replset {
void SyncTail::applyOps(const std::vector< std::vector<BSONObj> >& writerVectors,
MultiSyncApplyFunc applyFunc) {
ThreadPool& writerPool = theReplSet->getWriterPool();
+ TimerHolder timer(&applyBatchStats);
for (std::vector< std::vector<BSONObj> >::const_iterator it = writerVectors.begin();
it != writerVectors.end();
++it) {
diff --git a/src/mongo/db/stats/timer_stats.h b/src/mongo/db/stats/timer_stats.h
index 13b814f89f9..1f4d89e455f 100644
--- a/src/mongo/db/stats/timer_stats.h
+++ b/src/mongo/db/stats/timer_stats.h
@@ -53,7 +53,9 @@ namespace mongo {
*/
class TimerHolder {
public:
+ /** Destructor will record to TimerStats */
TimerHolder( TimerStats* stats );
+ /** Will record stats if recordMillis hasn't (based on _recorded) */
~TimerHolder();
/**
@@ -61,7 +63,6 @@ namespace mongo {
*/
int millis() const { return _t.millis(); }
-
/**
* records the time in the TimerStats and marks that we've
* already recorded so the destructor doesn't
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index 49ff3d27d32..0b0bd9403ad 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -34,9 +34,9 @@ namespace mongo {
}
/**
- * Simple blocking queue with optional max size.
- * A custom sizing function can optionally be given. By default, size is calculated as
- * _queue.size().
+ * Simple blocking queue with optional max size (by count or custom sizing function).
+ * A custom sizing function can optionally be given. By default the getSize function
+ * returns 1 for each item, resulting in size equaling the number of items queued.
*/
template<typename T>
class BlockingQueue : boost::noncopyable {
@@ -74,11 +74,29 @@ namespace mongo {
return _queue.empty();
}
+ /**
+ * The size as measured by the size function. Default to counting each item
+ */
size_t size() const {
scoped_lock l( _lock );
return _currentSize;
}
+ /**
+ * The max size for this queue
+ */
+ size_t maxSize() const {
+ return _maxSize;
+ }
+
+ /**
+ * The number/count of items in the queue ( _queue.size() )
+ */
+ int count() const {
+ scoped_lock l( _lock );
+ return _queue.size();
+ }
+
void clear() {
scoped_lock l(_lock);
_queue = std::queue<T>();