diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
commit | 16e203a0d32df9829bcf4fb738ef89fc94404155 (patch) | |
tree | b5dbb15f4a238ca377236ce16140443e20ed3e4a /cpp/src/tests/ClientChannelTest.cpp | |
parent | fb410c63d08e87019b3d2a8d85820ae809758f62 (diff) | |
download | qpid-python-16e203a0d32df9829bcf4fb738ef89fc94404155.tar.gz |
Fix for the most disruptive items in QPID-243.
* All #include lines now use '""' rather than '<>' where appropriate.
* #include lines within the qpid project use relative includes so that
the same path will work in /usr/include when installed as part of the
client libraries.
* All the source code has now been rearranged to be under src in a directory
analogous to the namespace of the classes in it.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@524769 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/ClientChannelTest.cpp')
-rw-r--r-- | cpp/src/tests/ClientChannelTest.cpp | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp new file mode 100644 index 0000000000..458931c4f4 --- /dev/null +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "../client/ClientChannel.h" +#include "../client/ClientMessage.h" +#include "../client/ClientQueue.h" +#include "../client/ClientExchange.h" +#include "../client/MessageListener.h" + +using namespace std; +using namespace boost; +using namespace qpid::client; +using namespace qpid::sys; +using namespace qpid::framing; + +/// Small frame size so we can create fragmented messages. +const size_t FRAME_MAX = 256; + + +/** + * Test client API using an in-process broker. + */ +class ClientChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ClientChannelTest); + CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST(testGetNoContent); + CPPUNIT_TEST(testConsumeCancel); + CPPUNIT_TEST(testConsumePublished); + CPPUNIT_TEST(testGetFragmentedMessage); + CPPUNIT_TEST(testConsumeFragmentedMessage); + CPPUNIT_TEST_SUITE_END(); + + struct Listener: public qpid::client::MessageListener { + vector<Message> messages; + Monitor monitor; + void received(Message& msg) { + Mutex::ScopedLock l(monitor); + messages.push_back(msg); + monitor.notifyAll(); + } + }; + + InProcessBrokerClient connection; // client::connection + local broker + Channel channel; + const std::string qname; + const std::string data; + Queue queue; + Exchange exchange; + Listener listener; + + public: + + ClientChannelTest() + : connection(FRAME_MAX), + qname("testq"), data("hello"), + queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) + { + connection.openChannel(channel); + CPPUNIT_ASSERT(channel.getId() != 0); + channel.declareQueue(queue); + } + + void testPublishGet() { + Message pubMsg(data); + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue + } + + void testGetNoContent() { + Message pubMsg; + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT(getMsg.getData().empty()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + } + + void testConsumeCancel() { + string tag; // Broker assigned + channel.consume(queue, tag, &listener); + channel.start(); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.publish(Message("a"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + Time deadline(now() + 1*TIME_SEC); + while (listener.messages.size() != 1) { + CPPUNIT_ASSERT(listener.monitor.wait(deadline)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); + + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 3) { + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); + CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); + + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + { + Mutex::ScopedLock l(listener.monitor); + CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); + } + Message msg; + CPPUNIT_ASSERT(channel.get(msg, queue)); + CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); + } + + // Consume already-published messages + void testConsumePublished() { + Message pubMsg("x"); + pubMsg.getHeaders().setString("y", "z"); + channel.publish(pubMsg, exchange, qname); + string tag; + channel.consume(queue, tag, &listener); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.start(); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 1) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(string("z"), + listener.messages[0].getHeaders().getString("y")); + } + + void testGetFragmentedMessage() { + string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. + channel.publish(Message(longStr), exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + } + + void testConsumeFragmentedMessage() { + string xx(FRAME_MAX*2, 'x'); + channel.publish(Message(xx), exchange, qname); + channel.start(); + string tag; + channel.consume(queue, tag, &listener); + string yy(FRAME_MAX*2, 'y'); + channel.publish(Message(yy), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 2) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest); |