diff options
Diffstat (limited to 'qpid/cpp/tests')
-rw-r--r-- | qpid/cpp/tests/ClientChannelTest.cpp | 71 | ||||
-rw-r--r-- | qpid/cpp/tests/InProcessBroker.h | 25 | ||||
-rw-r--r-- | qpid/cpp/tests/ProducerConsumerTest.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/tests/client_test.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/tests/echo_service.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/tests/topic_listener.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/tests/topic_publisher.cpp | 9 |
7 files changed, 90 insertions, 44 deletions
diff --git a/qpid/cpp/tests/ClientChannelTest.cpp b/qpid/cpp/tests/ClientChannelTest.cpp index 7b0bc363fe..f22170691c 100644 --- a/qpid/cpp/tests/ClientChannelTest.cpp +++ b/qpid/cpp/tests/ClientChannelTest.cpp @@ -32,6 +32,10 @@ using namespace qpid::client; using namespace qpid::sys; using namespace qpid::framing; +/// Small frame size so we can create fragmented messages. +const size_t FRAME_MAX = 256; + + /** * Test client API using an in-process broker. */ @@ -42,6 +46,8 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testGetNoContent); CPPUNIT_TEST(testConsumeCancel); CPPUNIT_TEST(testConsumePublished); + CPPUNIT_TEST(testGetFragmentedMessage); + CPPUNIT_TEST(testConsumeFragmentedMessage); CPPUNIT_TEST_SUITE_END(); struct Listener: public qpid::client::MessageListener { @@ -65,7 +71,8 @@ class ClientChannelTest : public CppUnit::TestCase public: ClientChannelTest() - : qname("testq"), data("hello"), + : connection(FRAME_MAX), + qname("testq"), data("hello"), queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) { connection.openChannel(channel); @@ -76,21 +83,21 @@ class ClientChannelTest : public CppUnit::TestCase void testPublishGet() { Message pubMsg(data); pubMsg.getHeaders().setString("hello", "world"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); Message getMsg; - CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue)); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); CPPUNIT_ASSERT_EQUAL(string("world"), getMsg.getHeaders().getString("hello")); - CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue } void testGetNoContent() { Message pubMsg; pubMsg.getHeaders().setString("hello", "world"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); Message getMsg; - CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue)); + CPPUNIT_ASSERT(channel.get(getMsg, queue)); CPPUNIT_ASSERT(getMsg.getData().empty()); CPPUNIT_ASSERT_EQUAL(string("world"), getMsg.getHeaders().getString("hello")); @@ -98,10 +105,10 @@ class ClientChannelTest : public CppUnit::TestCase void testConsumeCancel() { string tag; // Broker assigned - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); channel.start(); CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); - channel.getBasic().publish(Message("a"), exchange, qname); + channel.publish(Message("a"), exchange, qname); { Mutex::ScopedLock l(listener.monitor); Time deadline(now() + 1*TIME_SEC); @@ -112,8 +119,8 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); - channel.getBasic().publish(Message("b"), exchange, qname); - channel.getBasic().publish(Message("c"), exchange, qname); + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); { Mutex::ScopedLock l(listener.monitor); while (listener.messages.size() != 3) { @@ -124,15 +131,15 @@ class ClientChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); - channel.getBasic().cancel(tag); - channel.getBasic().publish(Message("d"), exchange, qname); + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); { Mutex::ScopedLock l(listener.monitor); CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); } Message msg; - CPPUNIT_ASSERT(channel.getBasic().get(msg, queue)); + CPPUNIT_ASSERT(channel.get(msg, queue)); CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); } @@ -140,9 +147,9 @@ class ClientChannelTest : public CppUnit::TestCase void testConsumePublished() { Message pubMsg("x"); pubMsg.getHeaders().setString("y", "z"); - channel.getBasic().publish(pubMsg, exchange, qname); + channel.publish(pubMsg, exchange, qname); string tag; - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); channel.start(); { @@ -155,8 +162,40 @@ class ClientChannelTest : public CppUnit::TestCase listener.messages[0].getHeaders().getString("y")); } + void testGetFragmentedMessage() { + string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. + channel.publish(Message(longStr), exchange, qname); + // FIXME aconway 2007-03-21: Remove couts. + cout << "==== Fragmented publish:" << endl + << connection.conversation << endl; + Message getMsg; + cout << "==== Fragmented get:" << endl + << connection.conversation << endl; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + } - + void testConsumeFragmentedMessage() { + string xx(FRAME_MAX*2, 'x'); + channel.publish(Message(xx), exchange, qname); + cout << "==== Fragmented publish:" << endl + << connection.conversation << endl; + channel.start(); + string tag; + channel.consume(queue, tag, &listener); + string yy(FRAME_MAX*2, 'y'); + channel.publish(Message(yy), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 2) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + // FIXME aconway 2007-03-21: + cout << "==== Fragmented consme 2 messages:" << endl + << connection.conversation << endl; + + CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); + } }; // Make this test suite a plugin. diff --git a/qpid/cpp/tests/InProcessBroker.h b/qpid/cpp/tests/InProcessBroker.h index 709ca9b953..833b821d11 100644 --- a/qpid/cpp/tests/InProcessBroker.h +++ b/qpid/cpp/tests/InProcessBroker.h @@ -145,25 +145,30 @@ std::ostream& operator<<( return out; } +} // namespace broker -}} // namespace qpid::broker - +namespace client { /** An in-process client+broker all in one. */ -class InProcessBrokerClient : public qpid::client::Connection { +class InProcessBrokerClient : public client::Connection { public: - qpid::broker::InProcessBroker broker; - qpid::broker::InProcessBroker::Conversation& conversation; + broker::InProcessBroker broker; + broker::InProcessBroker::Conversation& conversation; /** Constructor creates broker and opens client connection. */ - InProcessBrokerClient(qpid::framing::ProtocolVersion version= - qpid::framing::highestProtocolVersion - ) : broker(version), conversation(broker.conversation) + InProcessBrokerClient( + u_int32_t max_frame_size=65536, + framing::ProtocolVersion version= framing::highestProtocolVersion + ) : client::Connection(false, max_frame_size, version), + broker(version), + conversation(broker.conversation) { setConnector(broker); open(""); } - - ~InProcessBrokerClient() {} }; + +}} // namespace qpid::client + + #endif // _tests_InProcessBroker_h diff --git a/qpid/cpp/tests/ProducerConsumerTest.cpp b/qpid/cpp/tests/ProducerConsumerTest.cpp index e6d4090596..1f2aeffbc5 100644 --- a/qpid/cpp/tests/ProducerConsumerTest.cpp +++ b/qpid/cpp/tests/ProducerConsumerTest.cpp @@ -30,8 +30,9 @@ #include "AMQP_HighestVersion.h" #include "sys/AtomicCount.h" -using namespace qpid::sys; -using namespace qpid::framing; +using namespace qpid; +using namespace sys; +using namespace framing; using namespace boost; using namespace std; @@ -99,7 +100,7 @@ class ProducerConsumerTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE_END(); public: - InProcessBrokerClient client; + client::InProcessBrokerClient client; ProducerConsumer pc; WatchedCounter stopped; @@ -166,7 +167,7 @@ class ProducerConsumerTest : public CppUnit::TestCase } public: - ProducerConsumerTest() : client(highestProtocolVersion) {} + ProducerConsumerTest() : client() {} void testProduceConsume() { ConsumeRunnable runMe(*this); diff --git a/qpid/cpp/tests/client_test.cpp b/qpid/cpp/tests/client_test.cpp index 413523a6a7..92952c69b1 100644 --- a/qpid/cpp/tests/client_test.cpp +++ b/qpid/cpp/tests/client_test.cpp @@ -102,7 +102,7 @@ int main(int argc, char**) Monitor monitor; SimpleListener listener(&monitor); string tag("MyTag"); - channel.getBasic().consume(queue, tag, &listener); + channel.consume(queue, tag, &listener); if (verbose) std::cout << "Registered consumer." << std::endl; //we need to enable the message dispatching for this channel @@ -115,7 +115,7 @@ int main(int argc, char**) Message msg; string data("MyMessage"); msg.setData(data); - channel.getBasic().publish(msg, exchange, "MyTopic"); + channel.publish(msg, exchange, "MyTopic"); if (verbose) std::cout << "Published message: " << data << std::endl; { diff --git a/qpid/cpp/tests/echo_service.cpp b/qpid/cpp/tests/echo_service.cpp index 412ffbeb58..ff11a336fe 100644 --- a/qpid/cpp/tests/echo_service.cpp +++ b/qpid/cpp/tests/echo_service.cpp @@ -116,7 +116,7 @@ int main(int argc, char** argv){ //Consume from the response queue, logging all echoed message to console: LoggingListener listener; std::string tag; - channel.getBasic().consume(response, tag, &listener); + channel.consume(response, tag, &listener); //Process incoming requests on a new thread channel.start(); @@ -129,7 +129,7 @@ int main(int argc, char** argv){ Message msg; msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); msg.setData(text); - channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); + channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); std::cout << "Enter text to send:" << std::endl; } @@ -158,10 +158,10 @@ int main(int argc, char** argv){ //Consume from the request queue, echoing back all messages received to the client that sent them EchoServer server(&channel); std::string tag = "server_tag"; - channel.getBasic().consume(request, tag, &server); + channel.consume(request, tag, &server); //Process incoming requests on the main thread - channel.getBasic().run(); + channel.run(); connection.close(); } catch(qpid::QpidError error) { @@ -184,7 +184,7 @@ void EchoServer::received(Message& message) std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; //'echo' the message back: - channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); + channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); } } diff --git a/qpid/cpp/tests/topic_listener.cpp b/qpid/cpp/tests/topic_listener.cpp index 5f5500f7b9..5928dac49a 100644 --- a/qpid/cpp/tests/topic_listener.cpp +++ b/qpid/cpp/tests/topic_listener.cpp @@ -119,9 +119,9 @@ int main(int argc, char** argv){ //set up listener Listener listener(&channel, response.getName(), args.getTransactional()); string tag; - channel.getBasic().consume(control, tag, &listener, args.getAckMode()); + channel.consume(control, tag, &listener, args.getAckMode()); cout << "topic_listener: Consuming." << endl; - channel.getBasic().run(); + channel.run(); connection.close(); cout << "topic_listener: normal exit" << endl; return 0; @@ -166,7 +166,7 @@ void Listener::report(){ << time/TIME_MSEC << " ms."; Message msg(reportstr.str()); msg.getHeaders().setString("TYPE", "REPORT"); - channel->getBasic().publish(msg, string(), responseQueue); + channel->publish(msg, string(), responseQueue); if(transactional){ channel->commit(); } diff --git a/qpid/cpp/tests/topic_publisher.cpp b/qpid/cpp/tests/topic_publisher.cpp index 0e6c63ab35..2fd1e6b810 100644 --- a/qpid/cpp/tests/topic_publisher.cpp +++ b/qpid/cpp/tests/topic_publisher.cpp @@ -129,7 +129,7 @@ int main(int argc, char** argv) { //set up listener Publisher publisher(&channel, "topic_control", args.getTransactional()); std::string tag("mytag"); - channel.getBasic().consume(response, tag, &publisher, args.getAckMode()); + channel.consume(response, tag, &publisher, args.getAckMode()); channel.start(); int batchSize(args.getBatches()); @@ -187,12 +187,13 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ - channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish( + msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); } //send report request Message reportRequest; reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); if(transactional){ channel->commit(); } @@ -216,7 +217,7 @@ void Publisher::terminate(){ //send termination request Message terminationRequest; terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); if(transactional){ channel->commit(); } |