diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-06-30 12:44:58 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-30 12:44:58 +0000 |
commit | a374dcc0c34f51a2086122d834009716eb86cd54 (patch) | |
tree | a16d9dbdc5103b0144bdb66944430d1636165537 /python/qpid/tests | |
parent | bd67d8d00a4efde84e19bfbedd794b8e59d6e554 (diff) | |
download | qpid-python-a374dcc0c34f51a2086122d834009716eb86cd54.tar.gz |
fixed concurrent close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@959289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 72 |
1 files changed, 71 insertions, 1 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index c01f16e73c..52ca9f32be 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -20,12 +20,13 @@ # setup, usage, teardown, errors(sync), errors(async), stress, soak, # boundary-conditions, config -import errno, os, socket, time +import errno, os, socket, sys, time from qpid import compat from qpid.compat import set from qpid.messaging import * from qpid.messaging.transports import TRANSPORTS from qpid.tests.messaging import Base +from threading import Thread class SetupTests(Base): @@ -212,6 +213,32 @@ class ConnectionTests(Base): self.conn.close() assert not self.conn.attached() + def testSimultaneousClose(self): + ssns = [self.conn.session() for i in range(3)] + for s in ssns: + for i in range(3): + s.receiver("amq.topic") + s.sender("amq.topic") + + def closer(errors): + try: + self.conn.close() + except: + _, e, _ = sys.exc_info() + errors.append(compat.format_exc(e)) + + t1_errors = [] + t2_errors = [] + t1 = Thread(target=lambda: closer(t1_errors)) + t2 = Thread(target=lambda: closer(t2_errors)) + t1.start() + t2.start() + t1.join(self.delay()) + t2.join(self.delay()) + + assert not t1_errors, t1_errors[0] + assert not t2_errors, t2_errors[0] + class hangable: def __init__(self, host, port): @@ -655,6 +682,49 @@ class ReceiverTests(Base): assert msg.content == three self.ssn.acknowledge() + def fetchFromClosedTest(self, entry): + entry.close() + try: + msg = self.rcv.fetch(0) + assert False, "unexpected result: %s" % msg + except Empty, e: + assert False, "unexpected exception: %s" % e + except LinkClosed, e: + pass + + def testFetchFromClosedReceiver(self): + self.fetchFromClosedTest(self.rcv) + + def testFetchFromClosedSession(self): + self.fetchFromClosedTest(self.ssn) + + def testFetchFromClosedConnection(self): + self.fetchFromClosedTest(self.conn) + + def fetchFromConcurrentCloseTest(self, entry): + def closer(): + time.sleep(self.delay()) + entry.close() + t = Thread(target=closer) + t.start() + try: + msg = self.rcv.fetch() + assert False, "unexpected result: %s" % msg + except Empty, e: + assert False, "unexpected exception: %s" % e + except LinkClosed, e: + pass + t.join() + + def testFetchFromConcurrentCloseReceiver(self): + self.fetchFromConcurrentCloseTest(self.rcv) + + def testFetchFromConcurrentCloseSession(self): + self.fetchFromConcurrentCloseTest(self.ssn) + + def testFetchFromConcurrentCloseConnection(self): + self.fetchFromConcurrentCloseTest(self.conn) + def testCapacityIncrease(self): content = self.send("testCapacityIncrease") self.sleep() |