summaryrefslogtreecommitdiff
path: root/cpp/tests/ClientChannelTest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-21 19:12:14 +0000
committerAlan Conway <aconway@apache.org>2007-03-21 19:12:14 +0000
commitc1b0ba624ff2de40b23342cf2a96885342884dad (patch)
treef4fa4ef721b1fe98543cb49cc2c31f03cee452ed /cpp/tests/ClientChannelTest.cpp
parentdf4faa062b3512312c78167bfbdf19ff969210ac (diff)
downloadqpid-python-c1b0ba624ff2de40b23342cf2a96885342884dad.tar.gz
Refactored client side for dual-mode Channel supporting either 0-9 Message or 0-8 Basic.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests/ClientChannelTest.cpp')
-rw-r--r--cpp/tests/ClientChannelTest.cpp71
1 files changed, 55 insertions, 16 deletions
diff --git a/cpp/tests/ClientChannelTest.cpp b/cpp/tests/ClientChannelTest.cpp
index 7b0bc363fe..f22170691c 100644
--- a/cpp/tests/ClientChannelTest.cpp
+++ b/cpp/tests/ClientChannelTest.cpp
@@ -32,6 +32,10 @@ using namespace qpid::client;
using namespace qpid::sys;
using namespace qpid::framing;
+/// Small frame size so we can create fragmented messages.
+const size_t FRAME_MAX = 256;
+
+
/**
* Test client API using an in-process broker.
*/
@@ -42,6 +46,8 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testGetNoContent);
CPPUNIT_TEST(testConsumeCancel);
CPPUNIT_TEST(testConsumePublished);
+ CPPUNIT_TEST(testGetFragmentedMessage);
+ CPPUNIT_TEST(testConsumeFragmentedMessage);
CPPUNIT_TEST_SUITE_END();
struct Listener: public qpid::client::MessageListener {
@@ -65,7 +71,8 @@ class ClientChannelTest : public CppUnit::TestCase
public:
ClientChannelTest()
- : qname("testq"), data("hello"),
+ : connection(FRAME_MAX),
+ qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{
connection.openChannel(channel);
@@ -76,21 +83,21 @@ class ClientChannelTest : public CppUnit::TestCase
void testPublishGet() {
Message pubMsg(data);
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
+ CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
}
void testGetNoContent() {
Message pubMsg;
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT(getMsg.getData().empty());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
@@ -98,10 +105,10 @@ class ClientChannelTest : public CppUnit::TestCase
void testConsumeCancel() {
string tag; // Broker assigned
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
channel.start();
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.getBasic().publish(Message("a"), exchange, qname);
+ channel.publish(Message("a"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +119,8 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
- channel.getBasic().publish(Message("b"), exchange, qname);
- channel.getBasic().publish(Message("c"), exchange, qname);
+ channel.publish(Message("b"), exchange, qname);
+ channel.publish(Message("c"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 3) {
@@ -124,15 +131,15 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
- channel.getBasic().cancel(tag);
- channel.getBasic().publish(Message("d"), exchange, qname);
+ channel.cancel(tag);
+ channel.publish(Message("d"), exchange, qname);
CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
{
Mutex::ScopedLock l(listener.monitor);
CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
}
Message msg;
- CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
+ CPPUNIT_ASSERT(channel.get(msg, queue));
CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
}
@@ -140,9 +147,9 @@ class ClientChannelTest : public CppUnit::TestCase
void testConsumePublished() {
Message pubMsg("x");
pubMsg.getHeaders().setString("y", "z");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
string tag;
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
channel.start();
{
@@ -155,8 +162,40 @@ class ClientChannelTest : public CppUnit::TestCase
listener.messages[0].getHeaders().getString("y"));
}
+ void testGetFragmentedMessage() {
+ string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
+ channel.publish(Message(longStr), exchange, qname);
+ // FIXME aconway 2007-03-21: Remove couts.
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ Message getMsg;
+ cout << "==== Fragmented get:" << endl
+ << connection.conversation << endl;
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ }
-
+ void testConsumeFragmentedMessage() {
+ string xx(FRAME_MAX*2, 'x');
+ channel.publish(Message(xx), exchange, qname);
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ channel.start();
+ string tag;
+ channel.consume(queue, tag, &listener);
+ string yy(FRAME_MAX*2, 'y');
+ channel.publish(Message(yy), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 2)
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ // FIXME aconway 2007-03-21:
+ cout << "==== Fragmented consme 2 messages:" << endl
+ << connection.conversation << endl;
+
+ CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData());
+ CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData());
+ }
};
// Make this test suite a plugin.