diff options
author | DarrenJiang13 <yjjiang1996@163.com> | 2022-05-31 13:07:33 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-31 08:07:33 +0300 |
commit | bb1de082eac26d5242733eb0b40959bd9de2e15b (patch) | |
tree | c3e1125652b263c3fefea1ea0eeca33f686a12bb /src/replication.c | |
parent | 4065b4f27efc539b86beb63829bc148a02adecb1 (diff) | |
download | redis-bb1de082eac26d5242733eb0b40959bd9de2e15b.tar.gz |
Adds isolated netstats for replication. (#10062)
The amount of `server.stat_net_output_bytes/server.stat_net_input_bytes`
is actually the sum of replication flow and users' data flow.
It may cause confusions like this:
"Why does my server get such a large output_bytes while I am doing nothing? ".
After discussions and revisions, now here is the change about what this
PR brings (final version before merge):
- 2 server variables to count the network bytes during replication,
including fullsync and propagate bytes.
- `server.stat_net_repl_output_bytes`/`server.stat_net_repl_input_bytes`
- 3 info fields to print the input and output of repl bytes and instantaneous
value of total repl bytes.
- `total_net_repl_input_bytes` / `total_net_repl_output_bytes`
- `instantaneous_repl_total_kbps`
- 1 new API `rioCheckType()` to check the type of rio. So we can use this
to distinguish between diskless and diskbased replication
- 2 new counting items to keep network statistics consistent between master
and slave
- rdb portion during diskless replica. in `rdbLoadProgressCallback()`
- first line of the full sync payload. in `readSyncBulkPayload()`
Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/replication.c')
-rw-r--r-- | src/replication.c | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/src/replication.c b/src/replication.c index 87b5c138c..385efcf6c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1358,7 +1358,7 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); @@ -1387,7 +1387,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; @@ -1419,7 +1419,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { void rdbPipeWriteHandler(struct connection *conn) { serverAssert(server.rdb_pipe_bufflen>0); client *slave = connGetPrivateData(conn); - int nwritten; + ssize_t nwritten; if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff, server.rdb_pipe_bufflen - slave->repldboff)) == -1) { @@ -1431,7 +1431,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); if (slave->repldboff < server.rdb_pipe_bufflen) { slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ @@ -1491,7 +1491,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int stillAlive = 0; for (i=0; i < server.rdb_pipe_numconns; i++) { - int nwritten; + ssize_t nwritten; connection *conn = server.rdb_pipe_conns[i]; if (!conn) continue; @@ -1511,7 +1511,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ slave->repldboff = nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1817,11 +1817,16 @@ void readSyncBulkPayload(connection *conn) { /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ if (server.repl_transfer_size == -1) { - if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) { + nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000); + if (nread == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", strerror(errno)); goto error; + } else { + /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and + * convert "\r\n" to '\0' so 1 byte is lost. */ + atomicIncr(server.stat_net_repl_input_bytes, nread+1); } if (buf[0] == '-') { @@ -1892,7 +1897,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - atomicIncr(server.stat_net_input_bytes, nread); + atomicIncr(server.stat_net_repl_input_bytes, nread); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... */ |