summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
authorDarrenJiang13 <yjjiang1996@163.com>2022-05-31 13:07:33 +0800
committerGitHub <noreply@github.com>2022-05-31 08:07:33 +0300
commitbb1de082eac26d5242733eb0b40959bd9de2e15b (patch)
treec3e1125652b263c3fefea1ea0eeca33f686a12bb /src/replication.c
parent4065b4f27efc539b86beb63829bc148a02adecb1 (diff)
downloadredis-bb1de082eac26d5242733eb0b40959bd9de2e15b.tar.gz
Adds isolated netstats for replication. (#10062)
The amount of `server.stat_net_output_bytes/server.stat_net_input_bytes` is actually the sum of replication flow and users' data flow. It may cause confusions like this: "Why does my server get such a large output_bytes while I am doing nothing? ". After discussions and revisions, now here is the change about what this PR brings (final version before merge): - 2 server variables to count the network bytes during replication, including fullsync and propagate bytes. - `server.stat_net_repl_output_bytes`/`server.stat_net_repl_input_bytes` - 3 info fields to print the input and output of repl bytes and instantaneous value of total repl bytes. - `total_net_repl_input_bytes` / `total_net_repl_output_bytes` - `instantaneous_repl_total_kbps` - 1 new API `rioCheckType()` to check the type of rio. So we can use this to distinguish between diskless and diskbased replication - 2 new counting items to keep network statistics consistent between master and slave - rdb portion during diskless replica. in `rdbLoadProgressCallback()` - first line of the full sync payload. in `readSyncBulkPayload()` Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c21
1 files changed, 13 insertions, 8 deletions
diff --git a/src/replication.c b/src/replication.c
index 87b5c138c..385efcf6c 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1358,7 +1358,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@@ -1387,7 +1387,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@@ -1419,7 +1419,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(server.rdb_pipe_bufflen>0);
client *slave = connGetPrivateData(conn);
- int nwritten;
+ ssize_t nwritten;
if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
server.rdb_pipe_bufflen - slave->repldboff)) == -1)
{
@@ -1431,7 +1431,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
@@ -1491,7 +1491,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
int stillAlive = 0;
for (i=0; i < server.rdb_pipe_numconns; i++)
{
- int nwritten;
+ ssize_t nwritten;
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
@@ -1511,7 +1511,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
- atomicIncr(server.stat_net_output_bytes, nwritten);
+ atomicIncr(server.stat_net_repl_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@@ -1817,11 +1817,16 @@ void readSyncBulkPayload(connection *conn) {
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
- if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
+ nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000);
+ if (nread == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
+ } else {
+ /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
+ * convert "\r\n" to '\0' so 1 byte is lost. */
+ atomicIncr(server.stat_net_repl_input_bytes, nread+1);
}
if (buf[0] == '-') {
@@ -1892,7 +1897,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
- atomicIncr(server.stat_net_input_bytes, nread);
+ atomicIncr(server.stat_net_repl_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */