diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-04-01 20:39:31 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-01 20:39:31 +0000 |
commit | e63d139b7f275d2271d4a5f15b9894836ab1e371 (patch) | |
tree | 2460eeef7686fafb1114bb9728b8edddfdd62484 /python/qpid/tests | |
parent | 0f150114eceda34771972fa4fb194b6eef6b1afa (diff) | |
download | qpid-python-e63d139b7f275d2271d4a5f15b9894836ab1e371.tar.gz |
updated reconnect option names to match C++ API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@930084 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index aced76feac..a849957374 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -72,6 +72,81 @@ class SetupTests(Base): while fds: os.close(fds.pop()) + def testReconnect(self): + options = self.connection_options() + import socket + from qpid.messaging import transports + transport = options.get("transport", "plain") + real = getattr(transports, transport) + + class flaky: + + def __init__(self, host, port): + self.real = real(host, port) + self.sent_count = 0 + self.recv_count = 0 + + def fileno(self): + return self.real.fileno() + + def reading(self, reading): + return self.real.reading(reading) + + def writing(self, writing): + return self.real.writing(writing) + + def send(self, bytes): + if self.sent_count > 1024: + raise socket.error("fake error") + n = self.real.send(bytes) + self.sent_count += n + return n + + def recv(self, n): + if self.recv_count > 1024: + return "" + bytes = self.real.recv(n) + self.recv_count += len(bytes) + return bytes + + def close(self): + self.real.close() + + transports.flaky = flaky + + options["reconnect"] = True + options["reconnect_interval"] = 0 + options["transport"] = "flaky" + + self.conn = Connection.open(self.broker.host, self.broker.port, **options) + ssn = self.conn.session() + snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") + rcv = ssn.receiver(snd.target) + + msgs = [self.message("testReconnect", i) for i in range(10)] + for m in msgs: + snd.send(m) + + content = set() + drained = [] + duplicates = [] + try: + while True: + m = rcv.fetch(timeout=0) + if m.content not in content: + content.add(m.content) + drained.append(m) + else: + duplicates.append(m) + ssn.acknowledge(m) + except Empty: + pass + assert duplicates, "no duplicates" + redelivered = 3*[False] + 3*[True, False] + [True] + assert len(drained) == len(msgs) == len(redelivered) + for m, d, r in zip(msgs, drained, redelivered): + self.assertEcho(m, d, r) + class ConnectionTests(Base): def setup_connection(self): |