summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/python/cpp_failing.txt4
-rw-r--r--qpid/python/java_failing.txt12
-rw-r--r--qpid/python/qpid/client.py11
-rw-r--r--qpid/python/qpid/connection.py3
-rw-r--r--qpid/python/qpid/peer.py11
-rw-r--r--qpid/python/qpid/reference.py2
-rw-r--r--qpid/python/qpid/spec.py5
-rw-r--r--qpid/python/qpid/testlib.py53
-rw-r--r--qpid/python/tests/message.py214
-rw-r--r--qpid/python/tests/tx.py3
10 files changed, 243 insertions, 75 deletions
diff --git a/qpid/python/cpp_failing.txt b/qpid/python/cpp_failing.txt
index e69de29bb2..69c7cf3f5a 100644
--- a/qpid/python/cpp_failing.txt
+++ b/qpid/python/cpp_failing.txt
@@ -0,0 +1,4 @@
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reference_large
+tests.message.MessageTests.test_reject
+tests.basic.BasicTests.test_get
diff --git a/qpid/python/java_failing.txt b/qpid/python/java_failing.txt
index b671f86064..7252d0f496 100644
--- a/qpid/python/java_failing.txt
+++ b/qpid/python/java_failing.txt
@@ -1,4 +1,4 @@
-tests.basic.BasicTests.test_qos_prefetch_count
+ntests.basic.BasicTests.test_qos_prefetch_count
tests.basic.BasicTests.test_ack
tests.basic.BasicTests.test_cancel
tests.basic.BasicTests.test_consume_exclusive
@@ -8,11 +8,11 @@ tests.basic.BasicTests.test_consume_unique_consumers
tests.basic.BasicTests.test_get
tests.basic.BasicTests.test_qos_prefetch_size
tests.basic.BasicTests.test_recover_requeue
+
tests.exchange.RecommendedTypesRuleTests.testTopic
tests.exchange.RequiredInstancesRuleTests.testAmqTopic
-tests.message.MessageTests.test_qos_prefetch_count
-tests.message.MessageTests.test_ack
-tests.message.MessageTests.test_get
-tests.message.MessageTests.test_qos_prefetch_size
-tests.message.MessageTests.test_recover_requeue
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reject
+
+tests.broker.BrokerTests.test_ping_pong
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index ea6aa7901a..e548ef0e99 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -76,7 +76,8 @@ class Client:
self.locale = locale
self.tune_params = tune_params
- self.conn = Connection(connect(self.host, self.port), self.spec)
+ self.socket = connect(self.host, self.port)
+ self.conn = Connection(self.socket, self.spec)
self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
self.conn.init()
@@ -90,6 +91,9 @@ class Client:
def opened(self, ch):
ch.references = References()
+ def close(self):
+ self.socket.close()
+
class ClientDelegate(Delegate):
def __init__(self, client):
@@ -112,9 +116,8 @@ class ClientDelegate(Delegate):
def message_transfer(self, ch, msg):
if isinstance(msg.body, ReferenceId):
- self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
- else:
- self.client.queue(msg.destination).put(msg)
+ msg.reference = ch.references.get(msg.body.id)
+ self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py
index 0785fe8774..cdfa2c2dc0 100644
--- a/qpid/python/qpid/connection.py
+++ b/qpid/python/qpid/connection.py
@@ -53,6 +53,9 @@ class SockIO:
def flush(self):
pass
+ def close(self):
+ self.sock.shutdown(socket.SHUT_RDWR)
+
def connect(host, port):
sock = socket.socket()
sock.connect((host, port))
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 6c8c6647c9..28db20f187 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -97,9 +97,12 @@ class Peer:
self.fatal()
def close(self, reason):
+ # We must close the delegate first because closing channels
+ # may wake up waiting threads and we don't want them to see
+ # the delegate as open.
+ self.delegate.close(reason)
for ch in self.channels.values():
ch.close(reason)
- self.delegate.close(reason)
def writer(self):
try:
@@ -144,7 +147,7 @@ class Requester:
self.write(frame, content)
def receive(self, channel, frame):
- listener = self.outstanding.pop(frame.id)
+ listener = self.outstanding.pop(frame.request_id)
listener(channel, frame)
class Responder:
@@ -178,8 +181,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- # XXX: better switch
- self.reliable = False
+ # Use reliable framing if version == 0-9.
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
def close(self, reason):
diff --git a/qpid/python/qpid/reference.py b/qpid/python/qpid/reference.py
index d357560390..48ecb67656 100644
--- a/qpid/python/qpid/reference.py
+++ b/qpid/python/qpid/reference.py
@@ -111,7 +111,7 @@ class References:
self.get(id).close()
self.lock.acquire()
try:
- del map[id]
+ self.map.pop(id)
finally:
self.lock.release()
diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py
index e430c45b96..4f0661bcbc 100644
--- a/qpid/python/qpid/spec.py
+++ b/qpid/python/qpid/spec.py
@@ -79,12 +79,11 @@ class Spec(Metadata):
PRINT=["major", "minor", "file"]
- def __init__(self, major, minor, file, errata):
+ def __init__(self, major, minor, file):
Metadata.__init__(self)
self.major = major
self.minor = minor
self.file = file
- self.errata = errata
self.constants = SpecContainer()
self.classes = SpecContainer()
# methods indexed by classname_methname
@@ -275,7 +274,7 @@ def load_fields(nd, l, domains):
def load(specfile, *errata):
doc = xmlutil.parse(specfile)
spec_root = doc["amqp"][0]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata)
+ spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata):
# constants
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index dcbf0ed91c..05641bce7e 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -26,6 +26,7 @@ import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
from qpid.content import Content
+from qpid.message import Message
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -56,15 +57,16 @@ class TestRunner:
run-tests [options] [test*]
The name of a test is package.module.ClassName.testMethod
Options:
- -?/-h/--help : this message
+ -?/-h/--help : this message
-s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
0-8 - use the default 0-8 specification.
0-9 - use the default 0-9 specification.
+ -e/--errata <errata.xml> : file containing amqp XML errata
-b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
"""
sys.exit(1)
@@ -103,24 +105,27 @@ Options:
for opt, value in opts:
if opt in ("-?", "-h", "--help"): self._die()
if opt in ("-s", "--spec"): self.specfile = value
+ if opt in ("-e", "--errata"): self.errata.append(value)
if opt in ("-b", "--broker"): self.setBroker(value)
if opt in ("-v", "--verbose"): self.verbose = 2
if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
if opt in ("-i", "--ignore"): self.ignore.append(value)
if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+ # Abbreviations for default settings.
if (self.specfile == "0-8"):
- self.specfile = "../specs/amqp.0-8.xml"
+ self.specfile = "../specs/amqp.0-8.xml"
if (self.specfile == "0-9"):
- self.specfile = "../specs/amqp.0-9.xml"
- self.errata = ["../specs/amqp-errata.0-9.xml"]
+ self.specfile = "../specs/amqp.0-9.xml"
+ self.errata.append("../specs/amqp-errata.0-9.xml")
+ if (self.specfile == None):
+ self._die("No XML specification provided")
print "Using specification from:", self.specfile
self.spec = qpid.spec.load(self.specfile, *self.errata)
if len(self.tests) == 0:
if self.use08spec():
- testdir="tests_0-8"
+ self.tests=findmodules("tests_0-8")
else:
- testdir="tests"
- self.tests=findmodules(testdir)
+ self.tests=findmodules("tests")
def testSuite(self):
class IgnoringTestSuite(unittest.TestSuite):
@@ -137,7 +142,11 @@ Options:
self._parseargs(args)
runner = unittest.TextTestRunner(descriptions=False,
verbosity=self.verbose)
- result = runner.run(self.testSuite())
+ try:
+ result = runner.run(self.testSuite())
+ except:
+ print "Unhandled error in test:", sys.exc_info()
+
if (self.ignore):
print "======================================="
print "NOTE: the following tests were ignored:"
@@ -181,10 +190,18 @@ class TestBase(unittest.TestCase):
self.channel.channel_open()
def tearDown(self):
- for ch, q in self.queues:
- ch.queue_delete(queue=q)
- for ch, ex in self.exchanges:
- ch.exchange_delete(exchange=ex)
+ try:
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+ except:
+ print "Error on tearDown:", sys.exc_info()
+
+ if not self.client.closed:
+ self.client.channel(0).connection_close(reply_code=200)
+ else:
+ self.client.close()
def connect(self, *args, **keys):
"""Create a new connction, return the Client object"""
@@ -261,13 +278,15 @@ class TestBase(unittest.TestCase):
"""
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
- def assertChannelException(self, expectedCode, message):
+ def assertChannelException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected channel_close method")
self.assertEqual("channel", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
+ if not isinstance(message, Message): self.fail("expected connection_close method")
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
diff --git a/qpid/python/tests/message.py b/qpid/python/tests/message.py
index 916a9825bd..d5f5d4dbc2 100644
--- a/qpid/python/tests/message.py
+++ b/qpid/python/tests/message.py
@@ -171,8 +171,7 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok()
- msg2.ok()
+ msg1.ok(batchoffset=1)#One and Two
msg4.ok()
channel.message_recover(requeue=False)
@@ -216,9 +215,8 @@ class MessageTests(TestBase):
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
- msg1.ok() #One
- msg2.ok() #Two
- msg4.ok() #Two
+ msg1.ok(batchoffset=1) #One and Two
+ msg4.ok() #Four
channel.message_cancel(destination="consumer_tag")
channel.message_consume(queue="test-requeue", destination="consumer_tag")
@@ -263,11 +261,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
#only 5 messages should have been delivered:
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -275,18 +271,13 @@ class MessageTests(TestBase):
#ack messages and check that the next set arrive ok:
#todo: once batching is implmented, send a single response for all messages
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -313,12 +304,9 @@ class MessageTests(TestBase):
channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
#only 5 messages should have been delivered (i.e. 45 bytes worth):
- msgs = []
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- print "Got Message %d" % i
- msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -326,18 +314,13 @@ class MessageTests(TestBase):
except Empty: None
#ack messages and check that the next set arrive ok:
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#1-5
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.append(msg)
- for msg in msgs:
- msg.ok()
- del msgs
+ msg.ok(batchoffset=-4)#6-10
try:
extra = queue.get(timeout=1)
@@ -383,14 +366,13 @@ class MessageTests(TestBase):
reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply.ok()
-
- #todo: when batching is available, test ack multiple
- #if(i == 13):
- # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
- #if(i in [15, 17, 19]):
- # channel.message_ack(delivery_tag=reply.delivery_tag)
+ msg = self.client.queue(tag).get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ if (i==13):
+ msg.ok(batchoffset=-2)#11, 12 & 13
+ if(i in [15, 17, 19]):
+ msg.ok()
reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
@@ -479,8 +461,9 @@ class MessageTests(TestBase):
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertTrue(msg.reference)
+ self.assertEquals(data, msg.reference.get_complete())
def test_reference_completion(self):
"""
@@ -511,12 +494,165 @@ class MessageTests(TestBase):
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", msg1.reference.get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", msg2.reference.get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", msg2.reference.get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+ def test_reject(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_consume(queue = "q", destination = "consumer")
+ channel.message_transfer(routing_key = "q", body="blah, blah")
+ msg = self.client.queue("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ channel.message_cancel(destination = "consumer")
+ msg.reject()
+
+ channel.message_consume(queue = "q", destination = "checker")
+ msg = self.client.queue("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_checkpoint(self):
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+
+ channel.message_open(reference="my-ref")
+ channel.message_append(reference="my-ref", bytes="abcdefgh")
+ channel.message_append(reference="my-ref", bytes="ijklmnop")
+ channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+ channel.channel_close()
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ channel.message_consume(queue = "q", destination = "consumer")
+ offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+ self.assertEquals(offset, 16)
+ channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+ channel.synchronous = False
+ channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+ self.assertEmpty(self.client.queue("consumer"))
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = msg.reference.get_complete()
else:
data = msg.body
self.assertEquals("abcdefghijkl", data)
diff --git a/qpid/python/tests/tx.py b/qpid/python/tests/tx.py
index 55a5eaeade..0f6b4f5cd1 100644
--- a/qpid/python/tests/tx.py
+++ b/qpid/python/tests/tx.py
@@ -163,7 +163,8 @@ class TxTests(TestBase):
for i in range(1, 5):
msg = queue_a.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msg.ok()
+
+ msg.ok(batchoffset=-3)
channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
queue_b = self.client.queue("sub_b")