diff options
author | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-10-23 18:30:18 +0200 |
---|---|---|
committer | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-10-23 18:30:18 +0200 |
commit | 8b79bf43ace9b7d09f16b4c829c96a6c1784dacf (patch) | |
tree | 6ff38c937c45f020ba33050db362d0417bce5160 /psycopg/pqpath.c | |
parent | dd6bcbd04fc9714ac87b827af12647590ef131a1 (diff) | |
download | psycopg2-8b79bf43ace9b7d09f16b4c829c96a6c1784dacf.tar.gz |
Drop ReplicationCursor.flush_feedback(), rectify pq_*_replication_*() interface.
Diffstat (limited to 'psycopg/pqpath.c')
-rw-r--r-- | psycopg/pqpath.c | 69 |
1 files changed, 29 insertions, 40 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 424ed90..6315417 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1542,8 +1542,8 @@ exit: Any keepalive messages from the server are silently consumed and are never returned to the caller. */ -PyObject * -pq_read_replication_message(replicationCursorObject *repl) +int +pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg) { cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; @@ -1553,18 +1553,21 @@ pq_read_replication_message(replicationCursorObject *repl) XLogRecPtr data_start, wal_end; pg_int64 send_time; PyObject *str = NULL, *result = NULL; - replicationMessageObject *msg = NULL; + int ret = -1; Dprintf("pq_read_replication_message"); + *msg = NULL; consumed = 0; + retry: len = PQgetCopyData(pgconn, &buffer, 1 /* async */); if (len == 0) { /* If we've tried reading some data, but there was none, bail out. */ if (consumed) { - goto none; + ret = 0; + goto exit; } /* We should only try reading more data when there is nothing available at the moment. Otherwise, with a really highly loaded @@ -1599,7 +1602,8 @@ retry: } CLEARPGRES(curs->pgres); - goto none; + ret = 0; + goto exit; } /* It also makes sense to set this flag here to make us return early in @@ -1641,11 +1645,11 @@ retry: Py_DECREF(str); if (!result) { goto exit; } - msg = (replicationMessageObject *)result; - msg->data_size = data_size; - msg->data_start = data_start; - msg->wal_end = wal_end; - msg->send_time = send_time; + *msg = (replicationMessageObject *)result; + (*msg)->data_size = data_size; + (*msg)->data_start = data_start; + (*msg)->wal_end = wal_end; + (*msg)->send_time = send_time; } else if (buffer[0] == 'k') { /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1656,19 +1660,8 @@ retry: } reply = buffer[hdr]; - if (reply) { - if (!pq_send_replication_feedback(repl, 0)) { - if (conn->async) { - repl->feedback_pending = 1; - } else { - /* XXX not sure if this was a good idea after all */ - pq_raise(conn, curs, NULL); - goto exit; - } - } - else { - gettimeofday(&repl->last_io, NULL); - } + if (reply && pq_send_replication_feedback(repl, 0) < 0) { + goto exit; } PQfreemem(buffer); @@ -1680,24 +1673,22 @@ retry: goto exit; } + ret = 0; + exit: if (buffer) { PQfreemem(buffer); } - return result; - -none: - result = Py_None; - Py_INCREF(result); - goto exit; + return ret; } int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) { cursorObject *curs = &repl->cur; - PGconn *pgconn = curs->conn->pgconn; + connectionObject *conn = curs->conn; + PGconn *pgconn = conn->pgconn; char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; @@ -1714,11 +1705,12 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) replybuf[len] = reply_requested ? 1 : 0; len += 1; if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) { - return 0; + pq_raise(conn, curs, NULL); + return -1; } gettimeofday(&repl->last_io, NULL); - return 1; + return 0; } /* Calls pq_read_replication_message in an endless loop, until @@ -1734,7 +1726,8 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; PGconn *pgconn = conn->pgconn; - PyObject *msg, *tmp = NULL; + replicationMessageObject *msg = NULL; + PyObject *tmp = NULL; int fd, sel, ret = -1; fd_set fds; struct timeval keep_intr, curr_time, ping_time, timeout; @@ -1750,13 +1743,10 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; while (1) { - msg = pq_read_replication_message(repl); - if (!msg) { + if (pq_read_replication_message(repl, &msg) < 0) { goto exit; } - else if (msg == Py_None) { - Py_DECREF(msg); - + else if (msg == NULL) { fd = PQsocket(pgconn); if (fd < 0) { pq_raise(conn, curs, NULL); @@ -1793,8 +1783,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ } if (sel == 0) { - if (!pq_send_replication_feedback(repl, 0)) { - pq_raise(conn, curs, NULL); + if (pq_send_replication_feedback(repl, 0) < 0) { goto exit; } } |