summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-30 12:44:58 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-30 12:44:58 +0000
commita374dcc0c34f51a2086122d834009716eb86cd54 (patch)
treea16d9dbdc5103b0144bdb66944430d1636165537 /python/qpid/tests
parentbd67d8d00a4efde84e19bfbedd794b8e59d6e554 (diff)
downloadqpid-python-a374dcc0c34f51a2086122d834009716eb86cd54.tar.gz
fixed concurrent close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@959289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/messaging/endpoints.py72
1 files changed, 71 insertions, 1 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index c01f16e73c..52ca9f32be 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -20,12 +20,13 @@
# setup, usage, teardown, errors(sync), errors(async), stress, soak,
# boundary-conditions, config
-import errno, os, socket, time
+import errno, os, socket, sys, time
from qpid import compat
from qpid.compat import set
from qpid.messaging import *
from qpid.messaging.transports import TRANSPORTS
from qpid.tests.messaging import Base
+from threading import Thread
class SetupTests(Base):
@@ -212,6 +213,32 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.attached()
+ def testSimultaneousClose(self):
+ ssns = [self.conn.session() for i in range(3)]
+ for s in ssns:
+ for i in range(3):
+ s.receiver("amq.topic")
+ s.sender("amq.topic")
+
+ def closer(errors):
+ try:
+ self.conn.close()
+ except:
+ _, e, _ = sys.exc_info()
+ errors.append(compat.format_exc(e))
+
+ t1_errors = []
+ t2_errors = []
+ t1 = Thread(target=lambda: closer(t1_errors))
+ t2 = Thread(target=lambda: closer(t2_errors))
+ t1.start()
+ t2.start()
+ t1.join(self.delay())
+ t2.join(self.delay())
+
+ assert not t1_errors, t1_errors[0]
+ assert not t2_errors, t2_errors[0]
+
class hangable:
def __init__(self, host, port):
@@ -655,6 +682,49 @@ class ReceiverTests(Base):
assert msg.content == three
self.ssn.acknowledge()
+ def fetchFromClosedTest(self, entry):
+ entry.close()
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+
+ def testFetchFromClosedReceiver(self):
+ self.fetchFromClosedTest(self.rcv)
+
+ def testFetchFromClosedSession(self):
+ self.fetchFromClosedTest(self.ssn)
+
+ def testFetchFromClosedConnection(self):
+ self.fetchFromClosedTest(self.conn)
+
+ def fetchFromConcurrentCloseTest(self, entry):
+ def closer():
+ time.sleep(self.delay())
+ entry.close()
+ t = Thread(target=closer)
+ t.start()
+ try:
+ msg = self.rcv.fetch()
+ assert False, "unexpected result: %s" % msg
+ except Empty, e:
+ assert False, "unexpected exception: %s" % e
+ except LinkClosed, e:
+ pass
+ t.join()
+
+ def testFetchFromConcurrentCloseReceiver(self):
+ self.fetchFromConcurrentCloseTest(self.rcv)
+
+ def testFetchFromConcurrentCloseSession(self):
+ self.fetchFromConcurrentCloseTest(self.ssn)
+
+ def testFetchFromConcurrentCloseConnection(self):
+ self.fetchFromConcurrentCloseTest(self.conn)
+
def testCapacityIncrease(self):
content = self.send("testCapacityIncrease")
self.sleep()