summaryrefslogtreecommitdiff
path: root/cpp/src/tests/MessageBuilderTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/tests/MessageBuilderTest.cpp
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/MessageBuilderTest.cpp')
-rw-r--r--cpp/src/tests/MessageBuilderTest.cpp284
1 files changed, 137 insertions, 147 deletions
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index a12fc603ce..341fdf56f5 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -18,15 +18,13 @@
* under the License.
*
*/
-#include "qpid/Exception.h"
-#include "qpid/broker/BrokerMessage.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/NullMessageStore.h"
-#include "qpid/framing/Buffer.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
#include "qpid_test_plugin.h"
-#include <iostream>
-#include <memory>
-#include "MockChannel.h"
+#include <list>
using namespace boost;
using namespace qpid::broker;
@@ -35,72 +33,55 @@ using namespace qpid::sys;
class MessageBuilderTest : public CppUnit::TestCase
{
- struct MockHandler : CompletionHandler {
- Message::shared_ptr msg;
+ class MockMessageStore : public NullMessageStore
+ {
+ enum Op {STAGE=1, APPEND=2};
- virtual void complete(Message::shared_ptr _msg){
- msg = _msg;
+ uint64_t id;
+ PersistableMessage* expectedMsg;
+ string expectedData;
+ std::list<Op> ops;
+
+ void checkExpectation(Op actual)
+ {
+ CPPUNIT_ASSERT_EQUAL(ops.front(), actual);
+ ops.pop_front();
}
- };
- class TestMessageStore : public NullMessageStore
- {
- Buffer* header;
- Buffer* content;
- const uint32_t contentBufferSize;
-
- public:
+ public:
+ MockMessageStore() : id(0), expectedMsg(0) {}
- void stage(PersistableMessage& msg)
- {
- if (msg.getPersistenceId() == 0) {
- header = new Buffer(msg.encodedSize());
- msg.encode(*header);
- content = new Buffer(contentBufferSize);
- msg.setPersistenceId(1);
- } else {
- throw qpid::Exception("Message already staged!");
- }
+ void expectStage(PersistableMessage& msg)
+ {
+ expectedMsg = &msg;
+ ops.push_back(STAGE);
}
- void appendContent(PersistableMessage& msg, const string& data)
- {
- if (msg.getPersistenceId() == 1) {
- content->putRawData(data);
- } else {
- throw qpid::Exception("Invalid message id!");
- }
+ void expectAppendContent(PersistableMessage& msg, const string& data)
+ {
+ expectedMsg = &msg;
+ expectedData = data;
+ ops.push_back(APPEND);
}
- using NullMessageStore::destroy;
+ void stage(PersistableMessage& msg)
+ {
+ checkExpectation(STAGE);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ msg.setPersistenceId(++id);
+ }
- void destroy(PersistableMessage& msg)
+ void appendContent(PersistableMessage& msg, const string& data)
{
- CPPUNIT_ASSERT(msg.getPersistenceId());
+ checkExpectation(APPEND);
+ CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
+ CPPUNIT_ASSERT_EQUAL(expectedData, data);
}
- BasicMessage::shared_ptr getRestoredMessage()
+ bool expectationsMet()
{
- BasicMessage::shared_ptr msg(new BasicMessage());
- if (header) {
- header->flip();
- msg->decodeHeader(*header);
- delete header;
- header = 0;
- if (content) {
- content->flip();
- msg->decodeContent(*content);
- delete content;
- content = 0;
- }
- }
- return msg;
+ return ops.empty();
}
-
- //dont care about any of the other methods:
- TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0),
- contentBufferSize(_contentBufferSize) {}
- ~TestMessageStore(){}
};
CPPUNIT_TEST_SUITE(MessageBuilderTest);
@@ -113,106 +94,115 @@ class MessageBuilderTest : public CppUnit::TestCase
public:
void testHeaderOnly(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- Message::shared_ptr message(
- new BasicMessage(
- 0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(0);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT_EQUAL(exchange, builder.getMessage()->getExchangeName());
+ CPPUNIT_ASSERT_EQUAL(key, builder.getMessage()->getRoutingKey());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test1ContentFrame(){
- MockHandler handler;
- MessageBuilder builder(&handler);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
- string data1("abcdefg");
+ std::string data("abcdefg");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(7);
- AMQContentBody part1(data1);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content(0, AMQContentBody(data));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(header);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void test2ContentFrames(){
- MockHandler handler;
- MessageBuilder builder(&handler);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- CPPUNIT_ASSERT(!handler.msg);
- builder.setHeader(&header);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part1);
- CPPUNIT_ASSERT(!handler.msg);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ MessageBuilder builder;
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+ builder.handle(content1);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(!builder.getMessage()->getFrames().isComplete());
+
+ builder.handle(content2);
+ CPPUNIT_ASSERT(builder.getMessage());
+ CPPUNIT_ASSERT(builder.getMessage()->getFrames().isComplete());
}
void testStaging(){
- //store must be the last thing to be destroyed or destructor
- //of Message fails (it uses the store to call destroy if lazy
- //loaded content is in use)
- TestMessageStore store(14);
- {
- MockHandler handler;
- MessageBuilder builder(&handler, &store, 5);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(
- new BasicMessage(0, "test", "my_routing_key", false, false));
- AMQHeaderBody header(BASIC);
- header.setContentSize(14);
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header.getProperties());
- properties->setMessageId("MyMessage");
- properties->getHeaders().setString("abc", "xyz");
-
- AMQContentBody part1(data1);
- AMQContentBody part2(data2);
-
- builder.initialise(message);
- builder.setHeader(&header);
- builder.addContent(&part1);
- builder.addContent(&part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
-
- BasicMessage::shared_ptr restored = store.getRestoredMessage();
- CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
- CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
- restored->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize());
- }
+ MockMessageStore store;
+ MessageBuilder builder(&store, 5);
+ builder.start(SequenceNumber());
+
+ std::string data1("abcdefg");
+ std::string data2("hijklmn");
+ std::string exchange("builder-exchange");
+ std::string key("builder-exchange");
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+ AMQFrame content1(0, AMQContentBody(data1));
+ AMQFrame content2(0, AMQContentBody(data2));
+
+ header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
+ header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
+
+ builder.handle(method);
+ builder.handle(header);
+
+ store.expectStage(*builder.getMessage());
+ builder.handle(content1);
+ CPPUNIT_ASSERT(store.expectationsMet());
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 1, builder.getMessage()->getPersistenceId());
+
+ store.expectAppendContent(*builder.getMessage(), data2);
+ builder.handle(content2);
+ CPPUNIT_ASSERT(store.expectationsMet());
+
+ //were the content frames dropped?
+ CPPUNIT_ASSERT_EQUAL((uint64_t) 0, builder.getMessage()->contentSize());
}
};