diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 57 |
1 files changed, 22 insertions, 35 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index c6c90fb2ea..e51cac9b87 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -31,14 +31,13 @@ static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; -static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, - int fsync_interval); + bool synchronous); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, @@ -55,8 +54,7 @@ static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, stream_stop_callback stream_stop, char *partial_suffix, XLogRecPtr *stoppos); static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, - int64 last_status, int fsync_interval, - XLogRecPtr blockpos); + int64 last_status); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); @@ -209,7 +207,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) progname, current_walfile_name, partial_suffix); lastFlushPosition = pos; - last_fsync = feGetCurrentTimestamp(); return true; } @@ -440,8 +437,8 @@ CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * - * fsync_interval controls how often we flush to the received WAL file, - * in milliseconds. + * If 'synchronous' is true, the received WAL is flushed as soon as written, + * otherwise only when the WAL file is closed. * * Note: The log position *must* be at a log segment start! */ @@ -450,7 +447,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - int fsync_interval) + bool synchronous) { char query[128]; char slotcmd[128]; @@ -595,7 +592,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, - &stoppos, fsync_interval); + &stoppos, synchronous); if (res == NULL) goto error; @@ -760,7 +757,7 @@ static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos, int fsync_interval) + XLogRecPtr *stoppos, bool synchronous) { char *copybuf = NULL; int64 last_status = -1; @@ -784,14 +781,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, now = feGetCurrentTimestamp(); /* - * If fsync_interval has elapsed since last WAL flush and we've written - * some WAL data, flush them to disk. + * If synchronous option is true, issue sync command as soon as + * there are WAL data which has not been flushed yet. */ - if (lastFlushPosition < blockpos && - walfile != -1 && - ((fsync_interval > 0 && - feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) || - fsync_interval < 0)) + if (synchronous && lastFlushPosition < blockpos && walfile != -1) { if (fsync(walfile) != 0) { @@ -799,9 +792,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, current_walfile_name, strerror(errno)); goto error; } - lastFlushPosition = blockpos; - last_fsync = now; + + /* + * Send feedback so that the server sees the latest WAL locations + * immediately. + */ + if (!sendFeedback(conn, blockpos, now, false)) + goto error; + last_status = now; } /* @@ -821,7 +820,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * Calculate how long send/receive loops should sleep */ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, - last_status, fsync_interval, blockpos); + last_status); r = CopyStreamReceive(conn, sleeptime, ©buf); while (r != 0) @@ -1244,34 +1243,22 @@ CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, */ static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, - int64 last_status, int fsync_interval, XLogRecPtr blockpos) + int64 last_status) { - int64 targettime = 0; int64 status_targettime = 0; - int64 fsync_targettime = 0; long sleeptime; if (standby_message_timeout && still_sending) status_targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - if (fsync_interval > 0 && lastFlushPosition < blockpos) - fsync_targettime = last_fsync + - (fsync_interval - 1) * ((int64) 1000); - - if ((status_targettime < fsync_targettime && status_targettime > 0) || - fsync_targettime == 0) - targettime = status_targettime; - else - targettime = fsync_targettime; - - if (targettime > 0) + if (status_targettime > 0) { long secs; int usecs; feTimestampDifference(now, - targettime, + status_targettime, &secs, &usecs); /* Always sleep at least 1 sec */ |