summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDarrenJiang13 <yjjiang1996@163.com>2022-05-31 13:07:33 +0800
committerGitHub <noreply@github.com>2022-05-31 08:07:33 +0300
commitbb1de082eac26d5242733eb0b40959bd9de2e15b (patch)
treec3e1125652b263c3fefea1ea0eeca33f686a12bb
parent4065b4f27efc539b86beb63829bc148a02adecb1 (diff)
downloadredis-bb1de082eac26d5242733eb0b40959bd9de2e15b.tar.gz
Adds isolated netstats for replication. (#10062)
The amount of `server.stat_net_output_bytes/server.stat_net_input_bytes` is actually the sum of replication flow and users' data flow. It may cause confusions like this: "Why does my server get such a large output_bytes while I am doing nothing? ". After discussions and revisions, now here is the change about what this PR brings (final version before merge): - 2 server variables to count the network bytes during replication, including fullsync and propagate bytes. - `server.stat_net_repl_output_bytes`/`server.stat_net_repl_input_bytes` - 3 info fields to print the input and output of repl bytes and instantaneous value of total repl bytes. - `total_net_repl_input_bytes` / `total_net_repl_output_bytes` - `instantaneous_repl_total_kbps` - 1 new API `rioCheckType()` to check the type of rio. So we can use this to distinguish between diskless and diskbased replication - 2 new counting items to keep network statistics consistent between master and slave - rdb portion during diskless replica. in `rdbLoadProgressCallback()` - first line of the full sync payload. in `readSyncBulkPayload()` Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--src/networking.c17
-rw-r--r--src/rdb.c3
-rw-r--r--src/replication.c21
-rw-r--r--src/rio.c14
-rw-r--r--src/rio.h7
-rw-r--r--src/server.c24
-rw-r--r--src/server.h7
7 files changed, 75 insertions, 18 deletions
diff --git a/src/networking.c b/src/networking.c
index 1260be067..c1d6aa4bd 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1953,7 +1953,13 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
- atomicIncr(server.stat_net_output_bytes, totwritten);
+
+ if (getClientType(c) == CLIENT_TYPE_SLAVE) {
+ atomicIncr(server.stat_net_repl_output_bytes, totwritten);
+ } else {
+ atomicIncr(server.stat_net_output_bytes, totwritten);
+ }
+
if (nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE,
@@ -2655,8 +2661,13 @@ void readQueryFromClient(connection *conn) {
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
- atomicIncr(server.stat_net_input_bytes, nread);
+ if (c->flags & CLIENT_MASTER) {
+ c->read_reploff += nread;
+ atomicIncr(server.stat_net_repl_input_bytes, nread);
+ } else {
+ atomicIncr(server.stat_net_input_bytes, nread);
+ }
+
if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
diff --git a/src/rdb.c b/src/rdb.c
index 62ec5bbb2..1faecd2f3 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -2782,6 +2782,9 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
}
+ if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
+ atomicIncr(server.stat_net_repl_input_bytes, len);
+ }
}
/* Save the given functions_ctx to the rdb.
diff --git a/src/replication.c b/src/replication.c
index 87b5c138c..385efcf6c 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1358,7 +1358,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@@ -1387,7 +1387,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@@ -1419,7 +1419,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(server.rdb_pipe_bufflen>0);
client *slave = connGetPrivateData(conn);
- int nwritten;
+ ssize_t nwritten;
if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
server.rdb_pipe_bufflen - slave->repldboff)) == -1)
{
@@ -1431,7 +1431,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
@@ -1491,7 +1491,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
int stillAlive = 0;
for (i=0; i < server.rdb_pipe_numconns; i++)
{
- int nwritten;
+ ssize_t nwritten;
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
@@ -1511,7 +1511,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@@ -1817,11 +1817,16 @@ void readSyncBulkPayload(connection *conn) {
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
- if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
+ nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000);
+ if (nread == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
+ } else {
+ /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
+ * convert "\r\n" to '\0' so 1 byte is lost. */
+ atomicIncr(server.stat_net_repl_input_bytes, nread+1);
}
if (buf[0] == '-') {
@@ -1892,7 +1897,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
- atomicIncr(server.stat_net_input_bytes, nread);
+ atomicIncr(server.stat_net_repl_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
diff --git a/src/rio.c b/src/rio.c
index f99913152..bcda3767b 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -438,6 +438,20 @@ void rioSetAutoSync(rio *r, off_t bytes) {
r->io.file.autosync = bytes;
}
+/* Check the type of rio. */
+uint8_t rioCheckType(rio *r) {
+ if (r->read == rioFileRead) {
+ return RIO_TYPE_FILE;
+ } else if (r->read == rioBufferRead) {
+ return RIO_TYPE_BUFFER;
+ } else if (r->read == rioConnRead) {
+ return RIO_TYPE_CONN;
+ } else {
+ /* r->read == rioFdRead */
+ return RIO_TYPE_FD;
+ }
+}
+
/* --------------------------- Higher level interface --------------------------
*
* The following higher level functions use lower level rio.c functions to help
diff --git a/src/rio.h b/src/rio.h
index 9576335e8..51738366a 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -40,6 +40,11 @@
#define RIO_FLAG_READ_ERROR (1<<0)
#define RIO_FLAG_WRITE_ERROR (1<<1)
+#define RIO_TYPE_FILE (1<<0)
+#define RIO_TYPE_BUFFER (1<<1)
+#define RIO_TYPE_CONN (1<<2)
+#define RIO_TYPE_FD (1<<3)
+
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
@@ -174,5 +179,5 @@ int rioWriteBulkObject(rio *r, struct redisObject *obj);
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
-
+uint8_t rioCheckType(rio *r);
#endif
diff --git a/src/server.c b/src/server.c
index 75dbab0d2..231ec5648 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1188,14 +1188,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
+ long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+ atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
+ atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
- stat_net_input_bytes);
+ stat_net_input_bytes + stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
- stat_net_output_bytes);
+ stat_net_output_bytes + stat_net_repl_output_bytes);
+ trackInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION,
+ stat_net_repl_input_bytes + stat_net_repl_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
@@ -2356,6 +2361,8 @@ void resetServerStats(void) {
server.stat_aofrw_consecutive_failures = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
+ atomicSet(server.stat_net_repl_input_bytes, 0);
+ atomicSet(server.stat_net_repl_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.stat_total_error_replies = 0;
server.stat_dump_payload_sanitizations = 0;
@@ -5576,6 +5583,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
if (all_sections || (dictFind(section_dict,"stats") != NULL)) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
+ long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ?
(long long) elapsedUs(server.stat_last_eviction_exceeded_time): 0;
long long current_active_defrag_time = server.stat_last_active_defrag_time ?
@@ -5584,6 +5592,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
+ atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
+ atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
@@ -5593,8 +5603,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_ops_per_sec:%lld\r\n"
"total_net_input_bytes:%lld\r\n"
"total_net_output_bytes:%lld\r\n"
+ "total_net_repl_input_bytes:%lld\r\n"
+ "total_net_repl_output_bytes:%lld\r\n"
"instantaneous_input_kbps:%.2f\r\n"
"instantaneous_output_kbps:%.2f\r\n"
+ "instantaneous_repl_total_kbps:%.2f\r\n"
"rejected_connections:%lld\r\n"
"sync_full:%lld\r\n"
"sync_partial_ok:%lld\r\n"
@@ -5636,10 +5649,13 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
- stat_net_input_bytes,
- stat_net_output_bytes,
+ stat_net_input_bytes + stat_net_repl_input_bytes,
+ stat_net_output_bytes + stat_net_repl_output_bytes,
+ stat_net_repl_input_bytes,
+ stat_net_repl_output_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
+ (float)getInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION)/1024,
server.stat_rejected_conn,
server.stat_sync_full,
server.stat_sync_partial_ok,
diff --git a/src/server.h b/src/server.h
index b34a84cd7..e76053513 100644
--- a/src/server.h
+++ b/src/server.h
@@ -155,9 +155,10 @@ typedef long long ustime_t; /* microsecond time type. */
/* Instantaneous metrics tracking. */
#define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */
#define STATS_METRIC_COMMAND 0 /* Number of commands executed. */
-#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network .*/
+#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network. */
#define STATS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */
-#define STATS_METRIC_COUNT 3
+#define STATS_METRIC_NET_TOTAL_REPLICATION 3 /* Bytes written and read to network during replication. */
+#define STATS_METRIC_COUNT 4
/* Protocol and I/O related defines */
#define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
@@ -1586,6 +1587,8 @@ struct redisServer {
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
+ redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
+ redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */