diff options
author | Martins Grunskis <martins@grunskis.com> | 2018-10-29 09:50:31 +0100 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2019-03-30 21:23:11 +0000 |
commit | f946042a79fc340d249589ff6acb8fdcb671e9f1 (patch) | |
tree | 559d5324728f0d7d35db82316a2c47f950c2ca67 | |
parent | 3eecf34beaa0fa2deea9f22baf3b657db412a404 (diff) | |
download | psycopg2-f946042a79fc340d249589ff6acb8fdcb671e9f1.tar.gz |
Store WAL end pointer in the replication cursor
-rw-r--r-- | doc/src/extras.rst | 8 | ||||
-rw-r--r-- | psycopg/pqpath.c | 5 | ||||
-rw-r--r-- | psycopg/replication_cursor.h | 2 | ||||
-rw-r--r-- | psycopg/replication_cursor_type.c | 15 | ||||
-rwxr-xr-x | tests/test_replication.py | 1 |
5 files changed, 29 insertions, 2 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 13bd1bb..e203660 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -481,6 +481,12 @@ The individual messages in the replication stream are represented by communication with the server (a data or keepalive message in either direction). + .. attribute:: wal_end + + LSN position of the current end of WAL on the server at the + moment of last data or keepalive message received from the + server. + An actual example of asynchronous operation might look like this:: from select import select @@ -500,7 +506,7 @@ The individual messages in the replication stream are represented by try: sel = select([cur], [], [], max(0, timeout)) if not any(sel): - cur.send_feedback() # timed out, send keepalive message + cur.send_feedback(flush_lsn=cur.wal_end) # timed out, send keepalive message except InterruptedError: pass # recalculate timeout and continue diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 3aca0ed..558d93c 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1559,6 +1559,10 @@ retry: goto exit; } + wal_end = fe_recvint64(buffer + 1); + Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end)); + repl->wal_end = wal_end; + reply = buffer[hdr]; if (reply && pq_send_replication_feedback(repl, 0) < 0) { goto exit; @@ -1573,6 +1577,7 @@ retry: goto exit; } + repl->wal_end = wal_end; ret = 0; exit: diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index b07e49c..e896426 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -44,6 +44,8 @@ typedef struct replicationCursorObject { struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ + XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ + XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr flush_lsn; XLogRecPtr apply_lsn; diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 678dc73..4522076 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -228,6 +228,17 @@ repl_curs_get_io_timestamp(replicationCursorObject *self) return res; } +/* object member list */ + +#define OFFSETOF(x) offsetof(replicationCursorObject, x) + +static struct PyMemberDef replicationCursorObject_members[] = { + {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY, + "LSN position of the current end of WAL on the server."}, + {NULL} +}; + + /* object method list */ static struct PyMethodDef replicationCursorObject_methods[] = { @@ -259,6 +270,8 @@ replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs) self->consuming = 0; self->decode = 0; + self->wal_end = 0; + self->write_lsn = 0; self->flush_lsn = 0; self->apply_lsn = 0; @@ -308,7 +321,7 @@ PyTypeObject replicationCursorType = { 0, /*tp_iter*/ 0, /*tp_iternext*/ replicationCursorObject_methods, /*tp_methods*/ - 0, /*tp_members*/ + replicationCursorObject_members, /*tp_members*/ replicationCursorObject_getsets, /*tp_getset*/ &cursorType, /*tp_base*/ 0, /*tp_dict*/ diff --git a/tests/test_replication.py b/tests/test_replication.py index 697d53c..15d47f6 100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -242,6 +242,7 @@ class AsyncReplicationTest(ReplicationTestCase): def consume(msg): # just check the methods "%s: %s" % (cur.io_timestamp, repr(msg)) + "%s: %s" % (cur.wal_end, repr(msg)) self.msg_count += 1 if self.msg_count > 3: |