From ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 17 Jul 2007 08:28:48 +0000 Subject: Some refactoring towards a more decoupled handler chain structure: * Connection no longer depends on Channel; it contains a map of FrameHandler::Chains. (The construction of the chains still refers to specific handlers). * Channel is no longer tied to ChannelAdapter through inheritance. The former is independent of any particular handler chain or protocol version, the latter is still used by ConnectionAdapter and SemanticHandler in the 0-9 chain. * A DeliveryAdapter interface has been introduced as part of the separation of ChannelAdapter from Channel. This is intended to adapt from a version independent core to version specific mechanisms for sending messages. i.e. it fulfills the same role for outputs that e.g. BrokerAdapter does for inputs. (Its not perfect yet by any means but is a step on the way to the correct model I think). * The connection related methods sent over channel zero are implemented in their own adapter (ConnectionAdapter), and are entirely separate from the semantic layer. The channel control methods are still bundled with the proper semantic layer methods; they too can be separated but would have to share the request id with the semantic method handler due to the nature of the 0-9 WIP. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/BrokerChannelTest.cpp | 124 ++++++++++++++---------------------- 1 file changed, 47 insertions(+), 77 deletions(-) (limited to 'cpp/src/tests/BrokerChannelTest.cpp') diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 929105f6e3..251ac624ab 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; +struct DeliveryRecorder +{ + typedef std::pair Delivery; + std::vector delivered; + + struct Adapter : DeliveryAdapter + { + RequestId id; + DeliveryRecorder& recorder; + + Adapter(DeliveryRecorder& r) : recorder(r) {} + + RequestId getNextDeliveryTag() { return id + 1; } + void deliver(Message::shared_ptr& msg, RequestId tag) + { + recorder.delivered.push_back(Delivery(msg, tag)); + id++; + } + + }; + + std::auto_ptr createAdapter() + { + return std::auto_ptr(new Adapter(*this)); + } +}; class BrokerChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); @@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase ConnectionToken* owner = 0; string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); + std::auto_ptr unused; + channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); + channel.consume(unused, tagA, queue, false, false, owner); + channel.consume(unused, tagB, queue, false, false, owner); CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); CPPUNIT_ASSERT(channel.exists("my_consumer")); CPPUNIT_ASSERT(channel.exists(tagA)); @@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); } - void testDeliveryNoAck(){ + void testDeliveryNoAck(){ Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } void testStaging(){ @@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); - //ensure no more frames have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); + channel.flow(true); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[1].getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[2].getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[3].getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) -- cgit v1.2.1