summaryrefslogtreecommitdiff
path: root/psycopg/pqpath.c
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-23 18:30:18 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-23 18:30:18 +0200
commit8b79bf43ace9b7d09f16b4c829c96a6c1784dacf (patch)
tree6ff38c937c45f020ba33050db362d0417bce5160 /psycopg/pqpath.c
parentdd6bcbd04fc9714ac87b827af12647590ef131a1 (diff)
downloadpsycopg2-8b79bf43ace9b7d09f16b4c829c96a6c1784dacf.tar.gz
Drop ReplicationCursor.flush_feedback(), rectify pq_*_replication_*() interface.
Diffstat (limited to 'psycopg/pqpath.c')
-rw-r--r--psycopg/pqpath.c69
1 files changed, 29 insertions, 40 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 424ed90..6315417 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1542,8 +1542,8 @@ exit:
Any keepalive messages from the server are silently consumed and
are never returned to the caller.
*/
-PyObject *
-pq_read_replication_message(replicationCursorObject *repl)
+int
+pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg)
{
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
@@ -1553,18 +1553,21 @@ pq_read_replication_message(replicationCursorObject *repl)
XLogRecPtr data_start, wal_end;
pg_int64 send_time;
PyObject *str = NULL, *result = NULL;
- replicationMessageObject *msg = NULL;
+ int ret = -1;
Dprintf("pq_read_replication_message");
+ *msg = NULL;
consumed = 0;
+
retry:
len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
if (len == 0) {
/* If we've tried reading some data, but there was none, bail out. */
if (consumed) {
- goto none;
+ ret = 0;
+ goto exit;
}
/* We should only try reading more data when there is nothing
available at the moment. Otherwise, with a really highly loaded
@@ -1599,7 +1602,8 @@ retry:
}
CLEARPGRES(curs->pgres);
- goto none;
+ ret = 0;
+ goto exit;
}
/* It also makes sense to set this flag here to make us return early in
@@ -1641,11 +1645,11 @@ retry:
Py_DECREF(str);
if (!result) { goto exit; }
- msg = (replicationMessageObject *)result;
- msg->data_size = data_size;
- msg->data_start = data_start;
- msg->wal_end = wal_end;
- msg->send_time = send_time;
+ *msg = (replicationMessageObject *)result;
+ (*msg)->data_size = data_size;
+ (*msg)->data_start = data_start;
+ (*msg)->wal_end = wal_end;
+ (*msg)->send_time = send_time;
}
else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@@ -1656,19 +1660,8 @@ retry:
}
reply = buffer[hdr];
- if (reply) {
- if (!pq_send_replication_feedback(repl, 0)) {
- if (conn->async) {
- repl->feedback_pending = 1;
- } else {
- /* XXX not sure if this was a good idea after all */
- pq_raise(conn, curs, NULL);
- goto exit;
- }
- }
- else {
- gettimeofday(&repl->last_io, NULL);
- }
+ if (reply && pq_send_replication_feedback(repl, 0) < 0) {
+ goto exit;
}
PQfreemem(buffer);
@@ -1680,24 +1673,22 @@ retry:
goto exit;
}
+ ret = 0;
+
exit:
if (buffer) {
PQfreemem(buffer);
}
- return result;
-
-none:
- result = Py_None;
- Py_INCREF(result);
- goto exit;
+ return ret;
}
int
pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
{
cursorObject *curs = &repl->cur;
- PGconn *pgconn = curs->conn->pgconn;
+ connectionObject *conn = curs->conn;
+ PGconn *pgconn = conn->pgconn;
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
@@ -1714,11 +1705,12 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
replybuf[len] = reply_requested ? 1 : 0; len += 1;
if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
- return 0;
+ pq_raise(conn, curs, NULL);
+ return -1;
}
gettimeofday(&repl->last_io, NULL);
- return 1;
+ return 0;
}
/* Calls pq_read_replication_message in an endless loop, until
@@ -1734,7 +1726,8 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
PGconn *pgconn = conn->pgconn;
- PyObject *msg, *tmp = NULL;
+ replicationMessageObject *msg = NULL;
+ PyObject *tmp = NULL;
int fd, sel, ret = -1;
fd_set fds;
struct timeval keep_intr, curr_time, ping_time, timeout;
@@ -1750,13 +1743,10 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) {
- msg = pq_read_replication_message(repl);
- if (!msg) {
+ if (pq_read_replication_message(repl, &msg) < 0) {
goto exit;
}
- else if (msg == Py_None) {
- Py_DECREF(msg);
-
+ else if (msg == NULL) {
fd = PQsocket(pgconn);
if (fd < 0) {
pq_raise(conn, curs, NULL);
@@ -1793,8 +1783,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
}
if (sel == 0) {
- if (!pq_send_replication_feedback(repl, 0)) {
- pq_raise(conn, curs, NULL);
+ if (pq_send_replication_feedback(repl, 0) < 0) {
goto exit;
}
}