diff options
Diffstat (limited to 'cpp/src/tests/MessageBuilderTest.cpp')
-rw-r--r-- | cpp/src/tests/MessageBuilderTest.cpp | 284 |
1 files changed, 137 insertions, 147 deletions
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index a12fc603ce..341fdf56f5 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -18,15 +18,13 @@ * under the License. * */ -#include "qpid/Exception.h" -#include "qpid/broker/BrokerMessage.h" +#include "qpid/broker/Message.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/NullMessageStore.h" -#include "qpid/framing/Buffer.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/TypeFilter.h" #include "qpid_test_plugin.h" -#include <iostream> -#include <memory> -#include "MockChannel.h" +#include <list> using namespace boost; using namespace qpid::broker; @@ -35,72 +33,55 @@ using namespace qpid::sys; class MessageBuilderTest : public CppUnit::TestCase { - struct MockHandler : CompletionHandler { - Message::shared_ptr msg; + class MockMessageStore : public NullMessageStore + { + enum Op {STAGE=1, APPEND=2}; - virtual void complete(Message::shared_ptr _msg){ - msg = _msg; + uint64_t id; + PersistableMessage* expectedMsg; + string expectedData; + std::list<Op> ops; + + void checkExpectation(Op actual) + { + CPPUNIT_ASSERT_EQUAL(ops.front(), actual); + ops.pop_front(); } - }; - class TestMessageStore : public NullMessageStore - { - Buffer* header; - Buffer* content; - const uint32_t contentBufferSize; - - public: + public: + MockMessageStore() : id(0), expectedMsg(0) {} - void stage(PersistableMessage& msg) - { - if (msg.getPersistenceId() == 0) { - header = new Buffer(msg.encodedSize()); - msg.encode(*header); - content = new Buffer(contentBufferSize); - msg.setPersistenceId(1); - } else { - throw qpid::Exception("Message already staged!"); - } + void expectStage(PersistableMessage& msg) + { + expectedMsg = &msg; + ops.push_back(STAGE); } - void appendContent(PersistableMessage& msg, const string& data) - { - if (msg.getPersistenceId() == 1) { - content->putRawData(data); - } else { - throw qpid::Exception("Invalid message id!"); - } + void expectAppendContent(PersistableMessage& msg, const string& data) + { + expectedMsg = &msg; + expectedData = data; + ops.push_back(APPEND); } - using NullMessageStore::destroy; + void stage(PersistableMessage& msg) + { + checkExpectation(STAGE); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + msg.setPersistenceId(++id); + } - void destroy(PersistableMessage& msg) + void appendContent(PersistableMessage& msg, const string& data) { - CPPUNIT_ASSERT(msg.getPersistenceId()); + checkExpectation(APPEND); + CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg); + CPPUNIT_ASSERT_EQUAL(expectedData, data); } - BasicMessage::shared_ptr getRestoredMessage() + bool expectationsMet() { - BasicMessage::shared_ptr msg(new BasicMessage()); - if (header) { - header->flip(); - msg->decodeHeader(*header); - delete header; - header = 0; - if (content) { - content->flip(); - msg->decodeContent(*content); - delete content; - content = 0; - } - } - return msg; + return ops.empty(); } - - //dont care about any of the other methods: - TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), - contentBufferSize(_contentBufferSize) {} - ~TestMessageStore(){} }; CPPUNIT_TEST_SUITE(MessageBuilderTest); @@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase public: void testHeaderOnly(){ - MockHandler handler; - MessageBuilder builder(&handler); - - Message::shared_ptr message( - new BasicMessage( - 0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(0); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName()); + CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test1ContentFrame(){ - MockHandler handler; - MessageBuilder builder(&handler); + MessageBuilder builder; + builder.start(SequenceNumber()); - string data1("abcdefg"); + std::string data("abcdefg"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(7); - AMQContentBody part1(data1); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content(0, AMQContentBody(data)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(header); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void test2ContentFrames(){ - MockHandler handler; - MessageBuilder builder(&handler); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - CPPUNIT_ASSERT(!handler.msg); - builder.setHeader(&header); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part1); - CPPUNIT_ASSERT(!handler.msg); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); + MessageBuilder builder; + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + builder.handle(content1); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete()); + + builder.handle(content2); + CPPUNIT_ASSERT(builder.getMessage()); + CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete()); } void testStaging(){ - //store must be the last thing to be destroyed or destructor - //of Message fails (it uses the store to call destroy if lazy - //loaded content is in use) - TestMessageStore store(14); - { - MockHandler handler; - MessageBuilder builder(&handler, &store, 5); - - string data1("abcdefg"); - string data2("hijklmn"); - - Message::shared_ptr message( - new BasicMessage(0, "test", "my_routing_key", false, false)); - AMQHeaderBody header(BASIC); - header.setContentSize(14); - BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties()); - properties->setMessageId("MyMessage"); - properties->getHeaders().setString("abc", "xyz"); - - AMQContentBody part1(data1); - AMQContentBody part2(data2); - - builder.initialise(message); - builder.setHeader(&header); - builder.addContent(&part1); - builder.addContent(&part2); - CPPUNIT_ASSERT(handler.msg); - CPPUNIT_ASSERT_EQUAL(message, handler.msg); - - BasicMessage::shared_ptr restored = store.getRestoredMessage(); - CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); - CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); - CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), - restored->getHeaderProperties()->getHeaders().getString("abc")); - CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize()); - } + MockMessageStore store; + MessageBuilder builder(&store, 5); + builder.start(SequenceNumber()); + + std::string data1("abcdefg"); + std::string data2("hijklmn"); + std::string exchange("builder-exchange"); + std::string key("builder-exchange"); + + AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); + AMQFrame header(0, AMQHeaderBody()); + AMQFrame content1(0, AMQContentBody(data1)); + AMQFrame content2(0, AMQContentBody(data2)); + + header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); + header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); + + builder.handle(method); + builder.handle(header); + + store.expectStage(*builder.getMessage()); + builder.handle(content1); + CPPUNIT_ASSERT(store.expectationsMet()); + CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId()); + + store.expectAppendContent(*builder.getMessage(), data2); + builder.handle(content2); + CPPUNIT_ASSERT(store.expectationsMet()); + + //were the content frames dropped? + CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize()); } }; |