summaryrefslogtreecommitdiff
path: root/cpp/src/tests/BrokerChannelTest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/tests/BrokerChannelTest.cpp
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/BrokerChannelTest.cpp')
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp97
1 files changed, 35 insertions, 62 deletions
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 3253a3d27a..1e5a30f157 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -19,12 +19,14 @@
*
*/
#include "qpid/broker/BrokerChannel.h"
-#include "qpid/broker/BrokerMessage.h"
#include "qpid/broker/BrokerQueue.h"
#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageDelivery.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid_test_plugin.h"
#include <iostream>
+#include <sstream>
#include <memory>
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
@@ -72,7 +74,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE(BrokerChannelTest);
CPPUNIT_TEST(testConsumerMgmt);;
CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
@@ -155,7 +156,16 @@ class BrokerChannelTest : public CppUnit::TestCase
void check()
{
- CPPUNIT_ASSERT(expected.empty());
+ if (!expected.empty()) {
+ std::stringstream error;
+ error << "Expected: ";
+ while (!expected.empty()) {
+ MethodCall& m = expected.front();
+ error << m.name << "(" << m.msg << ", '" << m.data << "'); ";
+ expected.pop();
+ }
+ CPPUNIT_FAIL(error.str());
+ }
}
};
@@ -173,7 +183,7 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, recorder, 0, 0);
+ Channel channel(connection, recorder, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -203,7 +213,7 @@ class BrokerChannelTest : public CppUnit::TestCase
Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
queue->deliver(msg);
sleep(2);
@@ -213,48 +223,6 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- void testStaging(){
- MockMessageStore store;
- connection.setFrameMax(1000);
- connection.setStagingThreshold(10);
- Channel channel(connection, recorder, 1, &store);
- const string data[] = {"abcde", "fghij", "klmno"};
-
- Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false);
-
- store.expect();
- store.stage(*msg);
- for (int i = 0; i < 3; i++) {
- store.appendContent(*msg, data[i]);
- }
- store.destroy(*msg);
- store.test();
-
- Exchange::shared_ptr exchange =
- broker->getExchanges().declare("my_exchange", "fanout").first;
- Queue::shared_ptr queue(new Queue("my_queue"));
- exchange->bind(queue, "", 0);
-
- AMQHeaderBody header(BASIC);
- uint64_t contentSize(0);
- for (int i = 0; i < 3; i++) {
- contentSize += data[i].size();
- }
- header.setContentSize(contentSize);
- channel.handlePublish(msg);
- channel.handleHeader(&header);
-
- for (int i = 0; i < 3; i++) {
- AMQContentBody body(data[i]);
- channel.handleContent(&body);
- }
- Message::shared_ptr msg2 = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg, msg2.get());
- msg2.reset();//should trigger destroy call
-
- store.check();
- }
-
//NOTE: strictly speaking this should/could be part of QueueTest,
//but as it can usefully use the same utility classes as this
@@ -279,7 +247,6 @@ class BrokerChannelTest : public CppUnit::TestCase
store.expect();
store.stage(*msg3);
- store.destroy(*msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
@@ -348,16 +315,17 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody()));
- const string data("abcdefghijklmn");
-
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
- addContent(msg, data);
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
- DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token"));
+ DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
channel.consume(token, tag, queue, false, false, 0);
channel.flow(false);
+
+ //'publish' a message
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, "abcdefghijklmn");
queue->deliver(msg);
+
//ensure no messages have been delivered
CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size());
@@ -369,21 +337,26 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+ Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
- BasicMessage* msg = new BasicMessage(
- 0, exchange, routingKey, false, false);
- AMQHeaderBody header(BASIC);
- header.setContentSize(contentSize);
- msg->setHeader(&header);
- msg->getHeaderProperties()->setMessageId(messageId);
+ Message::shared_ptr msg(new Message());
+
+ AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
+ AMQFrame header(0, AMQHeaderBody());
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ props->setMessageId(messageId);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
return msg;
}
void addContent(Message::shared_ptr msg, const string& data)
{
- AMQContentBody body(data);
- msg->addContent(&body);
+ AMQFrame content(0, AMQContentBody(data));
+ msg->getFrames().append(content);
}
};