summaryrefslogtreecommitdiff
path: root/psycopg/pqpath.c
diff options
context:
space:
mode:
author: Oleksandr Shulgin <oleksandr.shulgin@zalando.de> 2015-06-10 09:06:08 +0200
committer: Oleksandr Shulgin <oleksandr.shulgin@zalando.de> 2015-06-30 10:38:18 +0200
commit: 61e52ce8793472ff1348ab93ccdeb682a1e7b3df (patch)
tree: 4ddf3fd948a0b250af06a02c4eea760d1f8c1b18 /psycopg/pqpath.c
parent: 9ed90b1216828351ccbd9e9e28951bf7933fb1b3 (diff)
download: psycopg2-61e52ce8793472ff1348ab93ccdeb682a1e7b3df.tar.gz
Rework replication protocol
This change exposes lower-level functions for operating the (logical) replication protocol, while keeping the high-level start_replication function that does all the work for you in the case of a synchronous connection. A number of other changes and fixes are also included in this commit.
Diffstat (limited to 'psycopg/pqpath.c')
-rw-r--r-- psycopg/pqpath.c 362
1 files changed, 179 insertions, 183 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 7ce06a8..03d928c 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1062,6 +1062,9 @@ pq_get_last_result(connectionObject *conn)
PQclear(result);
}
result = res;
+ if (PQresultStatus(result) == PGRES_COPY_BOTH) {
+ break;
+ }
}
return result;
@@ -1522,32 +1525,151 @@ exit:
return ret;
}
-static int
-sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn,
- int replyRequested)
+/* ignores keepalive messages */
+PyObject *
+pq_read_replication_message(cursorObject *curs, int decode)
+{
+ char *buffer = NULL;
+ int len, hdr, reply;
+ XLogRecPtr data_start, wal_end;
+ pg_int64 send_time;
+ PyObject *str = NULL, *msg = NULL;
+
+ Dprintf("pq_read_replication_message(decode=%d)", decode);
+
+retry:
+ if (!PQconsumeInput(curs->conn->pgconn)) {
+ goto none;
+ }
+
+ Py_BEGIN_ALLOW_THREADS;
+ len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
+ Py_END_ALLOW_THREADS;
+ if (len == 0) {
+ goto none;
+ }
+
+ if (len == -2) {
+ pq_raise(curs->conn, curs, NULL);
+ goto exit;
+ }
+ if (len == -1) {
+ curs->pgres = PQgetResult(curs->conn->pgconn);
+
+ if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
+ pq_raise(curs->conn, curs, NULL);
+ goto exit;
+ }
+
+ CLEARPGRES(curs->pgres);
+ goto none;
+ }
+
+ /* ok, we did really read something: update the io timestamp */
+ gettimeofday(&curs->repl_last_io, NULL);
+
+ Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
+ if (buffer[0] == 'w') {
+ /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
+ hdr = 1 + 8 + 8 + 8;
+ if (len < hdr + 1) {
+ psyco_set_error(OperationalError, curs, "data message header too small");
+ goto exit;
+ }
+
+ data_start = fe_recvint64(buffer + 1);
+ wal_end = fe_recvint64(buffer + 1 + 8);
+ send_time = fe_recvint64(buffer + 1 + 8 + 8);
+
+ Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
+ XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
+
+ Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr);
+
+ if (decode) {
+ str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
+ } else {
+ str = Bytes_FromStringAndSize(buffer + hdr, len - hdr);
+ }
+ if (!str) { goto exit; }
+
+ msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
+ curs, str, NULL);
+ Py_DECREF(str);
+ if (!msg) { goto exit; }
+
+ ((replicationMessageObject *)msg)->data_start = data_start;
+ ((replicationMessageObject *)msg)->wal_end = wal_end;
+ ((replicationMessageObject *)msg)->send_time = send_time;
+ }
+ else if (buffer[0] == 'k') {
+ /* msgtype(1), walEnd(8), sendTime(8), reply(1) */
+ hdr = 1 + 8 + 8;
+ if (len < hdr + 1) {
+ psyco_set_error(OperationalError, curs, "keepalive message header too small");
+ goto exit;
+ }
+
+ reply = buffer[hdr];
+ if (reply) {
+ if (!pq_send_replication_feedback(curs, 0)) {
+ if (curs->conn->async) {
+ curs->repl_feedback_pending = 1;
+ } else {
+ pq_raise(curs->conn, curs, NULL);
+ goto exit;
+ }
+ }
+ else {
+ gettimeofday(&curs->repl_last_io, NULL);
+ }
+ }
+
+ PQfreemem(buffer);
+ buffer = NULL;
+ goto retry;
+ }
+ else {
+ psyco_set_error(OperationalError, curs, "unrecognized replication message type");
+ goto exit;
+ }
+
+exit:
+ if (buffer) {
+ PQfreemem(buffer);
+ }
+
+ return msg;
+
+none:
+ msg = Py_None;
+ Py_INCREF(msg);
+ goto exit;
+}
+
+int
+pq_send_replication_feedback(cursorObject* curs, int reply_requested)
{
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
- Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR,
- XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn));
-
- replybuf[len] = 'r';
- len += 1;
- fe_sendint64(written_lsn, &replybuf[len]); /* write */
- len += 8;
- fe_sendint64(fsync_lsn, &replybuf[len]); /* flush */
- len += 8;
- fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
- len += 8;
- fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); /* sendTime */
- len += 8;
- replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
- len += 1;
-
- if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) {
+ Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
+ XLOGFMTARGS(curs->repl_write_lsn),
+ XLOGFMTARGS(curs->repl_flush_lsn),
+ XLOGFMTARGS(curs->repl_apply_lsn));
+
+ replybuf[len] = 'r'; len += 1;
+ fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8;
+ fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
+ replybuf[len] = reply_requested ? 1 : 0; len += 1;
+
+ if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 ||
+ PQflush(curs->conn->pgconn) != 0) {
return 0;
}
+ gettimeofday(&curs->repl_last_io, NULL);
return 1;
}
@@ -1556,33 +1678,19 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn,
static int
_pq_copy_both_v3(cursorObject *curs)
{
- PyObject *tmp = NULL;
+ PyObject *msg, *tmp = NULL;
PyObject *write_func = NULL;
- PyObject *obj = NULL;
- replicationMessageObject *msg = NULL;
int ret = -1;
int is_text;
- PGconn *conn;
- char *buffer = NULL;
+ PGconn *pgconn;
fd_set fds;
- struct timeval last_comm, curr_time, ping_time, time_diff;
- int len, hdr, reply, sel;
-
- XLogRecPtr written_lsn = InvalidXLogRecPtr,
- fsync_lsn = InvalidXLogRecPtr,
- data_start, wal_end;
- pg_int64 send_time;
+ struct timeval curr_time, ping_time, time_diff;
+ int sel;
if (!curs->copyfile) {
- PyErr_SetString(ProgrammingError,
- "can't execute START_REPLICATION: use the start_replication() method instead");
- goto exit;
- }
-
- if (curs->keepalive_interval <= 0) {
- PyErr_Format(PyExc_RuntimeError, "keepalive_interval must be > 0: %d",
- curs->keepalive_interval);
+ psyco_set_error(ProgrammingError, curs,
+ "can't execute START_REPLICATION directly: use the start_replication() method instead");
goto exit;
}
@@ -1597,31 +1705,29 @@ _pq_copy_both_v3(cursorObject *curs)
}
CLEARPGRES(curs->pgres);
-
- /* timestamp of last communication with the server */
- gettimeofday(&last_comm, NULL);
-
- conn = curs->conn->pgconn;
+ pgconn = curs->conn->pgconn;
while (1) {
- len = PQgetCopyData(conn, &buffer, 1 /* async! */);
- if (len < 0) {
- break;
+ msg = pq_read_replication_message(curs, is_text);
+ if (!msg) {
+ goto exit;
}
- if (len == 0) {
+ else if (msg == Py_None) {
+ Py_DECREF(msg);
+
FD_ZERO(&fds);
- FD_SET(PQsocket(conn), &fds);
+ FD_SET(PQsocket(pgconn), &fds);
- /* set up timeout according to keepalive_interval, but no less than 1 second */
gettimeofday(&curr_time, NULL);
- ping_time = last_comm;
- ping_time.tv_sec += curs->keepalive_interval;
+ ping_time = curs->repl_last_io;
+ ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec;
+ ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec;
timersub(&ping_time, &curr_time, &time_diff);
if (time_diff.tv_sec > 0) {
Py_BEGIN_ALLOW_THREADS;
- sel = select(PQsocket(conn) + 1, &fds, NULL, NULL, &time_diff);
+ sel = select(PQsocket(pgconn) + 1, &fds, NULL, NULL, &time_diff);
Py_END_ALLOW_THREADS;
}
else {
@@ -1639,148 +1745,34 @@ _pq_copy_both_v3(cursorObject *curs)
continue;
}
- if (sel > 0) {
- if (!PQconsumeInput(conn)) {
- Dprintf("_pq_copy_both_v3: PQconsumeInput failed");
+ if (sel == 0) {
+ if (!pq_send_replication_feedback(curs, 0)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
}
- else { /* timeout */
- if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
- pq_raise(curs->conn, curs, NULL);
- goto exit;
- }
- }
- gettimeofday(&last_comm, NULL);
continue;
}
- if (len > 0 && buffer) {
- gettimeofday(&last_comm, NULL);
-
- Dprintf("_pq_copy_both_v3: msg=%c, len=%d", buffer[0], len);
- if (buffer[0] == 'w') {
- /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
- hdr = 1 + 8 + 8 + 8;
- if (len < hdr + 1) {
- PyErr_Format(PyExc_RuntimeError,
- "streaming header too small in data message: %d", len);
- goto exit;
- }
-
- data_start = fe_recvint64(buffer + 1);
- wal_end = fe_recvint64(buffer + 1 + 8);
- send_time = fe_recvint64(buffer + 1 + 8 + 8);
-
- Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld",
- XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time);
-
- if (is_text) {
- obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
- }
- else {
- obj = Bytes_FromStringAndSize(buffer + hdr, len - hdr);
- }
- if (!obj) { goto exit; }
-
- msg = (replicationMessageObject *)
- PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
- curs, obj, NULL);
- Py_DECREF(obj);
- if (!msg) { goto exit; }
-
- msg->data_start = data_start;
- msg->wal_end = wal_end;
- msg->send_time = send_time;
-
- tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
-
- if (tmp == NULL) {
- Dprintf("_pq_copy_both_v3: write_func returned NULL");
- goto exit;
- }
- Py_DECREF(tmp);
-
- /* update the LSN position we've written up to */
- if (written_lsn < wal_end)
- written_lsn = wal_end;
-
- /* if requested by sync_server(msg), we confirm LSN with the server */
- if (curs->repl_sync_lsn != InvalidXLogRecPtr) {
- Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR,
- XLOGFMTARGS(curs->repl_sync_lsn));
-
- if (fsync_lsn < curs->repl_sync_lsn)
- fsync_lsn = curs->repl_sync_lsn;
-
- curs->repl_sync_lsn = InvalidXLogRecPtr;
-
- if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
- pq_raise(curs->conn, curs, NULL);
- goto exit;
- }
- gettimeofday(&last_comm, NULL);
- }
-
- if (curs->stop_replication) {
- Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func");
- break;
- }
-
- Py_DECREF(msg);
- msg = NULL;
- }
- else if (buffer[0] == 'k') {
- /* msgtype(1), walEnd(8), sendTime(8), reply(1) */
- hdr = 1 + 8 + 8;
- if (len < hdr + 1) {
- PyErr_Format(PyExc_RuntimeError,
- "streaming header too small in keepalive message: %d", len);
- goto exit;
- }
+ else {
+ tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
+ Py_DECREF(msg);
- reply = buffer[hdr];
- if (reply) {
- if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
- pq_raise(curs->conn, curs, NULL);
- goto exit;
- }
- gettimeofday(&last_comm, NULL);
- }
- }
- else {
- PyErr_Format(PyExc_RuntimeError,
- "unrecognized streaming message type: \"%c\"", buffer[0]);
+ if (tmp == NULL) {
+ Dprintf("_pq_copy_both_v3: write_func returned NULL");
goto exit;
}
+ Py_DECREF(tmp);
- /* buffer is allocated on every PQgetCopyData() call */
- PQfreemem(buffer);
- buffer = NULL;
+ if (curs->repl_stop) {
+ Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func");
+ break;
+ }
}
}
- if (len == -2) {
- pq_raise(curs->conn, curs, NULL);
- goto exit;
- }
- if (len == -1) {
- curs->pgres = PQgetResult(curs->conn->pgconn);
-
- if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
- pq_raise(curs->conn, curs, NULL);
-
- CLEARPGRES(curs->pgres);
- }
-
ret = 1;
exit:
- if (buffer) {
- PQfreemem(buffer);
- }
-
- Py_XDECREF(msg);
Py_XDECREF(write_func);
return ret;
}
@@ -1847,9 +1839,13 @@ pq_fetch(cursorObject *curs, int no_result)
case PGRES_COPY_BOTH:
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1;
- ex = _pq_copy_both_v3(curs);
- /* error caught by out glorious notice handler */
- if (PyErr_Occurred()) ex = -1;
+ if (curs->conn->async) {
+ ex = 0;
+ } else {
+ ex = _pq_copy_both_v3(curs);
+ /* error caught by out glorious notice handler */
+ if (PyErr_Occurred()) ex = -1;
+ }
CLEARPGRES(curs->pgres);
break;