diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-24 17:21:47 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-24 17:21:47 +0000 |
commit | d6befaeb77df8a09845e4c11070afe8ab4d5052d (patch) | |
tree | b5f9ec40dedf2053d04c87f0117f0953a3026180 /cpp/test | |
parent | b442c78351bf330cf23b67e86aa17424d5a78966 (diff) | |
download | qpid-python-d6befaeb77df8a09845e4c11070afe8ab4d5052d.tar.gz |
Initial sketching out of staging functionality for large messages (i.e. allowing content to be stored as it arrives, rather than collecting it in memory until complete).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@478923 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/test')
-rw-r--r-- | cpp/test/unit/qpid/broker/MessageBuilderTest.cpp | 84 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxAckTest.cpp | 15 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TxPublishTest.cpp | 14 |
3 files changed, 90 insertions, 23 deletions
diff --git a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp index 1976cdf286..a5f7911fc8 100644 --- a/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp +++ b/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp @@ -18,8 +18,11 @@ * under the License. * */ +#include <qpid/Exception.h> #include <qpid/broker/Message.h> #include <qpid/broker/MessageBuilder.h> +#include <qpid/broker/NullMessageStore.h> +#include <qpid/framing/Buffer.h> #include <qpid_test_plugin.h> #include <iostream> #include <memory> @@ -39,11 +42,58 @@ class MessageBuilderTest : public CppUnit::TestCase } }; + class TestMessageStore : public NullMessageStore + { + Buffer* header; + Buffer* content; + const u_int32_t contentBufferSize; + + public: + + void stage(Message::shared_ptr& msg) + { + if (msg->getPersistenceId() == 0) { + header = new Buffer(msg->encodedHeaderSize()); + msg->encodeHeader(*header); + content = new Buffer(contentBufferSize); + msg->encodeContent(*content); + } else if (!header || !content) { + throw qpid::Exception("Buffers not initialised!"); + } else { + msg->encodeContent(*content); + } + msg->setPersistenceId(1); + } + + Message::shared_ptr getRestoredMessage() + { + Message::shared_ptr msg(new Message()); + if (header) { + header->flip(); + msg->decodeHeader(*header); + delete header; + header = 0; + if (content) { + content->flip(); + msg->decodeContent(*content); + delete content; + content = 0; + } + } + return msg; + } + + //dont care about any of the other methods: + TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0), + contentBufferSize(_contentBufferSize) {} + ~TestMessageStore(){} + }; CPPUNIT_TEST_SUITE(MessageBuilderTest); CPPUNIT_TEST(testHeaderOnly); CPPUNIT_TEST(test1ContentFrame); CPPUNIT_TEST(test2ContentFrames); + CPPUNIT_TEST(testStaging); CPPUNIT_TEST_SUITE_END(); public: @@ -106,6 +156,40 @@ class MessageBuilderTest : public CppUnit::TestCase CPPUNIT_ASSERT(handler.msg); CPPUNIT_ASSERT_EQUAL(message, handler.msg); } + + void testStaging(){ + DummyHandler handler; + TestMessageStore store(50);//more than enough for two frames of 14 bytes + MessageBuilder builder(&handler, &store, 5); + + string data1("abcdefg"); + string data2("hijklmn"); + + Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); + properties->setMessageId("MyMessage"); + properties->getHeaders().setString("abc", "xyz"); + + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + builder.initialise(message); + builder.setHeader(header); + builder.addContent(part1); + builder.addContent(part2); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + + Message::shared_ptr restored = store.getRestoredMessage(); + CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); + CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), + restored->getHeaderProperties()->getHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize()); + } }; // Make this test suite a plugin. diff --git a/cpp/test/unit/qpid/broker/TxAckTest.cpp b/cpp/test/unit/qpid/broker/TxAckTest.cpp index 91f0bd1498..47693a7133 100644 --- a/cpp/test/unit/qpid/broker/TxAckTest.cpp +++ b/cpp/test/unit/qpid/broker/TxAckTest.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include <qpid/broker/MessageStore.h> +#include <qpid/broker/NullMessageStore.h> #include <qpid/broker/RecoveryManager.h> #include <qpid/broker/TxAck.h> #include <qpid_test_plugin.h> @@ -34,7 +34,7 @@ using namespace qpid::framing; class TxAckTest : public CppUnit::TestCase { - class TestMessageStore : public MessageStore + class TestMessageStore : public NullMessageStore { public: vector<Message::shared_ptr> dequeued; @@ -44,16 +44,7 @@ class TxAckTest : public CppUnit::TestCase dequeued.push_back(msg); } - //dont care about any of the other methods: - void create(const Queue&){} - void destroy(const Queue&){} - void recover(RecoveryManager&){} - void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} - void committed(const string * const){} - void aborted(const string * const){} - std::auto_ptr<TransactionContext> begin(){ return std::auto_ptr<TransactionContext>(); } - void commit(TransactionContext*){} - void abort(TransactionContext*){} + TestMessageStore() : NullMessageStore(false) {} ~TestMessageStore(){} }; diff --git a/cpp/test/unit/qpid/broker/TxPublishTest.cpp b/cpp/test/unit/qpid/broker/TxPublishTest.cpp index a28d1127de..b301405d22 100644 --- a/cpp/test/unit/qpid/broker/TxPublishTest.cpp +++ b/cpp/test/unit/qpid/broker/TxPublishTest.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include <qpid/broker/MessageStore.h> +#include <qpid/broker/NullMessageStore.h> #include <qpid/broker/RecoveryManager.h> #include <qpid/broker/TxPublish.h> #include <qpid_test_plugin.h> @@ -35,7 +35,7 @@ using namespace qpid::framing; class TxPublishTest : public CppUnit::TestCase { - class TestMessageStore : public MessageStore + class TestMessageStore : public NullMessageStore { public: vector< pair<string, Message::shared_ptr> > enqueued; @@ -46,15 +46,7 @@ class TxPublishTest : public CppUnit::TestCase } //dont care about any of the other methods: - void create(const Queue&){} - void destroy(const Queue&){} - void recover(RecoveryManager&){} - void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, const string * const){} - void committed(const string * const){} - void aborted(const string * const){} - std::auto_ptr<TransactionContext> begin(){ return std::auto_ptr<TransactionContext>(); } - void commit(TransactionContext*){} - void abort(TransactionContext*){} + TestMessageStore() : NullMessageStore(false) {} ~TestMessageStore(){} }; |