summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-07 19:07:32 +0000
committerGordon Sim <gsim@apache.org>2008-03-07 19:07:32 +0000
commit039d4461a0c1bb44731bbee6df58c0ee0c1673cf (patch)
tree242e9dcd96b51dbe14ea8d474347a00b7c8fe6b5
parent28bb2c5583a4747594c869af46593f223344a043 (diff)
downloadqpid-python-039d4461a0c1bb44731bbee6df58c0ee0c1673cf.tar.gz
Added acquire impl to final 0-10 codepath
Converted some more python tests git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634780 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp16
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h2
-rw-r--r--cpp/xml/extra.xml11
-rw-r--r--python/cpp_failing_0-10.txt4
-rw-r--r--python/qpid/testlib.py2
-rwxr-xr-xpython/run-tests2
-rw-r--r--python/tests_0-10/message.py51
-rw-r--r--python/tests_0-10/queue.py24
8 files changed, 75 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c4ee6d9ec1..663565c26c 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -367,16 +367,20 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
commands.for_each(acceptOp);
}
-/*
-void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
+framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
+ //TODO: change this when SequenceNumberSet is deleted along with preview code
SequenceNumberSet results;
- RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
- transfers.processRanges(op);
+ RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
+ transfers.for_each(f);
+
results = results.condense();
- getProxy().getMessage().acquired(results);
+ SequenceSet acquisitions;
+ RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
+ results.processRanges(g);
+
+ return Message010AcquireResult(acquisitions);
}
-*/
void SessionAdapter::ExecutionHandlerImpl::sync()
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index c2d61392d7..23cc1beb93 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -149,6 +149,8 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
void release(const framing::SequenceSet& commands,
bool setRedelivered);
+ framing::Message010AcquireResult acquire(const framing::SequenceSet&);
+
void subscribe(const string& queue,
const string& destination,
uint8_t acceptMode,
diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml
index f8bd7688b4..23df91e492 100644
--- a/cpp/xml/extra.xml
+++ b/cpp/xml/extra.xml
@@ -670,7 +670,16 @@
<field name="commands" domain="sequence-set"/>
<field name="set-redelivered" domain="bit"/>
</method>
-
+ <method name = "acquire" index="5">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="transfers" domain="sequence-set"/>
+ <result>
+ <struct size="long" type="4">
+ <field name="transfers" domain="sequence-set"/>
+ </struct>
+ </result>
+ </method>
<method name = "subscribe" index="7">
<doc>blah, blah</doc>
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 3d00313d2d..3b2c560d8a 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -39,8 +39,6 @@ tests_0-10.dtx.DtxTests.test_start_join
tests_0-10.dtx.DtxTests.test_start_join_and_resume
tests_0-10.dtx.DtxTests.test_suspend_resume
tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_acquire
-tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
tests_0-10.message.MessageTests.test_consume_exclusive
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
@@ -60,6 +58,6 @@ tests_0-10.queue.QueueTests.test_declare_passive
tests_0-10.queue.QueueTests.test_delete_ifempty
tests_0-10.queue.QueueTests.test_delete_ifunused
tests_0-10.queue.QueueTests.test_delete_simple
-tests_0-10.queue.QueueTests.test_purge
tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.queue.QueueTests.test_purge_empty_name \ No newline at end of file
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index e8e54b3a56..7e5a2a6b66 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -353,5 +353,5 @@ class TestBase010(unittest.TestCase):
self.session = self.conn.session("test-session", timeout=10)
def tearDown(self):
- self.session.close(timeout=10)
+ if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
diff --git a/python/run-tests b/python/run-tests
index 7efe5523df..07162efd15 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -21,7 +21,7 @@
import sys, logging
from qpid.testlib import testrunner
-if "-v" in sys.argv:
+if "-vv" in sys.argv:
level = logging.DEBUG
else:
level = logging.WARN
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index aaefb52392..dd80e79d36 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -635,9 +635,7 @@ class MessageTests(TestBase010):
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#use fanout for now:
- session.exchange_bind(exchange="amq.fanout", queue="q")
- session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
- #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -647,12 +645,13 @@ class MessageTests(TestBase010):
#message should still be on the queue:
self.assertEquals(1, session.queue_query(queue = "q").message_count)
- response = session.message_acquire(RangedSet(msg.id))
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
#check that we get notification (i.e. message_acquired)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ self.assert_(msg.id in response.transfers)
#message should have been removed from the queue:
self.assertEquals(0, session.queue_query(queue = "q").message_count)
- session.message_accept(RangedSet(msg.id))
+ session.message_accept(transfers)
def test_release(self):
@@ -800,12 +799,12 @@ class MessageTests(TestBase010):
session = self.session
#publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#create a not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
session.message_flow(unit = 0, value = 10, destination = "a")
@@ -816,19 +815,18 @@ class MessageTests(TestBase010):
self.assertEquals("message-%d" % (i), msg.body)
if (i % 2):
#try to acquire every second message
- session.message_acquire([msg.command_id, msg.command_id])
+ response = session.message_acquire(RangedSet(msg.id))
#check that acquire succeeds
- response = session.control_queue.get(timeout=1)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
- session.message_release(RangedSet(msg.id))
- session.channel._completed.add(msg.id)
- session.channel.session_completed(session.channel._completed)
-
- msg.complete()
+ self.assert_(msg.id in response.transfers)
+ session.message_accept(RangedSet(msg.id))
+ else:
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertEmpty(queue)
#create a second not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
session.message_flow(unit = 0, value = 1, destination = "b")
#check it gets those not consumed
@@ -836,7 +834,9 @@ class MessageTests(TestBase010):
for i in [2,4,6,8,10]:
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
@@ -899,6 +899,21 @@ class MessageTests(TestBase010):
msg = queue.get(timeout = 3)
self.assertEquals("message-body", msg.body)
+ def test_empty_body(self):
+ session = self.session
+ session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="xyz")
+ session.message_transfer(message=Message(props, ""))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="xyz", destination=consumer_tag)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEquals("", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 38d7a3291b..97e17048d3 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -20,6 +20,7 @@ from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.testlib import TestBase010
from qpid.datatypes import Message
+from qpid.session import SessionException
class QueueTests(TestBase010):
"""Tests for 'methods' on the amqp queue 'class'"""
@@ -54,22 +55,31 @@ class QueueTests(TestBase010):
msg = queue.get(timeout=1)
self.assertEqual("four", msg.body)
- #check error conditions (use new sessions):
- session = self.conn.session("error-checker")
+ def test_purge_queue_exists(self):
+ """
+ Test that the correct exception is thrown is no queue exists
+ for the name specified in purge
+ """
+ session = self.session
try:
#queue specified but doesn't exist:
session.queue_purge(queue="invalid-queue")
self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
- session = self.conn.session("error-checker")
+ def test_purge_empty_name(self):
+ """
+ Test that the correct exception is thrown is no queue name
+ is specified for purge
+ """
+ session = self.session
try:
#queue not specified and none previously declared for channel:
session.queue_purge()
self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code) #illegal-argument
def test_declare_exclusive(self):
"""