summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-19 15:42:42 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-19 15:42:42 +0200
commit4ab7cf0157ae311aa22c0cb38410a3d2ab9bea06 (patch)
treeacd5c8837397611b0ed515c69c2498395ebd5aa1 /tests
parent0435320f34c56ced8c15899053920fc94fd4f3d7 (diff)
downloadpsycopg2-4ab7cf0157ae311aa22c0cb38410a3d2ab9bea06.tar.gz
Replace stop_replication with requirement for an exception.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_replication.py45
1 files changed, 31 insertions, 14 deletions
diff --git a/tests/test_replication.py b/tests/test_replication.py
index dfe11af..cd1321a 100644
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -25,6 +25,7 @@
import psycopg2
import psycopg2.extensions
from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection
+from psycopg2.extras import StopReplication
from testutils import unittest
from testutils import skip_before_postgres
@@ -77,20 +78,6 @@ class ReplicationTest(ReplicationTestCase):
cur.execute("IDENTIFY_SYSTEM")
cur.fetchall()
- @skip_before_postgres(9, 0)
- def test_stop_replication_raises(self):
- conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
- if conn is None: return
- cur = conn.cursor()
- self.assertRaises(psycopg2.ProgrammingError, cur.stop_replication)
-
- cur.start_replication()
- cur.stop_replication() # doesn't raise now
-
- def consume(msg):
- pass
- cur.consume_replication_stream(consume) # should return at once
-
@skip_before_postgres(9, 4) # slots require 9.4
def test_create_replication_slot(self):
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
@@ -115,6 +102,36 @@ class ReplicationTest(ReplicationTestCase):
self.create_replication_slot(cur, slot)
cur.start_replication(slot)
+ @skip_before_postgres(9, 4) # slots require 9.4
+ def test_stop_replication(self):
+ conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
+ if conn is None: return
+ cur = conn.cursor()
+
+ slot = "test_slot1"
+
+ self.create_replication_slot(cur, slot, output_plugin='test_decoding')
+
+ self.make_replication_event()
+
+ cur.start_replication(slot)
+ def consume(msg):
+ raise StopReplication()
+ self.assertRaises(StopReplication, cur.consume_replication_stream, consume)
+
+ # generate an event for our replication stream
+ def make_replication_event(self):
+ conn = self.connect()
+ if conn is None: return
+ cur = conn.cursor()
+
+ try:
+ cur.execute("DROP TABLE dummy1")
+ except psycopg2.ProgrammingError:
+ conn.rollback()
+ cur.execute("CREATE TABLE dummy1()")
+ conn.commit()
+
class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4)