summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartins Grunskis <martins@grunskis.com>2018-10-29 09:50:31 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2019-03-30 21:23:11 +0000
commitf946042a79fc340d249589ff6acb8fdcb671e9f1 (patch)
tree559d5324728f0d7d35db82316a2c47f950c2ca67
parent3eecf34beaa0fa2deea9f22baf3b657db412a404 (diff)
downloadpsycopg2-f946042a79fc340d249589ff6acb8fdcb671e9f1.tar.gz
Store WAL end pointer in the replication cursor
-rw-r--r--doc/src/extras.rst8
-rw-r--r--psycopg/pqpath.c5
-rw-r--r--psycopg/replication_cursor.h2
-rw-r--r--psycopg/replication_cursor_type.c15
-rwxr-xr-xtests/test_replication.py1
5 files changed, 29 insertions, 2 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 13bd1bb..e203660 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -481,6 +481,12 @@ The individual messages in the replication stream are represented by
communication with the server (a data or keepalive message in either
direction).
+ .. attribute:: wal_end
+
+ LSN position of the current end of WAL on the server at the
+ moment of last data or keepalive message received from the
+ server.
+
An actual example of asynchronous operation might look like this::
from select import select
@@ -500,7 +506,7 @@ The individual messages in the replication stream are represented by
try:
sel = select([cur], [], [], max(0, timeout))
if not any(sel):
- cur.send_feedback() # timed out, send keepalive message
+ cur.send_feedback(flush_lsn=cur.wal_end) # timed out, send keepalive message
except InterruptedError:
pass # recalculate timeout and continue
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 3aca0ed..558d93c 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1559,6 +1559,10 @@ retry:
goto exit;
}
+ wal_end = fe_recvint64(buffer + 1);
+ Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end));
+ repl->wal_end = wal_end;
+
reply = buffer[hdr];
if (reply && pq_send_replication_feedback(repl, 0) < 0) {
goto exit;
@@ -1573,6 +1577,7 @@ retry:
goto exit;
}
+ repl->wal_end = wal_end;
ret = 0;
exit:
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
index b07e49c..e896426 100644
--- a/psycopg/replication_cursor.h
+++ b/psycopg/replication_cursor.h
@@ -44,6 +44,8 @@ typedef struct replicationCursorObject {
struct timeval last_io; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
+ XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */
+
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn;
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
index 678dc73..4522076 100644
--- a/psycopg/replication_cursor_type.c
+++ b/psycopg/replication_cursor_type.c
@@ -228,6 +228,17 @@ repl_curs_get_io_timestamp(replicationCursorObject *self)
return res;
}
+/* object member list */
+
+#define OFFSETOF(x) offsetof(replicationCursorObject, x)
+
+static struct PyMemberDef replicationCursorObject_members[] = {
+ {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
+ "LSN position of the current end of WAL on the server."},
+ {NULL}
+};
+
+
/* object method list */
static struct PyMethodDef replicationCursorObject_methods[] = {
@@ -259,6 +270,8 @@ replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
self->consuming = 0;
self->decode = 0;
+ self->wal_end = 0;
+
self->write_lsn = 0;
self->flush_lsn = 0;
self->apply_lsn = 0;
@@ -308,7 +321,7 @@ PyTypeObject replicationCursorType = {
0, /*tp_iter*/
0, /*tp_iternext*/
replicationCursorObject_methods, /*tp_methods*/
- 0, /*tp_members*/
+ replicationCursorObject_members, /*tp_members*/
replicationCursorObject_getsets, /*tp_getset*/
&cursorType, /*tp_base*/
0, /*tp_dict*/
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 697d53c..15d47f6 100755
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -242,6 +242,7 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg):
# just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg))
+ "%s: %s" % (cur.wal_end, repr(msg))
self.msg_count += 1
if self.msg_count > 3: