summaryrefslogtreecommitdiff
path: root/src/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/replication.c')
-rw-r--r--src/replication.c37
1 files changed, 30 insertions, 7 deletions
diff --git a/src/replication.c b/src/replication.c
index 55636fd77..652dbf8c0 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -111,7 +111,7 @@ int bg_unlink(const char *filename) {
errno = old_errno;
return -1;
}
- bioCreateCloseJob(fd, 0);
+ bioCreateCloseJob(fd, 0, 0);
return 0; /* Success. */
}
}
@@ -858,8 +858,10 @@ int startBgsaveForReplication(int mincapa, int req) {
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(req,rsiptr);
- else
- retval = rdbSaveBackground(req,server.rdb_filename,rsiptr);
+ else {
+ /* Keep the page cache since it'll get used soon */
+ retval = rdbSaveBackground(req,server.rdb_filename,rsiptr,RDBFLAGS_KEEP_CACHE);
+ }
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
@@ -1350,6 +1352,29 @@ void removeRDBUsedToSyncReplicas(void) {
}
}
+/* Close the repldbfd and reclaim the page cache if the client hold
+ * the last reference to replication DB */
+void closeRepldbfd(client *myself) {
+ listNode *ln;
+ listIter li;
+ int reclaim = 1;
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+ if (slave != myself && slave->replstate == SLAVE_STATE_SEND_BULK) {
+ reclaim = 0;
+ break;
+ }
+ }
+
+ if (reclaim) {
+ bioCreateCloseJob(myself->repldbfd, 0, 1);
+ } else {
+ close(myself->repldbfd);
+ }
+ myself->repldbfd = -1;
+}
+
void sendBulkToSlave(connection *conn) {
client *slave = connGetPrivateData(conn);
char buf[PROTO_IOBUF_LEN];
@@ -1398,8 +1423,7 @@ void sendBulkToSlave(connection *conn) {
slave->repldboff += nwritten;
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
- close(slave->repldbfd);
- slave->repldbfd = -1;
+ closeRepldbfd(slave);
connSetWriteHandler(slave->conn,NULL);
if (!replicaPutOnline(slave)) {
freeClient(slave);
@@ -2164,7 +2188,7 @@ void readSyncBulkPayload(connection *conn) {
return;
}
/* Close old rdb asynchronously. */
- if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0);
+ if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0);
/* Sync the directory to ensure rename is persisted */
if (fsyncFileDir(server.rdb_filename) == -1) {
@@ -2201,7 +2225,6 @@ void readSyncBulkPayload(connection *conn) {
}
zfree(server.repl_transfer_tmpfile);
- close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}