diff options
-rw-r--r-- | qpid/python/cpp_failing.txt | 4 | ||||
-rw-r--r-- | qpid/python/java_failing.txt | 12 | ||||
-rw-r--r-- | qpid/python/qpid/client.py | 11 | ||||
-rw-r--r-- | qpid/python/qpid/connection.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/peer.py | 11 | ||||
-rw-r--r-- | qpid/python/qpid/reference.py | 2 | ||||
-rw-r--r-- | qpid/python/qpid/spec.py | 5 | ||||
-rw-r--r-- | qpid/python/qpid/testlib.py | 53 | ||||
-rw-r--r-- | qpid/python/tests/message.py | 214 | ||||
-rw-r--r-- | qpid/python/tests/tx.py | 3 |
10 files changed, 243 insertions, 75 deletions
diff --git a/qpid/python/cpp_failing.txt b/qpid/python/cpp_failing.txt index e69de29bb2..69c7cf3f5a 100644 --- a/qpid/python/cpp_failing.txt +++ b/qpid/python/cpp_failing.txt @@ -0,0 +1,4 @@ +tests.message.MessageTests.test_checkpoint +tests.message.MessageTests.test_reference_large +tests.message.MessageTests.test_reject +tests.basic.BasicTests.test_get diff --git a/qpid/python/java_failing.txt b/qpid/python/java_failing.txt index b671f86064..7252d0f496 100644 --- a/qpid/python/java_failing.txt +++ b/qpid/python/java_failing.txt @@ -1,4 +1,4 @@ -tests.basic.BasicTests.test_qos_prefetch_count +ntests.basic.BasicTests.test_qos_prefetch_count tests.basic.BasicTests.test_ack tests.basic.BasicTests.test_cancel tests.basic.BasicTests.test_consume_exclusive @@ -8,11 +8,11 @@ tests.basic.BasicTests.test_consume_unique_consumers tests.basic.BasicTests.test_get tests.basic.BasicTests.test_qos_prefetch_size tests.basic.BasicTests.test_recover_requeue + tests.exchange.RecommendedTypesRuleTests.testTopic tests.exchange.RequiredInstancesRuleTests.testAmqTopic -tests.message.MessageTests.test_qos_prefetch_count -tests.message.MessageTests.test_ack -tests.message.MessageTests.test_get -tests.message.MessageTests.test_qos_prefetch_size -tests.message.MessageTests.test_recover_requeue +tests.message.MessageTests.test_checkpoint +tests.message.MessageTests.test_reject + +tests.broker.BrokerTests.test_ping_pong diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index ea6aa7901a..e548ef0e99 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -76,7 +76,8 @@ class Client: self.locale = locale self.tune_params = tune_params - self.conn = Connection(connect(self.host, self.port), self.spec) + self.socket = connect(self.host, self.port) + self.conn = Connection(self.socket, self.spec) self.peer = Peer(self.conn, ClientDelegate(self), self.opened) self.conn.init() @@ -90,6 +91,9 @@ class Client: def opened(self, ch): ch.references = References() + def close(self): + self.socket.close() + class ClientDelegate(Delegate): def __init__(self, client): @@ -112,9 +116,8 @@ class ClientDelegate(Delegate): def message_transfer(self, ch, msg): if isinstance(msg.body, ReferenceId): - self.client.queue(msg.destination).put(ch.references.get(msg.body.id)) - else: - self.client.queue(msg.destination).put(msg) + msg.reference = ch.references.get(msg.body.id) + self.client.queue(msg.destination).put(msg) def message_open(self, ch, msg): ch.references.open(msg.reference) diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 0785fe8774..cdfa2c2dc0 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -53,6 +53,9 @@ class SockIO: def flush(self): pass + def close(self): + self.sock.shutdown(socket.SHUT_RDWR) + def connect(host, port): sock = socket.socket() sock.connect((host, port)) diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 6c8c6647c9..28db20f187 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -97,9 +97,12 @@ class Peer: self.fatal() def close(self, reason): + # We must close the delegate first because closing channels + # may wake up waiting threads and we don't want them to see + # the delegate as open. + self.delegate.close(reason) for ch in self.channels.values(): ch.close(reason) - self.delegate.close(reason) def writer(self): try: @@ -144,7 +147,7 @@ class Requester: self.write(frame, content) def receive(self, channel, frame): - listener = self.outstanding.pop(frame.id) + listener = self.outstanding.pop(frame.request_id) listener(channel, frame) class Responder: @@ -178,8 +181,8 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - # XXX: better switch - self.reliable = False + # Use reliable framing if version == 0-9. + self.reliable = (spec.major == 0 and spec.minor == 9) self.synchronous = True def close(self, reason): diff --git a/qpid/python/qpid/reference.py b/qpid/python/qpid/reference.py index d357560390..48ecb67656 100644 --- a/qpid/python/qpid/reference.py +++ b/qpid/python/qpid/reference.py @@ -111,7 +111,7 @@ class References: self.get(id).close() self.lock.acquire() try: - del map[id] + self.map.pop(id) finally: self.lock.release() diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index e430c45b96..4f0661bcbc 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -79,12 +79,11 @@ class Spec(Metadata): PRINT=["major", "minor", "file"] - def __init__(self, major, minor, file, errata): + def __init__(self, major, minor, file): Metadata.__init__(self) self.major = major self.minor = minor self.file = file - self.errata = errata self.constants = SpecContainer() self.classes = SpecContainer() # methods indexed by classname_methname @@ -275,7 +274,7 @@ def load_fields(nd, l, domains): def load(specfile, *errata): doc = xmlutil.parse(specfile) spec_root = doc["amqp"][0] - spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata) + spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata): # constants diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index dcbf0ed91c..05641bce7e 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -26,6 +26,7 @@ import qpid.client, qpid.spec import Queue from getopt import getopt, GetoptError from qpid.content import Content +from qpid.message import Message def findmodules(root): """Find potential python modules under directory root""" @@ -56,15 +57,16 @@ class TestRunner: run-tests [options] [test*] The name of a test is package.module.ClassName.testMethod Options: - -?/-h/--help : this message + -?/-h/--help : this message -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: 0-8 - use the default 0-8 specification. 0-9 - use the default 0-9 specification. + -e/--errata <errata.xml> : file containing amqp XML errata -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. """ sys.exit(1) @@ -103,24 +105,27 @@ Options: for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value + if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + # Abbreviations for default settings. if (self.specfile == "0-8"): - self.specfile = "../specs/amqp.0-8.xml" + self.specfile = "../specs/amqp.0-8.xml" if (self.specfile == "0-9"): - self.specfile = "../specs/amqp.0-9.xml" - self.errata = ["../specs/amqp-errata.0-9.xml"] + self.specfile = "../specs/amqp.0-9.xml" + self.errata.append("../specs/amqp-errata.0-9.xml") + if (self.specfile == None): + self._die("No XML specification provided") print "Using specification from:", self.specfile self.spec = qpid.spec.load(self.specfile, *self.errata) if len(self.tests) == 0: if self.use08spec(): - testdir="tests_0-8" + self.tests=findmodules("tests_0-8") else: - testdir="tests" - self.tests=findmodules(testdir) + self.tests=findmodules("tests") def testSuite(self): class IgnoringTestSuite(unittest.TestSuite): @@ -137,7 +142,11 @@ Options: self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) - result = runner.run(self.testSuite()) + try: + result = runner.run(self.testSuite()) + except: + print "Unhandled error in test:", sys.exc_info() + if (self.ignore): print "=======================================" print "NOTE: the following tests were ignored:" @@ -181,10 +190,18 @@ class TestBase(unittest.TestCase): self.channel.channel_open() def tearDown(self): - for ch, q in self.queues: - ch.queue_delete(queue=q) - for ch, ex in self.exchanges: - ch.exchange_delete(exchange=ex) + try: + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + except: + print "Error on tearDown:", sys.exc_info() + + if not self.client.closed: + self.client.channel(0).connection_close(reply_code=200) + else: + self.client.close() def connect(self, *args, **keys): """Create a new connction, return the Client object""" @@ -261,13 +278,15 @@ class TestBase(unittest.TestCase): """ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) - def assertChannelException(self, expectedCode, message): + def assertChannelException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected channel_close method") self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): + if not isinstance(message, Message): self.fail("expected connection_close method") self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) diff --git a/qpid/python/tests/message.py b/qpid/python/tests/message.py index 916a9825bd..d5f5d4dbc2 100644 --- a/qpid/python/tests/message.py +++ b/qpid/python/tests/message.py @@ -171,8 +171,7 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok() - msg2.ok() + msg1.ok(batchoffset=1)#One and Two msg4.ok() channel.message_recover(requeue=False) @@ -216,9 +215,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok() #One - msg2.ok() #Two - msg4.ok() #Two + msg1.ok(batchoffset=1) #One and Two + msg4.ok() #Four channel.message_cancel(destination="consumer_tag") channel.message_consume(queue="test-requeue", destination="consumer_tag") @@ -263,11 +261,9 @@ class MessageTests(TestBase): channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i) #only 5 messages should have been delivered: - msgs = [] for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.body) @@ -275,18 +271,13 @@ class MessageTests(TestBase): #ack messages and check that the next set arrive ok: #todo: once batching is implmented, send a single response for all messages - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#1-5 for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#6-10 try: extra = queue.get(timeout=1) @@ -313,12 +304,9 @@ class MessageTests(TestBase): channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i) #only 5 messages should have been delivered (i.e. 45 bytes worth): - msgs = [] for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - print "Got Message %d" % i - msgs.append(msg) try: extra = queue.get(timeout=1) @@ -326,18 +314,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#1-5 for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msgs.append(msg) - for msg in msgs: - msg.ok() - del msgs + msg.ok(batchoffset=-4)#6-10 try: extra = queue.get(timeout=1) @@ -383,14 +366,13 @@ class MessageTests(TestBase): reply = channel.message_get(no_ack=False, queue="test-get", destination=tag) self.assertEqual(reply.method.klass.name, "message") self.assertEqual(reply.method.name, "ok") - self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body) - reply.ok() - - #todo: when batching is available, test ack multiple - #if(i == 13): - # channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True) - #if(i in [15, 17, 19]): - # channel.message_ack(delivery_tag=reply.delivery_tag) + msg = self.client.queue(tag).get(timeout=1) + self.assertEqual("Message %d" % i, msg.body) + + if (i==13): + msg.ok(batchoffset=-2)#11, 12 & 13 + if(i in [15, 17, 19]): + msg.ok() reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") @@ -479,8 +461,9 @@ class MessageTests(TestBase): queue = other.queue("c1") msg = queue.get(timeout=1) - self.assertTrue(isinstance(msg, Reference)) - self.assertEquals(data, msg.get_complete()) + self.assertTrue(isinstance(msg.body, ReferenceId)) + self.assertTrue(msg.reference) + self.assertEquals(data, msg.reference.get_complete()) def test_reference_completion(self): """ @@ -511,12 +494,165 @@ class MessageTests(TestBase): #first, wait for the ok for the transfer ack.get_response(timeout=1) + self.assertDataEquals(channel, queue.get(timeout=1), "abcd") + + def test_reference_multi_transfer(self): + """ + Test that multiple transfer requests for the same reference are + correctly handled. + """ + channel = self.channel + #declare and consume from two queues + channel.queue_declare(queue="q-one", exclusive=True) + channel.queue_declare(queue="q-two", exclusive=True) + channel.message_consume(queue="q-one", destination="q-one") + channel.message_consume(queue="q-two", destination="q-two") + queue1 = self.client.queue("q-one") + queue2 = self.client.queue("q-two") + + #transfer a single ref to both queues (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="my data") + ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + #check that both queues have the message + self.assertDataEquals(channel, queue1.get(timeout=1), "my data") + self.assertDataEquals(channel, queue2.get(timeout=1), "my data") + self.assertEmpty(queue1) + self.assertEmpty(queue2) + + #transfer a single ref to the same queue twice (in separate commands) + channel.message_open(reference="my-ref") + channel.synchronous = False + ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref")) + channel.message_append(reference="my-ref", bytes="second message") + ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + msg1 = queue1.get(timeout=1) + msg2 = queue1.get(timeout=1) + #order is undefined + if msg1.message_id == "abc": + self.assertEquals(msg2.message_id, "xyz") + else: + self.assertEquals(msg1.message_id, "xyz") + self.assertEquals(msg2.message_id, "abc") + + #would be legal for the incoming messages to be transfered + #inline or by reference in any combination + + if isinstance(msg1.body, ReferenceId): + self.assertEquals("second message", msg1.reference.get_complete()) + if isinstance(msg2.body, ReferenceId): + if msg1.body != msg2.body: + self.assertEquals("second message", msg2.reference.get_complete()) + #else ok, as same ref as msg1 + else: + self.assertEquals("second message", msg1.body) + if isinstance(msg2.body, ReferenceId): + self.assertEquals("second message", msg2.reference.get_complete()) + else: + self.assertEquals("second message", msg2.body) + + self.assertEmpty(queue1) + + def test_reference_unopened_on_append_error(self): + channel = self.channel + try: + channel.message_append(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_close_error(self): + channel = self.channel + try: + channel.message_close(reference="unopened") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_unopened_on_transfer_error(self): + channel = self.channel + try: + channel.message_transfer(body=ReferenceId("unopened")) + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_reference_already_opened_error(self): + channel = self.channel + channel.message_open(reference="a") + try: + channel.message_open(reference="a") + except Closed, e: + self.assertConnectionException(503, e.args[0]) + + def test_empty_reference(self): + channel = self.channel + channel.queue_declare(queue="ref_queue", exclusive=True) + channel.message_consume(queue="ref_queue", destination="c1") + queue = self.client.queue("c1") + + refId = "myref" + channel.message_open(reference=refId) + channel.synchronous = False + ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId)) + channel.synchronous = True + channel.message_close(reference=refId) + + #first, wait for the ok for the transfer + ack.get_response(timeout=1) + msg = queue.get(timeout=1) - if isinstance(msg, Reference): - #should we force broker to deliver as reference by frame - #size limit? or test that separately? for compliance, - #allowing either seems best for now... - data = msg.get_complete() + self.assertEquals(msg.message_id, "empty-msg") + self.assertDataEquals(channel, msg, "") + + def test_reject(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_consume(queue = "q", destination = "consumer") + channel.message_transfer(routing_key = "q", body="blah, blah") + msg = self.client.queue("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + channel.message_cancel(destination = "consumer") + msg.reject() + + channel.message_consume(queue = "q", destination = "checker") + msg = self.client.queue("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + + def test_checkpoint(self): + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + + channel.message_open(reference="my-ref") + channel.message_append(reference="my-ref", bytes="abcdefgh") + channel.message_append(reference="my-ref", bytes="ijklmnop") + channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint") + channel.channel_close() + + channel = self.client.channel(2) + channel.channel_open() + channel.message_consume(queue = "q", destination = "consumer") + offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value + self.assertEquals(offset, 16) + channel.message_append(reference="my-ref", bytes="qrstuvwxyz") + channel.synchronous = False + channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref")) + channel.synchronous = True + channel.message_close(reference="my-ref") + + self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") + self.assertEmpty(self.client.queue("consumer")) + + + def assertDataEquals(self, channel, msg, expected): + if isinstance(msg.body, ReferenceId): + data = msg.reference.get_complete() else: data = msg.body self.assertEquals("abcdefghijkl", data) diff --git a/qpid/python/tests/tx.py b/qpid/python/tests/tx.py index 55a5eaeade..0f6b4f5cd1 100644 --- a/qpid/python/tests/tx.py +++ b/qpid/python/tests/tx.py @@ -163,7 +163,8 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + + msg.ok(batchoffset=-3) channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) queue_b = self.client.queue("sub_b") |