summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2016-01-05 12:31:57 +0100
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2016-01-05 12:31:57 +0100
commit09a4bb70a168799a91f63f1c2039f456c485960f (patch)
treea6c08b7e079fe652dce69267928f030cb6c985c7
parent4b9a6f48f33f9f80a1a12850f5029bf7ab97145b (diff)
downloadpsycopg2-09a4bb70a168799a91f63f1c2039f456c485960f.tar.gz
Allow retrying start_replication after syntax or data error.
-rw-r--r--psycopg/pqpath.c7
-rw-r--r--psycopg/replication_cursor.h20
-rw-r--r--psycopg/replication_cursor_type.c21
-rw-r--r--tests/test_replication.py13
4 files changed, 30 insertions, 31 deletions
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 760fc97..6d6728c 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1870,8 +1870,11 @@ pq_fetch(cursorObject *curs, int no_result)
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1;
ex = 0;
- /* nothing to do here: pq_copy_both will be called separately */
- CLEARPGRES(curs->pgres);
+ /* Nothing to do here: pq_copy_both will be called separately.
+
+ Also don't clear the result status: it's checked in
+ consume_stream. */
+ /*CLEARPGRES(curs->pgres);*/
break;
case PGRES_TUPLES_OK:
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
index 36ced13..71c6e19 100644
--- a/psycopg/replication_cursor.h
+++ b/psycopg/replication_cursor.h
@@ -38,11 +38,10 @@ extern HIDDEN PyTypeObject replicationCursorType;
typedef struct replicationCursorObject {
cursorObject cur;
- int started:1; /* if replication is started */
int consuming:1; /* if running the consume loop */
int decode:1; /* if we should use character decoding on the messages */
- struct timeval last_io ; /* timestamp of the last exchange with the server */
+ struct timeval last_io; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
@@ -53,23 +52,6 @@ typedef struct replicationCursorObject {
RAISES_NEG int psyco_repl_curs_datetime_init(void);
-/* exception-raising macros */
-#define EXC_IF_REPLICATING(self, cmd) \
-do \
- if ((self)->started) { \
- PyErr_SetString(ProgrammingError, \
- #cmd " cannot be used when replication is already in progress"); \
- return NULL; } \
-while (0)
-
-#define EXC_IF_NOT_REPLICATING(self, cmd) \
-do \
- if (!(self)->started) { \
- PyErr_SetString(ProgrammingError, \
- #cmd " cannot be used when replication is not in progress"); \
- return NULL; } \
-while (0)
-
#ifdef __cplusplus
}
#endif
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
index f652984..204ff20 100644
--- a/psycopg/replication_cursor_type.c
+++ b/psycopg/replication_cursor_type.c
@@ -59,7 +59,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(conn, start_replication_expert);
- EXC_IF_REPLICATING(self, start_replication_expert);
Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode);
@@ -67,7 +66,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
res = Py_None;
Py_INCREF(res);
- self->started = 1;
self->decode = decode;
gettimeofday(&self->last_io, NULL);
}
@@ -96,7 +94,13 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
EXC_IF_CURS_ASYNC(curs, consume_stream);
EXC_IF_GREEN(consume_stream);
EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
- EXC_IF_NOT_REPLICATING(self, consume_stream);
+
+ Dprintf("psyco_repl_curs_consume_stream");
+
+ if (keepalive_interval < 1.0) {
+ psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
+ return NULL;
+ }
if (self->consuming) {
PyErr_SetString(ProgrammingError,
@@ -104,12 +108,12 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
return NULL;
}
- Dprintf("psyco_repl_curs_consume_stream");
-
- if (keepalive_interval < 1.0) {
- psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
+ if (curs->pgres == NULL || PQresultStatus(curs->pgres) != PGRES_COPY_BOTH) {
+ PyErr_SetString(ProgrammingError,
+ "consume_stream: not replicating, call start_replication first");
return NULL;
}
+ CLEARPGRES(curs->pgres);
self->consuming = 1;
@@ -135,7 +139,6 @@ psyco_repl_curs_read_message(replicationCursorObject *self)
EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
- EXC_IF_NOT_REPLICATING(self, read_message);
if (pq_read_replication_message(self, &msg) < 0) {
return NULL;
@@ -160,7 +163,6 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self,
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(curs);
- EXC_IF_NOT_REPLICATING(self, send_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
@@ -246,7 +248,6 @@ static struct PyGetSetDef replicationCursorObject_getsets[] = {
static int
replicationCursor_setup(replicationCursorObject* self)
{
- self->started = 0;
self->consuming = 0;
self->decode = 0;
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 4441a26..a316135 100644
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -119,6 +119,18 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(self.slot)
@skip_before_postgres(9, 4) # slots require 9.4
+ def test_start_and_recover_from_error(self):
+ conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
+ if conn is None: return
+ cur = conn.cursor()
+
+ self.create_replication_slot(cur, output_plugin='test_decoding')
+
+ self.assertRaises(psycopg2.DataError, cur.start_replication,
+ slot_name=self.slot, options=dict(invalid_param='value'))
+ cur.start_replication(slot_name=self.slot)
+
+ @skip_before_postgres(9, 4) # slots require 9.4
def test_stop_replication(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None: return
@@ -162,6 +174,7 @@ class AsyncReplicationTest(ReplicationTestCase):
cur.send_feedback(flush_lsn=msg.data_start)
+ # cannot be used in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream():