summaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c57
1 files changed, 22 insertions, 35 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index c6c90fb2ea..e51cac9b87 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,14 +31,13 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
-static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
- int fsync_interval);
+ bool synchronous);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -55,8 +54,7 @@ static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
- int64 last_status, int fsync_interval,
- XLogRecPtr blockpos);
+ int64 last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
@@ -209,7 +207,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
- last_fsync = feGetCurrentTimestamp();
return true;
}
@@ -440,8 +437,8 @@ CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
- * fsync_interval controls how often we flush to the received WAL file,
- * in milliseconds.
+ * If 'synchronous' is true, the received WAL is flushed as soon as written,
+ * otherwise only when the WAL file is closed.
*
* Note: The log position *must* be at a log segment start!
*/
@@ -450,7 +447,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- int fsync_interval)
+ bool synchronous)
{
char query[128];
char slotcmd[128];
@@ -595,7 +592,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
- &stoppos, fsync_interval);
+ &stoppos, synchronous);
if (res == NULL)
goto error;
@@ -760,7 +757,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos, int fsync_interval)
+ XLogRecPtr *stoppos, bool synchronous)
{
char *copybuf = NULL;
int64 last_status = -1;
@@ -784,14 +781,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
now = feGetCurrentTimestamp();
/*
- * If fsync_interval has elapsed since last WAL flush and we've written
- * some WAL data, flush them to disk.
+ * If synchronous option is true, issue sync command as soon as
+ * there are WAL data which has not been flushed yet.
*/
- if (lastFlushPosition < blockpos &&
- walfile != -1 &&
- ((fsync_interval > 0 &&
- feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
- fsync_interval < 0))
+ if (synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
@@ -799,9 +792,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
progname, current_walfile_name, strerror(errno));
goto error;
}
-
lastFlushPosition = blockpos;
- last_fsync = now;
+
+ /*
+ * Send feedback so that the server sees the latest WAL locations
+ * immediately.
+ */
+ if (!sendFeedback(conn, blockpos, now, false))
+ goto error;
+ last_status = now;
}
/*
@@ -821,7 +820,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Calculate how long send/receive loops should sleep
*/
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
- last_status, fsync_interval, blockpos);
+ last_status);
r = CopyStreamReceive(conn, sleeptime, &copybuf);
while (r != 0)
@@ -1244,34 +1243,22 @@ CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
- int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+ int64 last_status)
{
- int64 targettime = 0;
int64 status_targettime = 0;
- int64 fsync_targettime = 0;
long sleeptime;
if (standby_message_timeout && still_sending)
status_targettime = last_status +
(standby_message_timeout - 1) * ((int64) 1000);
- if (fsync_interval > 0 && lastFlushPosition < blockpos)
- fsync_targettime = last_fsync +
- (fsync_interval - 1) * ((int64) 1000);
-
- if ((status_targettime < fsync_targettime && status_targettime > 0) ||
- fsync_targettime == 0)
- targettime = status_targettime;
- else
- targettime = fsync_targettime;
-
- if (targettime > 0)
+ if (status_targettime > 0)
{
long secs;
int usecs;
feTimestampDifference(now,
- targettime,
+ status_targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */