diff options
author | grunskis-bonial <martins.grunskis@bonial.com> | 2018-10-30 14:29:09 +0100 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2019-03-30 21:23:13 +0000 |
commit | ff91ad5186f2989a26aed38feccb87fe0410e1c9 (patch) | |
tree | 75128ee18c5958164f6be57938fafa3317ad6d6f | |
parent | f946042a79fc340d249589ff6acb8fdcb671e9f1 (diff) | |
download | psycopg2-ff91ad5186f2989a26aed38feccb87fe0410e1c9.tar.gz |
Address code review feedback
-rw-r--r-- | doc/src/extras.rst | 15 | ||||
-rw-r--r-- | psycopg/pqpath.c | 3 | ||||
-rw-r--r-- | psycopg/replication_cursor.h | 4 |
3 files changed, 18 insertions, 4 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst index e203660..8c0de15 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -506,10 +506,23 @@ The individual messages in the replication stream are represented by try: sel = select([cur], [], [], max(0, timeout)) if not any(sel): - cur.send_feedback(flush_lsn=cur.wal_end) # timed out, send keepalive message + cur.send_feedback() # timed out, send keepalive message except InterruptedError: pass # recalculate timeout and continue + .. warning:: + + The ``consume({msg})`` function will only be called when + there are new database writes on the server e.g. any DML or + DDL statement. Depending on your Postgres cluster + configuration this might cause the server to run out of disk + space if the writes are far apart. To prevent this from + happening you can use `~ReplicationCursor.wal_end` value to + periodically send feedback to the server to notify that your + replication client has received and processed all the + messages. + + .. index:: pair: Cursor; Replication diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 558d93c..26667eb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1550,6 +1550,8 @@ retry: (*msg)->data_start = data_start; (*msg)->wal_end = wal_end; (*msg)->send_time = send_time; + + repl->wal_end = wal_end; } else if (buffer[0] == 'k') { /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1577,7 +1579,6 @@ retry: goto exit; } - repl->wal_end = wal_end; ret = 0; exit: diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index e896426..db549c8 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -44,11 +44,11 @@ typedef struct replicationCursorObject { struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ - XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ - XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr flush_lsn; XLogRecPtr apply_lsn; + + XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ } replicationCursorObject; |