diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-07 19:07:32 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-07 19:07:32 +0000 |
commit | 039d4461a0c1bb44731bbee6df58c0ee0c1673cf (patch) | |
tree | 242e9dcd96b51dbe14ea8d474347a00b7c8fe6b5 | |
parent | 28bb2c5583a4747594c869af46593f223344a043 (diff) | |
download | qpid-python-039d4461a0c1bb44731bbee6df58c0ee0c1673cf.tar.gz |
Added acquire impl to final 0-10 codepath
Converted some more python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634780 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 2 | ||||
-rw-r--r-- | cpp/xml/extra.xml | 11 | ||||
-rw-r--r-- | python/cpp_failing_0-10.txt | 4 | ||||
-rw-r--r-- | python/qpid/testlib.py | 2 | ||||
-rwxr-xr-x | python/run-tests | 2 | ||||
-rw-r--r-- | python/tests_0-10/message.py | 51 | ||||
-rw-r--r-- | python/tests_0-10/queue.py | 24 |
8 files changed, 75 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c4ee6d9ec1..663565c26c 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -367,16 +367,20 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm commands.for_each(acceptOp); } -/* -void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers) +framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { + //TODO: change this when SequenceNumberSet is deleted along with preview code SequenceNumberSet results; - RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); - transfers.processRanges(op); + RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); + transfers.for_each(f); + results = results.condense(); - getProxy().getMessage().acquired(results); + SequenceSet acquisitions; + RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2); + results.processRanges(g); + + return Message010AcquireResult(acquisitions); } -*/ void SessionAdapter::ExecutionHandlerImpl::sync() diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index c2d61392d7..23cc1beb93 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -149,6 +149,8 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations void release(const framing::SequenceSet& commands, bool setRedelivered); + framing::Message010AcquireResult acquire(const framing::SequenceSet&); + void subscribe(const string& queue, const string& destination, uint8_t acceptMode, diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml index f8bd7688b4..23df91e492 100644 --- a/cpp/xml/extra.xml +++ b/cpp/xml/extra.xml @@ -670,7 +670,16 @@ <field name="commands" domain="sequence-set"/> <field name="set-redelivered" domain="bit"/> </method> - + <method name = "acquire" index="5"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="transfers" domain="sequence-set"/> + <result> + <struct size="long" type="4"> + <field name="transfers" domain="sequence-set"/> + </struct> + </result> + </method> <method name = "subscribe" index="7"> <doc>blah, blah</doc> diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt index 3d00313d2d..3b2c560d8a 100644 --- a/python/cpp_failing_0-10.txt +++ b/python/cpp_failing_0-10.txt @@ -39,8 +39,6 @@ tests_0-10.dtx.DtxTests.test_start_join tests_0-10.dtx.DtxTests.test_start_join_and_resume tests_0-10.dtx.DtxTests.test_suspend_resume tests_0-10.dtx.DtxTests.test_suspend_start_end_resume -tests_0-10.message.MessageTests.test_acquire -tests_0-10.message.MessageTests.test_subscribe_not_acquired_3 tests_0-10.message.MessageTests.test_consume_exclusive tests_0-10.message.MessageTests.test_consume_no_local tests_0-10.message.MessageTests.test_consume_no_local_awkward @@ -60,6 +58,6 @@ tests_0-10.queue.QueueTests.test_declare_passive tests_0-10.queue.QueueTests.test_delete_ifempty tests_0-10.queue.QueueTests.test_delete_ifunused tests_0-10.queue.QueueTests.test_delete_simple -tests_0-10.queue.QueueTests.test_purge tests_0-10.queue.QueueTests.test_bind tests_0-10.queue.QueueTests.test_unbind_headers +tests_0-10.queue.QueueTests.test_purge_empty_name
\ No newline at end of file diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index e8e54b3a56..7e5a2a6b66 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -353,5 +353,5 @@ class TestBase010(unittest.TestCase): self.session = self.conn.session("test-session", timeout=10) def tearDown(self): - self.session.close(timeout=10) + if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) diff --git a/python/run-tests b/python/run-tests index 7efe5523df..07162efd15 100755 --- a/python/run-tests +++ b/python/run-tests @@ -21,7 +21,7 @@ import sys, logging from qpid.testlib import testrunner -if "-v" in sys.argv: +if "-vv" in sys.argv: level = logging.DEBUG else: level = logging.WARN diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index aaefb52392..dd80e79d36 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -635,9 +635,7 @@ class MessageTests(TestBase010): session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #use fanout for now: - session.exchange_bind(exchange="amq.fanout", queue="q") - session.message_transfer(destination="amq.fanout", message=Message("acquire me")) - #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me")) session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) @@ -647,12 +645,13 @@ class MessageTests(TestBase010): #message should still be on the queue: self.assertEquals(1, session.queue_query(queue = "q").message_count) - response = session.message_acquire(RangedSet(msg.id)) + transfers = RangedSet(msg.id) + response = session.message_acquire(transfers) #check that we get notification (i.e. message_acquired) - self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + self.assert_(msg.id in response.transfers) #message should have been removed from the queue: self.assertEquals(0, session.queue_query(queue = "q").message_count) - session.message_accept(RangedSet(msg.id)) + session.message_accept(transfers) def test_release(self): @@ -800,12 +799,12 @@ class MessageTests(TestBase010): session = self.session #publish some messages - self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") session.message_flow(unit = 0, value = 10, destination = "a") @@ -816,19 +815,18 @@ class MessageTests(TestBase010): self.assertEquals("message-%d" % (i), msg.body) if (i % 2): #try to acquire every second message - session.message_acquire([msg.command_id, msg.command_id]) + response = session.message_acquire(RangedSet(msg.id)) #check that acquire succeeds - response = session.control_queue.get(timeout=1) - self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) - session.message_release(RangedSet(msg.id)) - session.channel._completed.add(msg.id) - session.channel.session_completed(session.channel._completed) - - msg.complete() + self.assert_(msg.id in response.transfers) + session.message_accept(RangedSet(msg.id)) + else: + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertEmpty(queue) #create a second not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") session.message_flow(unit = 0, value = 1, destination = "b") #check it gets those not consumed @@ -836,7 +834,9 @@ class MessageTests(TestBase010): for i in [2,4,6,8,10]: msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + session.message_release(RangedSet(msg.id)) + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) @@ -899,6 +899,21 @@ class MessageTests(TestBase010): msg = queue.get(timeout = 3) self.assertEquals("message-body", msg.body) + def test_empty_body(self): + session = self.session + session.queue_declare(queue="xyz", exclusive=True, auto_delete=True) + props = session.delivery_properties(routing_key="xyz") + session.message_transfer(message=Message(props, "")) + + consumer_tag = "tag1" + session.message_subscribe(queue="xyz", destination=consumer_tag) + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag) + queue = session.incoming(consumer_tag) + msg = queue.get(timeout=1) + self.assertEquals("", msg.body) + session.message_accept(RangedSet(msg.id)) + def assertDataEquals(self, session, msg, expected): self.assertEquals(expected, msg.body) diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 38d7a3291b..97e17048d3 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -20,6 +20,7 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.testlib import TestBase010 from qpid.datatypes import Message +from qpid.session import SessionException class QueueTests(TestBase010): """Tests for 'methods' on the amqp queue 'class'""" @@ -54,22 +55,31 @@ class QueueTests(TestBase010): msg = queue.get(timeout=1) self.assertEqual("four", msg.body) - #check error conditions (use new sessions): - session = self.conn.session("error-checker") + def test_purge_queue_exists(self): + """ + Test that the correct exception is thrown is no queue exists + for the name specified in purge + """ + session = self.session try: #queue specified but doesn't exist: session.queue_purge(queue="invalid-queue") self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found - session = self.conn.session("error-checker") + def test_purge_empty_name(self): + """ + Test that the correct exception is thrown is no queue name + is specified for purge + """ + session = self.session try: #queue not specified and none previously declared for channel: session.queue_purge() self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) + except SessionException, e: + self.assertEquals(531, e.args[0].error_code) #illegal-argument def test_declare_exclusive(self): """ |