diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_replication.py | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/tests/test_replication.py b/tests/test_replication.py index dfe11af..cd1321a 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -25,6 +25,7 @@ import psycopg2 import psycopg2.extensions from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection +from psycopg2.extras import StopReplication from testutils import unittest from testutils import skip_before_postgres @@ -77,20 +78,6 @@ class ReplicationTest(ReplicationTestCase): cur.execute("IDENTIFY_SYSTEM") cur.fetchall() - @skip_before_postgres(9, 0) - def test_stop_replication_raises(self): - conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) - if conn is None: return - cur = conn.cursor() - self.assertRaises(psycopg2.ProgrammingError, cur.stop_replication) - - cur.start_replication() - cur.stop_replication() # doesn't raise now - - def consume(msg): - pass - cur.consume_replication_stream(consume) # should return at once - @skip_before_postgres(9, 4) # slots require 9.4 def test_create_replication_slot(self): conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) @@ -115,6 +102,36 @@ class ReplicationTest(ReplicationTestCase): self.create_replication_slot(cur, slot) cur.start_replication(slot) + @skip_before_postgres(9, 4) # slots require 9.4 + def test_stop_replication(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + slot = "test_slot1" + + self.create_replication_slot(cur, slot, output_plugin='test_decoding') + + self.make_replication_event() + + cur.start_replication(slot) + def consume(msg): + raise StopReplication() + self.assertRaises(StopReplication, cur.consume_replication_stream, consume) + + # generate an event for our replication stream + def make_replication_event(self): + conn = self.connect() + if conn is None: return + cur = conn.cursor() + + try: + cur.execute("DROP TABLE dummy1") + except psycopg2.ProgrammingError: + conn.rollback() + cur.execute("CREATE TABLE dummy1()") + conn.commit() + class AsyncReplicationTest(ReplicationTestCase): @skip_before_postgres(9, 4) |