From bcd011a10c0db4ffc6f78380c548d673e270e000 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 20 Nov 2007 13:59:54 +0000 Subject: backing up the previous tag git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@596673 13f79535-47bb-0310-9956-ffa450edef68 --- Final/cpp/tests/ChannelTest.cpp | 357 ---------------------------------------- 1 file changed, 357 deletions(-) delete mode 100644 Final/cpp/tests/ChannelTest.cpp (limited to 'Final/cpp/tests/ChannelTest.cpp') diff --git a/Final/cpp/tests/ChannelTest.cpp b/Final/cpp/tests/ChannelTest.cpp deleted file mode 100644 index cc0a90bad9..0000000000 --- a/Final/cpp/tests/ChannelTest.cpp +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -using std::string; -using std::queue; - -struct DummyHandler : OutputHandler{ - std::vector frames; - - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } -}; - - -class ChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ChannelTest); - CPPUNIT_TEST(testConsumerMgmt); - CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testDeliveryAndRecovery); - CPPUNIT_TEST(testStaging); - CPPUNIT_TEST(testQueuePolicy); - CPPUNIT_TEST(testFlow); - CPPUNIT_TEST_SUITE_END(); - - class MockMessageStore : public NullMessageStore - { - struct MethodCall - { - const string name; - Message* const msg; - const string data;//only needed for appendContent - - void check(const MethodCall& other) const - { - CPPUNIT_ASSERT_EQUAL(name, other.name); - CPPUNIT_ASSERT_EQUAL(msg, other.msg); - CPPUNIT_ASSERT_EQUAL(data, other.data); - } - }; - - queue expected; - bool expectMode;//true when setting up expected calls - - void handle(const MethodCall& call) - { - if (expectMode) { - expected.push(call); - } else { - call.check(expected.front()); - expected.pop(); - } - } - - void handle(const string& name, Message* msg, const string& data) - { - MethodCall call = {name, msg, data}; - handle(call); - } - - public: - - MockMessageStore() : expectMode(false) {} - - void stage(Message* const msg) - { - if(!expectMode) msg->setPersistenceId(1); - MethodCall call = {"stage", msg, ""}; - handle(call); - } - - void appendContent(Message* msg, const string& data) - { - MethodCall call = {"appendContent", msg, data}; - handle(call); - } - - void destroy(Message* msg) - { - MethodCall call = {"destroy", msg, ""}; - handle(call); - } - - void expect() - { - expectMode = true; - } - - void test() - { - expectMode = false; - } - - void check() - { - CPPUNIT_ASSERT(expected.empty()); - } - }; - - - public: - - void testConsumerMgmt(){ - Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(qpid::framing::highestProtocolVersion, 0, 0, 0); - CPPUNIT_ASSERT(!channel.exists("my_consumer")); - - ConnectionToken* owner = 0; - string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); - string tagA; - string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.cancel(tagA); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(!channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.close(); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); - } - - void testDeliveryNoAck(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - void testStaging(){ - MockMessageStore store; - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new Message(0, "my_exchange", "my_routing_key", false, false); - - store.expect(); - store.stage(msg); - for (int i = 0; i < 3; i++) { - store.appendContent(msg, data[i]); - } - store.destroy(msg); - store.test(); - - Exchange::shared_ptr exchange(new FanOutExchange("my_exchange")); - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - u_int64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header->setContentSize(contentSize); - channel.handlePublish(msg, exchange); - channel.handleHeader(header); - - for (int i = 0; i < 3; i++) { - AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); - channel.handleContent(body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - - - //NOTE: strictly speaking this should/could be part of QueueTest, - //but as it can usefully use the same utility classes as this - //class it is defined here for simpllicity - void testQueuePolicy() - { - MockMessageStore store; - {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content - const string data1("abcd"); - const string data2("efghijk"); - const string data3("lmnopqrstuvwxyz"); - Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size())); - Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size())); - Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size())); - addContent(msg1, data1); - addContent(msg2, data2); - addContent(msg3, data3); - - QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded - FieldTable settings; - policy.update(settings); - - store.expect(); - store.stage(msg3.get()); - store.destroy(msg3.get()); - store.test(); - - Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); - queue->configure(settings);//set policy - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - Message::shared_ptr next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg1, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) data1.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg2, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) data2.size(), next->encodedContentSize()); - next = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg3, next); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, next->encodedContentSize()); - - next.reset(); - msg1.reset(); - msg2.reset(); - msg3.reset();//must clear all references to messages to allow them to be destroyed - - } - store.check(); - } - - - void testFlow(){ - DummyHandler handler; - Channel channel(qpid::framing::highestProtocolVersion, &handler, 7, 10000); - - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - channel.flow(false); - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 0, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); - channel.flow(true); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); - } - - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, u_int64_t contentSize) - { - Message* msg = new Message(0, exchange, routingKey, false, false); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(contentSize); - msg->setHeader(header); - msg->getHeaderProperties()->setMessageId(messageId); - return msg; - } - - void addContent(Message::shared_ptr msg, const string& data) - { - AMQContentBody::shared_ptr body(new AMQContentBody(data)); - msg->addContent(body); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); -- cgit v1.2.1