diff options
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 124 |
1 files changed, 47 insertions, 77 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 929105f6e3..251ac624ab 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -48,13 +48,38 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; +struct DeliveryRecorder +{ + typedef std::pair<Message::shared_ptr, RequestId> Delivery; + std::vector<Delivery> delivered; + + struct Adapter : DeliveryAdapter + { + RequestId id; + DeliveryRecorder& recorder; + + Adapter(DeliveryRecorder& r) : recorder(r) {} + + RequestId getNextDeliveryTag() { return id + 1; } + void deliver(Message::shared_ptr& msg, RequestId tag) + { + recorder.delivered.push_back(Delivery(msg, tag)); + id++; + } + + }; + + std::auto_ptr<DeliveryAdapter> createAdapter() + { + return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + } +}; class BrokerChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); @@ -160,11 +185,12 @@ class BrokerChannelTest : public CppUnit::TestCase ConnectionToken* owner = 0; string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); + std::auto_ptr<DeliveryAdapter> unused; + channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); + channel.consume(unused, tagA, queue, false, false, owner); + channel.consume(unused, tagB, queue, false, false, owner); CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); CPPUNIT_ASSERT(channel.exists("my_consumer")); CPPUNIT_ASSERT(channel.exists(tagA)); @@ -178,65 +204,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); } - void testDeliveryNoAck(){ + void testDeliveryNoAck(){ Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg( - createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - Channel channel(connection, 7); - channel.open(); - const string data("abcdefghijklmn"); - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( - handler.frames[0].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( - handler.frames[1].getBody().get())); - CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( - handler.frames[2].getBody().get())); - AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( - handler.frames[3].getBody().get()); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } void testStaging(){ @@ -349,26 +327,18 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); + DeliveryRecorder recorder; + string tag("test"); + channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); - //ensure no more frames have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); + channel.flow(true); - CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + //ensure no messages have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); + CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) |