summaryrefslogtreecommitdiff
path: root/psycopg
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-19 20:00:39 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-19 20:00:39 +0200
commit0bb81fc84811134bca70b59daa4661bd0697f2ff (patch)
tree4b66a805179645ffd6fbad36a8cc442e6b384602 /psycopg
parent7aea2cef6e42c961fadac61f19b570bdf8c61401 (diff)
downloadpsycopg2-0bb81fc84811134bca70b59daa4661bd0697f2ff.tar.gz
Properly subclass ReplicationCursor on C level.
Diffstat (limited to 'psycopg')
-rw-r--r--psycopg/cursor.h27
-rw-r--r--psycopg/cursor_type.c235
-rw-r--r--psycopg/pqpath.c97
-rw-r--r--psycopg/pqpath.h8
-rw-r--r--psycopg/psycopgmodule.c9
-rw-r--r--psycopg/replication_cursor.h77
-rw-r--r--psycopg/replication_cursor_type.c360
-rw-r--r--psycopg/replication_message_type.c2
8 files changed, 505 insertions, 310 deletions
diff --git a/psycopg/cursor.h b/psycopg/cursor.h
index 669e176..18e31e5 100644
--- a/psycopg/cursor.h
+++ b/psycopg/cursor.h
@@ -27,7 +27,6 @@
#define PSYCOPG_CURSOR_H 1
#include "psycopg/connection.h"
-#include "libpq_support.h"
#ifdef __cplusplus
extern "C" {
@@ -74,14 +73,6 @@ struct cursorObject {
#define DEFAULT_COPYBUFF 8192
/* replication cursor attrs */
- int repl_started:1; /* if replication is started */
- int repl_consuming:1; /* if running the consume loop */
- struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
- XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */
- XLogRecPtr repl_flush_lsn;
- XLogRecPtr repl_apply_lsn;
- int repl_feedback_pending; /* flag set when we couldn't send the feedback to the server */
- struct timeval repl_last_io; /* timestamp of the last exchange with the server */
PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */
@@ -106,7 +97,7 @@ HIDDEN void curs_reset(cursorObject *self);
HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue);
HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue);
-RAISES_NEG int psyco_curs_datetime_init(void);
+HIDDEN int psyco_curs_init(PyObject *obj, PyObject *args, PyObject *kwargs);
/* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) \
@@ -149,22 +140,6 @@ do \
return NULL; } \
while (0)
-#define EXC_IF_REPLICATING(self, cmd) \
-do \
- if ((self)->repl_started) { \
- PyErr_SetString(ProgrammingError, \
- #cmd " cannot be used when replication is already in progress"); \
- return NULL; } \
-while (0)
-
-#define EXC_IF_NOT_REPLICATING(self, cmd) \
-do \
- if (!(self)->repl_started) { \
- PyErr_SetString(ProgrammingError, \
- #cmd " cannot be used when replication is not in progress"); \
- return NULL; } \
-while (0)
-
#ifdef __cplusplus
}
#endif
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index d51f7a5..63bd5a1 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -28,7 +28,6 @@
#include "psycopg/cursor.h"
#include "psycopg/connection.h"
-#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/pqpath.h"
#include "psycopg/typecast.h"
@@ -39,9 +38,6 @@
#include <stdlib.h>
-/* python */
-#include "datetime.h"
-
/** DBAPI methods **/
@@ -1583,222 +1579,6 @@ exit:
return res;
}
-#define psyco_curs_start_replication_expert_doc \
-"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start and consume replication stream with direct command."
-
-static PyObject *
-psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
- PyObject *res = NULL;
- char *command;
- static char *kwlist[] = {"command", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
- return NULL;
- }
-
- EXC_IF_CURS_CLOSED(self);
- EXC_IF_GREEN(start_replication_expert);
- EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
- EXC_IF_REPLICATING(self, start_replication_expert);
-
- Dprintf("psyco_curs_start_replication_expert: %s", command);
-
- self->copysize = 0;
- self->repl_consuming = 0;
-
- self->repl_write_lsn = InvalidXLogRecPtr;
- self->repl_flush_lsn = InvalidXLogRecPtr;
- self->repl_apply_lsn = InvalidXLogRecPtr;
- self->repl_feedback_pending = 0;
-
- gettimeofday(&self->repl_last_io, NULL);
-
- if (pq_execute(self, command, self->conn->async,
- 1 /* no_result */, 1 /* no_begin */) >= 0) {
- res = Py_None;
- Py_INCREF(res);
-
- self->repl_started = 1;
- }
-
- return res;
-}
-
-#define psyco_curs_consume_replication_stream_doc \
-"consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream."
-
-static PyObject *
-psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
- PyObject *consume = NULL, *res = NULL;
- int decode = 0;
- double keepalive_interval = 10;
- static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
- &consume, &decode, &keepalive_interval)) {
- return NULL;
- }
-
- EXC_IF_CURS_CLOSED(self);
- EXC_IF_CURS_ASYNC(self, consume_replication_stream);
- EXC_IF_GREEN(consume_replication_stream);
- EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream);
- EXC_IF_NOT_REPLICATING(self, consume_replication_stream);
-
- if (self->repl_consuming) {
- PyErr_SetString(ProgrammingError,
- "consume_replication_stream cannot be used when already in the consume loop");
- return NULL;
- }
-
- Dprintf("psyco_curs_consume_replication_stream");
-
- if (keepalive_interval < 1.0) {
- psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1 (sec)");
- return NULL;
- }
-
- self->repl_consuming = 1;
-
- if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
- res = Py_None;
- Py_INCREF(res);
- }
-
- self->repl_consuming = 0;
-
- return res;
-}
-
-#define psyco_curs_read_replication_message_doc \
-"read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
-
-static PyObject *
-psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
- int decode = 1;
- static char *kwlist[] = {"decode", NULL};
-
- EXC_IF_CURS_CLOSED(self);
- EXC_IF_GREEN(read_replication_message);
- EXC_IF_TPC_PREPARED(self->conn, read_replication_message);
- EXC_IF_NOT_REPLICATING(self, read_replication_message);
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
- &decode)) {
- return NULL;
- }
-
- return pq_read_replication_message(self, decode);
-}
-
-static PyObject *
-curs_flush_replication_feedback(cursorObject *self, int reply)
-{
- if (!(self->repl_feedback_pending || reply))
- Py_RETURN_TRUE;
-
- if (pq_send_replication_feedback(self, reply)) {
- self->repl_feedback_pending = 0;
- Py_RETURN_TRUE;
- } else {
- self->repl_feedback_pending = 1;
- Py_RETURN_FALSE;
- }
-}
-
-#define psyco_curs_send_replication_feedback_doc \
-"send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
-
-static PyObject *
-psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
- XLogRecPtr write_lsn = InvalidXLogRecPtr,
- flush_lsn = InvalidXLogRecPtr,
- apply_lsn = InvalidXLogRecPtr;
- int reply = 0;
- static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
-
- EXC_IF_CURS_CLOSED(self);
- EXC_IF_NOT_REPLICATING(self, send_replication_feedback);
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
- &write_lsn, &flush_lsn, &apply_lsn, &reply)) {
- return NULL;
- }
-
- if (write_lsn > self->repl_write_lsn)
- self->repl_write_lsn = write_lsn;
-
- if (flush_lsn > self->repl_flush_lsn)
- self->repl_flush_lsn = flush_lsn;
-
- if (apply_lsn > self->repl_apply_lsn)
- self->repl_apply_lsn = apply_lsn;
-
- self->repl_feedback_pending = 1;
-
- return curs_flush_replication_feedback(self, reply);
-}
-
-#define psyco_curs_flush_replication_feedback_doc \
-"flush_replication_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
-
-static PyObject *
-psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
- int reply = 0;
- static char *kwlist[] = {"reply", NULL};
-
- EXC_IF_CURS_CLOSED(self);
- EXC_IF_NOT_REPLICATING(self, flush_replication_feedback);
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
- &reply)) {
- return NULL;
- }
-
- return curs_flush_replication_feedback(self, reply);
-}
-
-
-RAISES_NEG int
-psyco_curs_datetime_init(void)
-{
- Dprintf("psyco_curs_datetime_init: datetime init");
-
- PyDateTime_IMPORT;
-
- if (!PyDateTimeAPI) {
- PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
- return -1;
- }
- return 0;
-}
-
-#define psyco_curs_replication_io_timestamp_doc \
-"replication_io_timestamp -- the timestamp of latest IO with the server"
-
-static PyObject *
-psyco_curs_get_replication_io_timestamp(cursorObject *self)
-{
- PyObject *tval, *res = NULL;
- double seconds;
-
- EXC_IF_CURS_CLOSED(self);
-
- seconds = self->repl_last_io.tv_sec + self->repl_last_io.tv_usec / 1.0e6;
-
- tval = Py_BuildValue("(d)", seconds);
- if (tval) {
- res = PyDateTime_FromTimestamp(tval);
- Py_DECREF(tval);
- }
- return res;
-}
-
/* extension: closed - return true if cursor is closed */
#define psyco_curs_closed_doc \
@@ -1973,16 +1753,6 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
{"copy_expert", (PyCFunction)psyco_curs_copy_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
- {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
- METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
- {"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream,
- METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc},
- {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,
- METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc},
- {"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback,
- METH_VARARGS|METH_KEYWORDS, psyco_curs_send_replication_feedback_doc},
- {"flush_replication_feedback", (PyCFunction)psyco_curs_flush_replication_feedback,
- METH_VARARGS|METH_KEYWORDS, psyco_curs_flush_replication_feedback_doc},
{NULL}
};
@@ -2033,9 +1803,6 @@ static struct PyGetSetDef cursorObject_getsets[] = {
(getter)psyco_curs_scrollable_get,
(setter)psyco_curs_scrollable_set,
psyco_curs_scrollable_doc, NULL },
- { "replication_io_timestamp",
- (getter)psyco_curs_get_replication_io_timestamp, NULL,
- psyco_curs_replication_io_timestamp_doc, NULL },
{NULL}
};
@@ -2134,7 +1901,7 @@ cursor_dealloc(PyObject* obj)
Py_TYPE(obj)->tp_free(obj);
}
-static int
+int
cursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{
PyObject *conn;
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index f38fbd3..d688698 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -35,6 +35,7 @@
#include "psycopg/pqpath.h"
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/typecast.h"
@@ -1542,19 +1543,23 @@ exit:
are never returned to the caller.
*/
PyObject *
-pq_read_replication_message(cursorObject *curs, int decode)
+pq_read_replication_message(replicationCursorObject *repl, int decode)
{
+ cursorObject *curs = &repl->cur;
+ connectionObject *conn = curs->conn;
+ PGconn *pgconn = conn->pgconn;
char *buffer = NULL;
int len, data_size, consumed, hdr, reply;
XLogRecPtr data_start, wal_end;
pg_int64 send_time;
- PyObject *str = NULL, *msg = NULL;
+ PyObject *str = NULL, *result = NULL;
+ replicationMessageObject *msg = NULL;
Dprintf("pq_read_replication_message(decode=%d)", decode);
consumed = 0;
retry:
- len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
+ len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
if (len == 0) {
/* If we've tried reading some data, but there was none, bail out. */
@@ -1566,8 +1571,8 @@ retry:
server we might be reading a number of messages for every single
one we process, thus overgrowing the internal buffer until the
client system runs out of memory. */
- if (!PQconsumeInput(curs->conn->pgconn)) {
- pq_raise(curs->conn, curs, NULL);
+ if (!PQconsumeInput(pgconn)) {
+ pq_raise(conn, curs, NULL);
goto exit;
}
/* But PQconsumeInput() doesn't tell us if it has actually read
@@ -1581,15 +1586,15 @@ retry:
if (len == -2) {
/* serious error */
- pq_raise(curs->conn, curs, NULL);
+ pq_raise(conn, curs, NULL);
goto exit;
}
if (len == -1) {
/* EOF */
- curs->pgres = PQgetResult(curs->conn->pgconn);
+ curs->pgres = PQgetResult(pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
- pq_raise(curs->conn, curs, NULL);
+ pq_raise(conn, curs, NULL);
goto exit;
}
@@ -1603,7 +1608,7 @@ retry:
consumed = 1;
/* ok, we did really read something: update the io timestamp */
- gettimeofday(&curs->repl_last_io, NULL);
+ gettimeofday(&repl->last_io, NULL);
Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') {
@@ -1626,21 +1631,22 @@ retry:
/* XXX it would be wise to check if it's really a logical replication */
if (decode) {
- str = PyUnicode_Decode(buffer + hdr, data_size, curs->conn->codec, NULL);
+ str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
} else {
str = Bytes_FromStringAndSize(buffer + hdr, data_size);
}
if (!str) { goto exit; }
- msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
- curs, str, NULL);
+ result = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
+ curs, str, NULL);
Py_DECREF(str);
- if (!msg) { goto exit; }
+ if (!result) { goto exit; }
- ((replicationMessageObject *)msg)->data_size = data_size;
- ((replicationMessageObject *)msg)->data_start = data_start;
- ((replicationMessageObject *)msg)->wal_end = wal_end;
- ((replicationMessageObject *)msg)->send_time = send_time;
+ msg = (replicationMessageObject *)result;
+ msg->data_size = data_size;
+ msg->data_start = data_start;
+ msg->wal_end = wal_end;
+ msg->send_time = send_time;
}
else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@@ -1652,17 +1658,17 @@ retry:
reply = buffer[hdr];
if (reply) {
- if (!pq_send_replication_feedback(curs, 0)) {
- if (curs->conn->async) {
- curs->repl_feedback_pending = 1;
+ if (!pq_send_replication_feedback(repl, 0)) {
+ if (conn->async) {
+ repl->feedback_pending = 1;
} else {
/* XXX not sure if this was a good idea after all */
- pq_raise(curs->conn, curs, NULL);
+ pq_raise(conn, curs, NULL);
goto exit;
}
}
else {
- gettimeofday(&curs->repl_last_io, NULL);
+ gettimeofday(&repl->last_io, NULL);
}
}
@@ -1680,37 +1686,38 @@ exit:
PQfreemem(buffer);
}
- return msg;
+ return result;
none:
- msg = Py_None;
- Py_INCREF(msg);
+ result = Py_None;
+ Py_INCREF(result);
goto exit;
}
int
-pq_send_replication_feedback(cursorObject* curs, int reply_requested)
+pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
{
+ cursorObject *curs = &repl->cur;
+ PGconn *pgconn = curs->conn->pgconn;
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
- XLOGFMTARGS(curs->repl_write_lsn),
- XLOGFMTARGS(curs->repl_flush_lsn),
- XLOGFMTARGS(curs->repl_apply_lsn));
+ XLOGFMTARGS(repl->write_lsn),
+ XLOGFMTARGS(repl->flush_lsn),
+ XLOGFMTARGS(repl->apply_lsn));
replybuf[len] = 'r'; len += 1;
- fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8;
- fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8;
- fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(repl->write_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(repl->flush_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(repl->apply_lsn, &replybuf[len]); len += 8;
fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
replybuf[len] = reply_requested ? 1 : 0; len += 1;
- if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 ||
- PQflush(curs->conn->pgconn) != 0) {
+ if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
return 0;
}
- gettimeofday(&curs->repl_last_io, NULL);
+ gettimeofday(&repl->last_io, NULL);
return 1;
}
@@ -1723,12 +1730,15 @@ pq_send_replication_feedback(cursorObject* curs, int reply_requested)
manages to send keepalive messages to the server as needed.
*/
int
-pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive_interval)
+pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
+ double keepalive_interval)
{
+ cursorObject *curs = &repl->cur;
+ connectionObject *conn = curs->conn;
+ PGconn *pgconn = conn->pgconn;
PyObject *msg, *tmp = NULL;
PyObject *consume_func = NULL;
int fd, sel, ret = -1;
- PGconn *pgconn;
fd_set fds;
struct timeval keep_intr, curr_time, ping_time, timeout;
@@ -1738,13 +1748,12 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
}
CLEARPGRES(curs->pgres);
- pgconn = curs->conn->pgconn;
keep_intr.tv_sec = (int)keepalive_interval;
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) {
- msg = pq_read_replication_message(curs, decode);
+ msg = pq_read_replication_message(repl, decode);
if (!msg) {
goto exit;
}
@@ -1753,7 +1762,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
fd = PQsocket(pgconn);
if (fd < 0) {
- pq_raise(curs->conn, curs, NULL);
+ pq_raise(conn, curs, NULL);
goto exit;
}
@@ -1763,7 +1772,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
/* how long can we wait before we need to send a keepalive? */
gettimeofday(&curr_time, NULL);
- timeradd(&curs->repl_last_io, &keep_intr, &ping_time);
+ timeradd(&repl->last_io, &keep_intr, &ping_time);
timersub(&ping_time, &curr_time, &timeout);
if (timeout.tv_sec >= 0) {
@@ -1787,8 +1796,8 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
}
if (sel == 0) {
- if (!pq_send_replication_feedback(curs, 0)) {
- pq_raise(curs->conn, curs, NULL);
+ if (!pq_send_replication_feedback(repl, 0)) {
+ pq_raise(conn, curs, NULL);
goto exit;
}
}
@@ -1876,7 +1885,7 @@ pq_fetch(cursorObject *curs, int no_result)
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1;
ex = 0;
- /* nothing to do here: _pq_copy_both_v3 will be called separately */
+ /* nothing to do here: pq_copy_both will be called separately */
CLEARPGRES(curs->pgres);
break;
diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h
index a858a26..568f076 100644
--- a/psycopg/pqpath.h
+++ b/psycopg/pqpath.h
@@ -27,6 +27,7 @@
#define PSYCOPG_PQPATH_H 1
#include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
#include "psycopg/connection.h"
/* macro to clean the pg result */
@@ -72,9 +73,10 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
char **error);
-HIDDEN int pq_copy_both(cursorObject *curs, PyObject *consumer,
+/* replication protocol support */
+HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
int decode, double keepalive_interval);
-HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode);
-HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested);
+HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode);
+HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */
diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c
index f9f29a2..25e3259 100644
--- a/psycopg/psycopgmodule.c
+++ b/psycopg/psycopgmodule.c
@@ -28,6 +28,7 @@
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/lobject.h"
@@ -917,6 +918,9 @@ INIT_MODULE(_psycopg)(void)
Py_TYPE(&cursorType) = &PyType_Type;
if (PyType_Ready(&cursorType) == -1) goto exit;
+ Py_TYPE(&replicationCursorType) = &PyType_Type;
+ if (PyType_Ready(&replicationCursorType) == -1) goto exit;
+
Py_TYPE(&replicationMessageType) = &PyType_Type;
if (PyType_Ready(&replicationMessageType) == -1) goto exit;
@@ -1000,7 +1004,7 @@ INIT_MODULE(_psycopg)(void)
/* Initialize the PyDateTimeAPI everywhere is used */
PyDateTime_IMPORT;
if (psyco_adapter_datetime_init()) { goto exit; }
- if (psyco_curs_datetime_init()) { goto exit; }
+ if (psyco_repl_curs_datetime_init()) { goto exit; }
if (psyco_replmsg_datetime_init()) { goto exit; }
Py_TYPE(&pydatetimeType) = &PyType_Type;
@@ -1044,7 +1048,8 @@ INIT_MODULE(_psycopg)(void)
/* put new types in module dictionary */
PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
- PyModule_AddObject(module, "replicationMessage", (PyObject*)&replicationMessageType);
+ PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
+ PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
PyModule_AddObject(module, "Notify", (PyObject*)&notifyType);
PyModule_AddObject(module, "Xid", (PyObject*)&xidType);
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
new file mode 100644
index 0000000..1b6dbfa
--- /dev/null
+++ b/psycopg/replication_cursor.h
@@ -0,0 +1,77 @@
+/* replication_cursor.h - definition for the psycopg replication cursor type
+ *
+ * Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+ *
+ * This file is part of psycopg.
+ *
+ * psycopg2 is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link this program with the OpenSSL library (or with
+ * modified versions of OpenSSL that use the same license as OpenSSL),
+ * and distribute linked combinations including the two.
+ *
+ * You must obey the GNU Lesser General Public License in all respects for
+ * all of the code used other than OpenSSL.
+ *
+ * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ */
+
+#ifndef PSYCOPG_REPLICATION_CURSOR_H
+#define PSYCOPG_REPLICATION_CURSOR_H 1
+
+#include "psycopg/cursor.h"
+#include "libpq_support.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern HIDDEN PyTypeObject replicationCursorType;
+
+typedef struct replicationCursorObject {
+ cursorObject cur;
+
+ int started:1; /* if replication is started */
+ int consuming:1; /* if running the consume loop */
+
+ struct timeval last_io; /* timestamp of the last exchange with the server */
+ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
+
+ XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */
+ XLogRecPtr flush_lsn;
+ XLogRecPtr apply_lsn;
+ int feedback_pending; /* flag set when we couldn't send the feedback to the server */
+} replicationCursorObject;
+
+
+RAISES_NEG int psyco_repl_curs_datetime_init(void);
+
+/* exception-raising macros */
+#define EXC_IF_REPLICATING(self, cmd) \
+do \
+ if ((self)->started) { \
+ PyErr_SetString(ProgrammingError, \
+ #cmd " cannot be used when replication is already in progress"); \
+ return NULL; } \
+while (0)
+
+#define EXC_IF_NOT_REPLICATING(self, cmd) \
+do \
+ if (!(self)->started) { \
+ PyErr_SetString(ProgrammingError, \
+ #cmd " cannot be used when replication is not in progress"); \
+ return NULL; } \
+while (0)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* !defined(PSYCOPG_REPLICATION_CURSOR_H) */
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
new file mode 100644
index 0000000..d1f7939
--- /dev/null
+++ b/psycopg/replication_cursor_type.c
@@ -0,0 +1,360 @@
+/* replication_cursor_type.c - python interface to replication cursor objects
+ *
+ * Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+ *
+ * This file is part of psycopg.
+ *
+ * psycopg2 is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link this program with the OpenSSL library (or with
+ * modified versions of OpenSSL that use the same license as OpenSSL),
+ * and distribute linked combinations including the two.
+ *
+ * You must obey the GNU Lesser General Public License in all respects for
+ * all of the code used other than OpenSSL.
+ *
+ * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ * License for more details.
+ */
+
+#define PSYCOPG_MODULE
+#include "psycopg/psycopg.h"
+
+#include "psycopg/replication_cursor.h"
+#include "psycopg/replication_message.h"
+#include "psycopg/green.h"
+#include "psycopg/pqpath.h"
+
+#include <string.h>
+#include <stdlib.h>
+
+/* python */
+#include "datetime.h"
+
+
+#define psyco_repl_curs_start_replication_expert_doc \
+"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command."
+
+static PyObject *
+psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ cursorObject *curs = &self->cur;
+ connectionObject *conn = self->cur.conn;
+ PyObject *res = NULL;
+ char *command;
+ static char *kwlist[] = {"command", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
+ return NULL;
+ }
+
+ EXC_IF_CURS_CLOSED(curs);
+ EXC_IF_GREEN(start_replication_expert);
+ EXC_IF_TPC_PREPARED(conn, start_replication_expert);
+ EXC_IF_REPLICATING(self, start_replication_expert);
+
+ Dprintf("psyco_repl_curs_start_replication_expert: %s", command);
+
+ /* self->copysize = 0;*/
+
+ gettimeofday(&self->last_io, NULL);
+
+ if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
+ res = Py_None;
+ Py_INCREF(res);
+
+ self->started = 1;
+ }
+
+ return res;
+}
+
+#define psyco_repl_curs_consume_stream_doc \
+"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
+
+static PyObject *
+psyco_repl_curs_consume_stream(replicationCursorObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ cursorObject *curs = &self->cur;
+ PyObject *consume = NULL, *res = NULL;
+ int decode = 0;
+ double keepalive_interval = 10;
+ static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
+ &consume, &decode, &keepalive_interval)) {
+ return NULL;
+ }
+
+ EXC_IF_CURS_CLOSED(curs);
+ EXC_IF_CURS_ASYNC(curs, consume_stream);
+ EXC_IF_GREEN(consume_stream);
+ EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
+ EXC_IF_NOT_REPLICATING(self, consume_stream);
+
+ if (self->consuming) {
+ PyErr_SetString(ProgrammingError,
+ "consume_stream cannot be used when already in the consume loop");
+ return NULL;
+ }
+
+ Dprintf("psyco_repl_curs_consume_stream");
+
+ if (keepalive_interval < 1.0) {
+ psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
+ return NULL;
+ }
+
+ self->consuming = 1;
+
+ if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
+ res = Py_None;
+ Py_INCREF(res);
+ }
+
+ self->consuming = 0;
+
+ return res;
+}
+
+#define psyco_repl_curs_read_message_doc \
+"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
+
+static PyObject *
+psyco_repl_curs_read_message(replicationCursorObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ cursorObject *curs = &self->cur;
+ int decode = 1;
+ static char *kwlist[] = {"decode", NULL};
+
+ EXC_IF_CURS_CLOSED(curs);
+ EXC_IF_GREEN(read_message);
+ EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
+ EXC_IF_NOT_REPLICATING(self, read_message);
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
+ &decode)) {
+ return NULL;
+ }
+
+ return pq_read_replication_message(self, decode);
+}
+
+static PyObject *
+repl_curs_flush_feedback(replicationCursorObject *self, int reply)
+{
+ if (!(self->feedback_pending || reply))
+ Py_RETURN_TRUE;
+
+ if (pq_send_replication_feedback(self, reply)) {
+ self->feedback_pending = 0;
+ Py_RETURN_TRUE;
+ } else {
+ self->feedback_pending = 1;
+ Py_RETURN_FALSE;
+ }
+}
+
+#define psyco_repl_curs_send_feedback_doc \
+"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
+
+static PyObject *
+psyco_repl_curs_send_feedback(replicationCursorObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ cursorObject *curs = &self->cur;
+ XLogRecPtr write_lsn = InvalidXLogRecPtr,
+ flush_lsn = InvalidXLogRecPtr,
+ apply_lsn = InvalidXLogRecPtr;
+ int reply = 0;
+ static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
+
+ EXC_IF_CURS_CLOSED(curs);
+ EXC_IF_NOT_REPLICATING(self, send_feedback);
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
+ &write_lsn, &flush_lsn, &apply_lsn, &reply)) {
+ return NULL;
+ }
+
+ if (write_lsn > self->write_lsn)
+ self->write_lsn = write_lsn;
+
+ if (flush_lsn > self->flush_lsn)
+ self->flush_lsn = flush_lsn;
+
+ if (apply_lsn > self->apply_lsn)
+ self->apply_lsn = apply_lsn;
+
+ self->feedback_pending = 1;
+
+ return repl_curs_flush_feedback(self, reply);
+}
+
+#define psyco_repl_curs_flush_feedback_doc \
+"flush_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
+
+static PyObject *
+psyco_repl_curs_flush_feedback(replicationCursorObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ cursorObject *curs = &self->cur;
+ int reply = 0;
+ static char *kwlist[] = {"reply", NULL};
+
+ EXC_IF_CURS_CLOSED(curs);
+ EXC_IF_NOT_REPLICATING(self, flush_feedback);
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
+ &reply)) {
+ return NULL;
+ }
+
+ return repl_curs_flush_feedback(self, reply);
+}
+
+
+RAISES_NEG int
+psyco_repl_curs_datetime_init(void)
+{
+ Dprintf("psyco_repl_curs_datetime_init: datetime init");
+
+ PyDateTime_IMPORT;
+
+ if (!PyDateTimeAPI) {
+ PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
+ return -1;
+ }
+ return 0;
+}
+
+#define psyco_repl_curs_io_timestamp_doc \
+"io_timestamp -- the timestamp of latest IO with the server"
+
+static PyObject *
+psyco_repl_curs_get_io_timestamp(replicationCursorObject *self)
+{
+ cursorObject *curs = &self->cur;
+ PyObject *tval, *res = NULL;
+ double seconds;
+
+ EXC_IF_CURS_CLOSED(curs);
+
+ seconds = self->last_io.tv_sec + self->last_io.tv_usec / 1.0e6;
+
+ tval = Py_BuildValue("(d)", seconds);
+ if (tval) {
+ res = PyDateTime_FromTimestamp(tval);
+ Py_DECREF(tval);
+ }
+ return res;
+}
+
+/* object method list */
+
+static struct PyMethodDef replicationCursorObject_methods[] = {
+ {"start_replication_expert", (PyCFunction)psyco_repl_curs_start_replication_expert,
+ METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_start_replication_expert_doc},
+ {"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
+ METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
+ {"read_message", (PyCFunction)psyco_repl_curs_read_message,
+ METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc},
+ {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
+ METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
+ {"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
+ METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_flush_feedback_doc},
+ {NULL}
+};
+
+/* object calculated member list */
+
+static struct PyGetSetDef replicationCursorObject_getsets[] = {
+ { "io_timestamp",
+ (getter)psyco_repl_curs_get_io_timestamp, NULL,
+ psyco_repl_curs_io_timestamp_doc, NULL },
+ {NULL}
+};
+
+static int
+replicationCursor_setup(replicationCursorObject* self)
+{
+ self->started = 0;
+ self->consuming = 0;
+
+ self->write_lsn = InvalidXLogRecPtr;
+ self->flush_lsn = InvalidXLogRecPtr;
+ self->apply_lsn = InvalidXLogRecPtr;
+ self->feedback_pending = 0;
+
+ return 0;
+}
+
+static int
+replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
+{
+ replicationCursor_setup((replicationCursorObject *)obj);
+ return cursor_init(obj, args, kwargs);
+}
+
+static PyObject *
+replicationCursor_repr(replicationCursorObject *self)
+{
+ return PyString_FromFormat(
+ "<ReplicationCursor object at %p; closed: %d>", self, self->cur.closed);
+}
+
+
+/* object type */
+
+#define replicationCursorType_doc \
+"A database replication cursor."
+
+PyTypeObject replicationCursorType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "psycopg2.extensions.ReplicationCursor",
+ sizeof(replicationCursorObject), 0,
+ 0, /*tp_dealloc*/
+ 0, /*tp_print*/
+ 0, /*tp_getattr*/
+ 0, /*tp_setattr*/
+ 0, /*tp_compare*/
+ (reprfunc)replicationCursor_repr, /*tp_repr*/
+ 0, /*tp_as_number*/
+ 0, /*tp_as_sequence*/
+ 0, /*tp_as_mapping*/
+ 0, /*tp_hash*/
+ 0, /*tp_call*/
+ (reprfunc)replicationCursor_repr, /*tp_str*/
+ 0, /*tp_getattro*/
+ 0, /*tp_setattro*/
+ 0, /*tp_as_buffer*/
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
+ Py_TPFLAGS_HAVE_GC, /*tp_flags*/
+ replicationCursorType_doc, /*tp_doc*/
+ 0, /*tp_traverse*/
+ 0, /*tp_clear*/
+ 0, /*tp_richcompare*/
+ 0, /*tp_weaklistoffset*/
+ 0, /*tp_iter*/
+ 0, /*tp_iternext*/
+ replicationCursorObject_methods, /*tp_methods*/
+ 0, /*tp_members*/
+ replicationCursorObject_getsets, /*tp_getset*/
+ &cursorType, /*tp_base*/
+ 0, /*tp_dict*/
+ 0, /*tp_descr_get*/
+ 0, /*tp_descr_set*/
+ 0, /*tp_dictoffset*/
+ replicationCursor_init, /*tp_init*/
+ 0, /*tp_alloc*/
+ 0, /*tp_new*/
+};
diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c
index 6183393..d4b0457 100644
--- a/psycopg/replication_message_type.c
+++ b/psycopg/replication_message_type.c
@@ -49,7 +49,7 @@ static PyObject *
replmsg_repr(replicationMessageObject *self)
{
return PyString_FromFormat(
- "<replicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
+ "<ReplicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end),
self->send_time);
}