diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
commit | 9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch) | |
tree | 26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/tests/BrokerChannelTest.cpp | |
parent | 6b09696b216c090b512c6af92bf7976ae3407add (diff) | |
download | qpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz |
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 97 |
1 files changed, 35 insertions, 62 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 3253a3d27a..1e5a30f157 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -19,12 +19,14 @@ * */ #include "qpid/broker/BrokerChannel.h" -#include "qpid/broker/BrokerMessage.h" #include "qpid/broker/BrokerQueue.h" #include "qpid/broker/FanOutExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/MessageDelivery.h" #include "qpid/broker/NullMessageStore.h" #include "qpid_test_plugin.h" #include <iostream> +#include <sstream> #include <memory> #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" @@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(BrokerChannelTest); CPPUNIT_TEST(testConsumerMgmt);; CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); CPPUNIT_TEST(testFlow); CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue); @@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase void check() { - CPPUNIT_ASSERT(expected.empty()); + if (!expected.empty()) { + std::stringstream error; + error << "Expected: "; + while (!expected.empty()) { + MethodCall& m = expected.front(); + error << m.name << "(" << m.msg << ", '" << m.data << "'); "; + expected.pop(); + } + CPPUNIT_FAIL(error.str()); + } } }; @@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, recorder, 0, 0); + Channel channel(connection, recorder, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); @@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - void testStaging(){ - MockMessageStore store; - connection.setFrameMax(1000); - connection.setStagingThreshold(10); - Channel channel(connection, recorder, 1, &store); - const string data[] = {"abcde", "fghij", "klmno"}; - - Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); - - store.expect(); - store.stage(*msg); - for (int i = 0; i < 3; i++) { - store.appendContent(*msg, data[i]); - } - store.destroy(*msg); - store.test(); - - Exchange::shared_ptr exchange = - broker->getExchanges().declare("my_exchange", "fanout").first; - Queue::shared_ptr queue(new Queue("my_queue")); - exchange->bind(queue, "", 0); - - AMQHeaderBody header(BASIC); - uint64_t contentSize(0); - for (int i = 0; i < 3; i++) { - contentSize += data[i].size(); - } - header.setContentSize(contentSize); - channel.handlePublish(msg); - channel.handleHeader(&header); - - for (int i = 0; i < 3; i++) { - AMQContentBody body(data[i]); - channel.handleContent(&body); - } - Message::shared_ptr msg2 = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); - msg2.reset();//should trigger destroy call - - store.check(); - } - //NOTE: strictly speaking this should/could be part of QueueTest, //but as it can usefully use the same utility classes as this @@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase store.expect(); store.stage(*msg3); - store.destroy(*msg3); store.test(); Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); @@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody())); - const string data("abcdefghijklmn"); - - Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); string tag("test"); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); channel.consume(token, tag, queue, false, false, 0); channel.flow(false); + + //'publish' a message + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, "abcdefghijklmn"); queue->deliver(msg); + //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); @@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } - Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) + Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) { - BasicMessage* msg = new BasicMessage( - 0, exchange, routingKey, false, false); - AMQHeaderBody header(BASIC); - header.setContentSize(contentSize); - msg->setHeader(&header); - msg->getHeaderProperties()->setMessageId(messageId); + Message::shared_ptr msg(new Message()); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + msg->getFrames().append(method); + msg->getFrames().append(header); + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + props->setMessageId(messageId); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); return msg; } void addContent(Message::shared_ptr msg, const string& data) { - AMQContentBody body(data); - msg->addContent(&body); + AMQFrame content(0, AMQContentBody(data)); + msg->getFrames().append(content); } }; |