diff options
author | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-06-10 09:06:08 +0200 |
---|---|---|
committer | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-06-30 10:38:18 +0200 |
commit | 61e52ce8793472ff1348ab93ccdeb682a1e7b3df (patch) | |
tree | 4ddf3fd948a0b250af06a02c4eea760d1f8c1b18 /psycopg/pqpath.c | |
parent | 9ed90b1216828351ccbd9e9e28951bf7933fb1b3 (diff) | |
download | psycopg2-61e52ce8793472ff1348ab93ccdeb682a1e7b3df.tar.gz |
Rework replication protocol
This change exposes lower level functions for operating the
(logical) replication protocol, while keeping the high-level
start_replication function that does all the job for you in
case of a synchronous connection.
A number of other changes and fixes are put into this commit.
Diffstat (limited to 'psycopg/pqpath.c')
-rw-r--r-- | psycopg/pqpath.c | 362 |
1 files changed, 179 insertions, 183 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7ce06a8..03d928c 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1062,6 +1062,9 @@ pq_get_last_result(connectionObject *conn) PQclear(result); } result = res; + if (PQresultStatus(result) == PGRES_COPY_BOTH) { + break; + } } return result; @@ -1522,32 +1525,151 @@ exit: return ret; } -static int -sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, - int replyRequested) +/* ignores keepalive messages */ +PyObject * +pq_read_replication_message(cursorObject *curs, int decode) +{ + char *buffer = NULL; + int len, hdr, reply; + XLogRecPtr data_start, wal_end; + pg_int64 send_time; + PyObject *str = NULL, *msg = NULL; + + Dprintf("pq_read_replication_message(decode=%d)", decode); + +retry: + if (!PQconsumeInput(curs->conn->pgconn)) { + goto none; + } + + Py_BEGIN_ALLOW_THREADS; + len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); + Py_END_ALLOW_THREADS; + if (len == 0) { + goto none; + } + + if (len == -2) { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + if (len == -1) { + curs->pgres = PQgetResult(curs->conn->pgconn); + + if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + + CLEARPGRES(curs->pgres); + goto none; + } + + /* ok, we did really read something: update the io timestamp */ + gettimeofday(&curs->repl_last_io, NULL); + + Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); + if (buffer[0] == 'w') { + /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ + hdr = 1 + 8 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "data message header too small"); + goto exit; + } + + data_start = fe_recvint64(buffer + 1); + wal_end = fe_recvint64(buffer + 1 + 8); + send_time = fe_recvint64(buffer + 1 + 8 + 8); + + Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, + XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); + + Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr); + + if (decode) { + str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); + } else { + str = Bytes_FromStringAndSize(buffer + hdr, len - hdr); + } + if (!str) { goto exit; } + + msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, + curs, str, NULL); + Py_DECREF(str); + if (!msg) { goto exit; } + + ((replicationMessageObject *)msg)->data_start = data_start; + ((replicationMessageObject *)msg)->wal_end = wal_end; + ((replicationMessageObject *)msg)->send_time = send_time; + } + else if (buffer[0] == 'k') { + /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ + hdr = 1 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "keepalive message header too small"); + goto exit; + } + + reply = buffer[hdr]; + if (reply) { + if (!pq_send_replication_feedback(curs, 0)) { + if (curs->conn->async) { + curs->repl_feedback_pending = 1; + } else { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + } + else { + gettimeofday(&curs->repl_last_io, NULL); + } + } + + PQfreemem(buffer); + buffer = NULL; + goto retry; + } + else { + psyco_set_error(OperationalError, curs, "unrecognized replication message type"); + goto exit; + } + +exit: + if (buffer) { + PQfreemem(buffer); + } + + return msg; + +none: + msg = Py_None; + Py_INCREF(msg); + goto exit; +} + +int +pq_send_replication_feedback(cursorObject* curs, int reply_requested) { char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; - Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR, - XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn)); - - replybuf[len] = 'r'; - len += 1; - fe_sendint64(written_lsn, &replybuf[len]); /* write */ - len += 8; - fe_sendint64(fsync_lsn, &replybuf[len]); /* flush */ - len += 8; - fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ - len += 8; - fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); /* sendTime */ - len += 8; - replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ - len += 1; - - if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { + Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR, + XLOGFMTARGS(curs->repl_write_lsn), + XLOGFMTARGS(curs->repl_flush_lsn), + XLOGFMTARGS(curs->repl_apply_lsn)); + + replybuf[len] = 'r'; len += 1; + fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8; + fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8; + fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8; + fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8; + replybuf[len] = reply_requested ? 1 : 0; len += 1; + + if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 || + PQflush(curs->conn->pgconn) != 0) { return 0; } + gettimeofday(&curs->repl_last_io, NULL); return 1; } @@ -1556,33 +1678,19 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, static int _pq_copy_both_v3(cursorObject *curs) { - PyObject *tmp = NULL; + PyObject *msg, *tmp = NULL; PyObject *write_func = NULL; - PyObject *obj = NULL; - replicationMessageObject *msg = NULL; int ret = -1; int is_text; - PGconn *conn; - char *buffer = NULL; + PGconn *pgconn; fd_set fds; - struct timeval last_comm, curr_time, ping_time, time_diff; - int len, hdr, reply, sel; - - XLogRecPtr written_lsn = InvalidXLogRecPtr, - fsync_lsn = InvalidXLogRecPtr, - data_start, wal_end; - pg_int64 send_time; + struct timeval curr_time, ping_time, time_diff; + int sel; if (!curs->copyfile) { - PyErr_SetString(ProgrammingError, - "can't execute START_REPLICATION: use the start_replication() method instead"); - goto exit; - } - - if (curs->keepalive_interval <= 0) { - PyErr_Format(PyExc_RuntimeError, "keepalive_interval must be > 0: %d", - curs->keepalive_interval); + psyco_set_error(ProgrammingError, curs, + "can't execute START_REPLICATION directly: use the start_replication() method instead"); goto exit; } @@ -1597,31 +1705,29 @@ _pq_copy_both_v3(cursorObject *curs) } CLEARPGRES(curs->pgres); - - /* timestamp of last communication with the server */ - gettimeofday(&last_comm, NULL); - - conn = curs->conn->pgconn; + pgconn = curs->conn->pgconn; while (1) { - len = PQgetCopyData(conn, &buffer, 1 /* async! */); - if (len < 0) { - break; + msg = pq_read_replication_message(curs, is_text); + if (!msg) { + goto exit; } - if (len == 0) { + else if (msg == Py_None) { + Py_DECREF(msg); + FD_ZERO(&fds); - FD_SET(PQsocket(conn), &fds); + FD_SET(PQsocket(pgconn), &fds); - /* set up timeout according to keepalive_interval, but no less than 1 second */ gettimeofday(&curr_time, NULL); - ping_time = last_comm; - ping_time.tv_sec += curs->keepalive_interval; + ping_time = curs->repl_last_io; + ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec; + ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec; timersub(&ping_time, &curr_time, &time_diff); if (time_diff.tv_sec > 0) { Py_BEGIN_ALLOW_THREADS; - sel = select(PQsocket(conn) + 1, &fds, NULL, NULL, &time_diff); + sel = select(PQsocket(pgconn) + 1, &fds, NULL, NULL, &time_diff); Py_END_ALLOW_THREADS; } else { @@ -1639,148 +1745,34 @@ _pq_copy_both_v3(cursorObject *curs) continue; } - if (sel > 0) { - if (!PQconsumeInput(conn)) { - Dprintf("_pq_copy_both_v3: PQconsumeInput failed"); + if (sel == 0) { + if (!pq_send_replication_feedback(curs, 0)) { pq_raise(curs->conn, curs, NULL); goto exit; } } - else { /* timeout */ - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - } - gettimeofday(&last_comm, NULL); continue; } - if (len > 0 && buffer) { - gettimeofday(&last_comm, NULL); - - Dprintf("_pq_copy_both_v3: msg=%c, len=%d", buffer[0], len); - if (buffer[0] == 'w') { - /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ - hdr = 1 + 8 + 8 + 8; - if (len < hdr + 1) { - PyErr_Format(PyExc_RuntimeError, - "streaming header too small in data message: %d", len); - goto exit; - } - - data_start = fe_recvint64(buffer + 1); - wal_end = fe_recvint64(buffer + 1 + 8); - send_time = fe_recvint64(buffer + 1 + 8 + 8); - - Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld", - XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time); - - if (is_text) { - obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); - } - else { - obj = Bytes_FromStringAndSize(buffer + hdr, len - hdr); - } - if (!obj) { goto exit; } - - msg = (replicationMessageObject *) - PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, - curs, obj, NULL); - Py_DECREF(obj); - if (!msg) { goto exit; } - - msg->data_start = data_start; - msg->wal_end = wal_end; - msg->send_time = send_time; - - tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); - - if (tmp == NULL) { - Dprintf("_pq_copy_both_v3: write_func returned NULL"); - goto exit; - } - Py_DECREF(tmp); - - /* update the LSN position we've written up to */ - if (written_lsn < wal_end) - written_lsn = wal_end; - - /* if requested by sync_server(msg), we confirm LSN with the server */ - if (curs->repl_sync_lsn != InvalidXLogRecPtr) { - Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR, - XLOGFMTARGS(curs->repl_sync_lsn)); - - if (fsync_lsn < curs->repl_sync_lsn) - fsync_lsn = curs->repl_sync_lsn; - - curs->repl_sync_lsn = InvalidXLogRecPtr; - - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - gettimeofday(&last_comm, NULL); - } - - if (curs->stop_replication) { - Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func"); - break; - } - - Py_DECREF(msg); - msg = NULL; - } - else if (buffer[0] == 'k') { - /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ - hdr = 1 + 8 + 8; - if (len < hdr + 1) { - PyErr_Format(PyExc_RuntimeError, - "streaming header too small in keepalive message: %d", len); - goto exit; - } + else { + tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); + Py_DECREF(msg); - reply = buffer[hdr]; - if (reply) { - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - gettimeofday(&last_comm, NULL); - } - } - else { - PyErr_Format(PyExc_RuntimeError, - "unrecognized streaming message type: \"%c\"", buffer[0]); + if (tmp == NULL) { + Dprintf("_pq_copy_both_v3: write_func returned NULL"); goto exit; } + Py_DECREF(tmp); - /* buffer is allocated on every PQgetCopyData() call */ - PQfreemem(buffer); - buffer = NULL; + if (curs->repl_stop) { + Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func"); + break; + } } } - if (len == -2) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - if (len == -1) { - curs->pgres = PQgetResult(curs->conn->pgconn); - - if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL); - - CLEARPGRES(curs->pgres); - } - ret = 1; exit: - if (buffer) { - PQfreemem(buffer); - } - - Py_XDECREF(msg); Py_XDECREF(write_func); return ret; } @@ -1847,9 +1839,13 @@ pq_fetch(cursorObject *curs, int no_result) case PGRES_COPY_BOTH: Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); curs->rowcount = -1; - ex = _pq_copy_both_v3(curs); - /* error caught by out glorious notice handler */ - if (PyErr_Occurred()) ex = -1; + if (curs->conn->async) { + ex = 0; + } else { + ex = _pq_copy_both_v3(curs); + /* error caught by out glorious notice handler */ + if (PyErr_Occurred()) ex = -1; + } CLEARPGRES(curs->pgres); break; |