summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r--python/qpid/tests/messaging/endpoints.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index aced76feac..a849957374 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -72,6 +72,81 @@ class SetupTests(Base):
while fds:
os.close(fds.pop())
+ def testReconnect(self):
+ options = self.connection_options()
+ import socket
+ from qpid.messaging import transports
+ transport = options.get("transport", "plain")
+ real = getattr(transports, transport)
+
+ class flaky:
+
+ def __init__(self, host, port):
+ self.real = real(host, port)
+ self.sent_count = 0
+ self.recv_count = 0
+
+ def fileno(self):
+ return self.real.fileno()
+
+ def reading(self, reading):
+ return self.real.reading(reading)
+
+ def writing(self, writing):
+ return self.real.writing(writing)
+
+ def send(self, bytes):
+ if self.sent_count > 1024:
+ raise socket.error("fake error")
+ n = self.real.send(bytes)
+ self.sent_count += n
+ return n
+
+ def recv(self, n):
+ if self.recv_count > 1024:
+ return ""
+ bytes = self.real.recv(n)
+ self.recv_count += len(bytes)
+ return bytes
+
+ def close(self):
+ self.real.close()
+
+ transports.flaky = flaky
+
+ options["reconnect"] = True
+ options["reconnect_interval"] = 0
+ options["transport"] = "flaky"
+
+ self.conn = Connection.open(self.broker.host, self.broker.port, **options)
+ ssn = self.conn.session()
+ snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
+ rcv = ssn.receiver(snd.target)
+
+ msgs = [self.message("testReconnect", i) for i in range(10)]
+ for m in msgs:
+ snd.send(m)
+
+ content = set()
+ drained = []
+ duplicates = []
+ try:
+ while True:
+ m = rcv.fetch(timeout=0)
+ if m.content not in content:
+ content.add(m.content)
+ drained.append(m)
+ else:
+ duplicates.append(m)
+ ssn.acknowledge(m)
+ except Empty:
+ pass
+ assert duplicates, "no duplicates"
+ redelivered = 3*[False] + 3*[True, False] + [True]
+ assert len(drained) == len(msgs) == len(redelivered)
+ for m, d, r in zip(msgs, drained, redelivered):
+ self.assertEcho(m, d, r)
+
class ConnectionTests(Base):
def setup_connection(self):