summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Kukushkin <alexander.kukushkin@zalando.de>2019-05-06 15:26:21 +0200
committerAlexander Kukushkin <alexander.kukushkin@zalando.de>2019-05-06 15:26:21 +0200
commitf827e49f558878a8e842550e19cbc44b2de6ef13 (patch)
treecc4408691d7d6f155c194e1355d7c758e2df00ac
parent6cff5a3e089f155519765f9cdd49ef78b23ab740 (diff)
downloadpsycopg2-f827e49f558878a8e842550e19cbc44b2de6ef13.tar.gz
Change the default value of keepalive_interval parameter to None
The previous default value was 10 seconds, what might cause silent overwrite of the *status_interval* specified in the `start_replication()`
-rw-r--r--doc/src/extras.rst5
-rw-r--r--psycopg/replication_cursor_type.c34
2 files changed, 29 insertions, 10 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index b7136fe..68c9e3a 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -360,7 +360,7 @@ The individual messages in the replication stream are represented by
:param status_interval: time between feedback packets sent to the server
- .. method:: consume_stream(consume, keepalive_interval=10)
+ .. method:: consume_stream(consume, keepalive_interval=None)
:param consume: a callable object with signature :samp:`consume({msg})`
:param keepalive_interval: interval (in seconds) to send keepalive
@@ -386,6 +386,9 @@ The individual messages in the replication stream are represented by
This method also sends feedback messages to the server every
*keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part.
+ If the *keepalive_interval* is not specified, the value of
+ *status_interval* specified in the `start_replication()` or
+ `start_replication_expert()` will be used.
The client must confirm every processed message by calling
`send_feedback()` method on the corresponding replication cursor. A
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
index a31f6b8..a590386 100644
--- a/psycopg/replication_cursor_type.c
+++ b/psycopg/replication_cursor_type.c
@@ -96,19 +96,19 @@ exit:
}
#define consume_stream_doc \
-"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
+"consume_stream(consumer, keepalive_interval=None) -- Consume replication stream."
static PyObject *
consume_stream(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
- PyObject *consume = NULL, *res = NULL;
- double keepalive_interval = 10;
+ PyObject *consume = NULL, *interval = NULL, *res = NULL;
+ double keepalive_interval = 0;
static char *kwlist[] = {"consume", "keepalive_interval", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kwlist,
- &consume, &keepalive_interval)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist,
+ &consume, &interval)) {
return NULL;
}
@@ -119,9 +119,23 @@ consume_stream(replicationCursorObject *self,
Dprintf("consume_stream");
- if (keepalive_interval < 1.0) {
- psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
- return NULL;
+ if (interval && interval != Py_None) {
+
+ if (PyFloat_Check(interval)) {
+ keepalive_interval = PyFloat_AsDouble(interval);
+ } else if (PyLong_Check(interval)) {
+ keepalive_interval = PyLong_AsDouble(interval);
+ } else if (PyInt_Check(interval)) {
+ keepalive_interval = PyInt_AsLong(interval);
+ } else {
+ psyco_set_error(ProgrammingError, curs, "keepalive_interval must be int or float");
+ return NULL;
+ }
+
+ if (keepalive_interval < 1.0) {
+ psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
+ return NULL;
+ }
}
if (self->consuming) {
@@ -138,7 +152,9 @@ consume_stream(replicationCursorObject *self,
CLEARPGRES(curs->pgres);
self->consuming = 1;
- set_status_interval(self, keepalive_interval);
+ if (keepalive_interval >= 1) {
+ set_status_interval(self, keepalive_interval);
+ }
if (pq_copy_both(self, consume) >= 0) {
res = Py_None;