summaryrefslogtreecommitdiff
path: root/qpid/cpp/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/tests')
-rw-r--r--qpid/cpp/tests/ClientChannelTest.cpp71
-rw-r--r--qpid/cpp/tests/InProcessBroker.h25
-rw-r--r--qpid/cpp/tests/ProducerConsumerTest.cpp9
-rw-r--r--qpid/cpp/tests/client_test.cpp4
-rw-r--r--qpid/cpp/tests/echo_service.cpp10
-rw-r--r--qpid/cpp/tests/topic_listener.cpp6
-rw-r--r--qpid/cpp/tests/topic_publisher.cpp9
7 files changed, 90 insertions, 44 deletions
diff --git a/qpid/cpp/tests/ClientChannelTest.cpp b/qpid/cpp/tests/ClientChannelTest.cpp
index 7b0bc363fe..f22170691c 100644
--- a/qpid/cpp/tests/ClientChannelTest.cpp
+++ b/qpid/cpp/tests/ClientChannelTest.cpp
@@ -32,6 +32,10 @@ using namespace qpid::client;
using namespace qpid::sys;
using namespace qpid::framing;
+/// Small frame size so we can create fragmented messages.
+const size_t FRAME_MAX = 256;
+
+
/**
* Test client API using an in-process broker.
*/
@@ -42,6 +46,8 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testGetNoContent);
CPPUNIT_TEST(testConsumeCancel);
CPPUNIT_TEST(testConsumePublished);
+ CPPUNIT_TEST(testGetFragmentedMessage);
+ CPPUNIT_TEST(testConsumeFragmentedMessage);
CPPUNIT_TEST_SUITE_END();
struct Listener: public qpid::client::MessageListener {
@@ -65,7 +71,8 @@ class ClientChannelTest : public CppUnit::TestCase
public:
ClientChannelTest()
- : qname("testq"), data("hello"),
+ : connection(FRAME_MAX),
+ qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{
connection.openChannel(channel);
@@ -76,21 +83,21 @@ class ClientChannelTest : public CppUnit::TestCase
void testPublishGet() {
Message pubMsg(data);
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
+ CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
}
void testGetNoContent() {
Message pubMsg;
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT(getMsg.getData().empty());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
@@ -98,10 +105,10 @@ class ClientChannelTest : public CppUnit::TestCase
void testConsumeCancel() {
string tag; // Broker assigned
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
channel.start();
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.getBasic().publish(Message("a"), exchange, qname);
+ channel.publish(Message("a"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +119,8 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
- channel.getBasic().publish(Message("b"), exchange, qname);
- channel.getBasic().publish(Message("c"), exchange, qname);
+ channel.publish(Message("b"), exchange, qname);
+ channel.publish(Message("c"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 3) {
@@ -124,15 +131,15 @@ class ClientChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
- channel.getBasic().cancel(tag);
- channel.getBasic().publish(Message("d"), exchange, qname);
+ channel.cancel(tag);
+ channel.publish(Message("d"), exchange, qname);
CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
{
Mutex::ScopedLock l(listener.monitor);
CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
}
Message msg;
- CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
+ CPPUNIT_ASSERT(channel.get(msg, queue));
CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
}
@@ -140,9 +147,9 @@ class ClientChannelTest : public CppUnit::TestCase
void testConsumePublished() {
Message pubMsg("x");
pubMsg.getHeaders().setString("y", "z");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
string tag;
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
channel.start();
{
@@ -155,8 +162,40 @@ class ClientChannelTest : public CppUnit::TestCase
listener.messages[0].getHeaders().getString("y"));
}
+ void testGetFragmentedMessage() {
+ string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
+ channel.publish(Message(longStr), exchange, qname);
+ // FIXME aconway 2007-03-21: Remove couts.
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ Message getMsg;
+ cout << "==== Fragmented get:" << endl
+ << connection.conversation << endl;
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ }
-
+ void testConsumeFragmentedMessage() {
+ string xx(FRAME_MAX*2, 'x');
+ channel.publish(Message(xx), exchange, qname);
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ channel.start();
+ string tag;
+ channel.consume(queue, tag, &listener);
+ string yy(FRAME_MAX*2, 'y');
+ channel.publish(Message(yy), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 2)
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ // FIXME aconway 2007-03-21:
+ cout << "==== Fragmented consme 2 messages:" << endl
+ << connection.conversation << endl;
+
+ CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData());
+ CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData());
+ }
};
// Make this test suite a plugin.
diff --git a/qpid/cpp/tests/InProcessBroker.h b/qpid/cpp/tests/InProcessBroker.h
index 709ca9b953..833b821d11 100644
--- a/qpid/cpp/tests/InProcessBroker.h
+++ b/qpid/cpp/tests/InProcessBroker.h
@@ -145,25 +145,30 @@ std::ostream& operator<<(
return out;
}
+} // namespace broker
-}} // namespace qpid::broker
-
+namespace client {
/** An in-process client+broker all in one. */
-class InProcessBrokerClient : public qpid::client::Connection {
+class InProcessBrokerClient : public client::Connection {
public:
- qpid::broker::InProcessBroker broker;
- qpid::broker::InProcessBroker::Conversation& conversation;
+ broker::InProcessBroker broker;
+ broker::InProcessBroker::Conversation& conversation;
/** Constructor creates broker and opens client connection. */
- InProcessBrokerClient(qpid::framing::ProtocolVersion version=
- qpid::framing::highestProtocolVersion
- ) : broker(version), conversation(broker.conversation)
+ InProcessBrokerClient(
+ u_int32_t max_frame_size=65536,
+ framing::ProtocolVersion version= framing::highestProtocolVersion
+ ) : client::Connection(false, max_frame_size, version),
+ broker(version),
+ conversation(broker.conversation)
{
setConnector(broker);
open("");
}
-
- ~InProcessBrokerClient() {}
};
+
+}} // namespace qpid::client
+
+
#endif // _tests_InProcessBroker_h
diff --git a/qpid/cpp/tests/ProducerConsumerTest.cpp b/qpid/cpp/tests/ProducerConsumerTest.cpp
index e6d4090596..1f2aeffbc5 100644
--- a/qpid/cpp/tests/ProducerConsumerTest.cpp
+++ b/qpid/cpp/tests/ProducerConsumerTest.cpp
@@ -30,8 +30,9 @@
#include "AMQP_HighestVersion.h"
#include "sys/AtomicCount.h"
-using namespace qpid::sys;
-using namespace qpid::framing;
+using namespace qpid;
+using namespace sys;
+using namespace framing;
using namespace boost;
using namespace std;
@@ -99,7 +100,7 @@ class ProducerConsumerTest : public CppUnit::TestCase
CPPUNIT_TEST_SUITE_END();
public:
- InProcessBrokerClient client;
+ client::InProcessBrokerClient client;
ProducerConsumer pc;
WatchedCounter stopped;
@@ -166,7 +167,7 @@ class ProducerConsumerTest : public CppUnit::TestCase
}
public:
- ProducerConsumerTest() : client(highestProtocolVersion) {}
+ ProducerConsumerTest() : client() {}
void testProduceConsume() {
ConsumeRunnable runMe(*this);
diff --git a/qpid/cpp/tests/client_test.cpp b/qpid/cpp/tests/client_test.cpp
index 413523a6a7..92952c69b1 100644
--- a/qpid/cpp/tests/client_test.cpp
+++ b/qpid/cpp/tests/client_test.cpp
@@ -102,7 +102,7 @@ int main(int argc, char**)
Monitor monitor;
SimpleListener listener(&monitor);
string tag("MyTag");
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
if (verbose) std::cout << "Registered consumer." << std::endl;
//we need to enable the message dispatching for this channel
@@ -115,7 +115,7 @@ int main(int argc, char**)
Message msg;
string data("MyMessage");
msg.setData(data);
- channel.getBasic().publish(msg, exchange, "MyTopic");
+ channel.publish(msg, exchange, "MyTopic");
if (verbose) std::cout << "Published message: " << data << std::endl;
{
diff --git a/qpid/cpp/tests/echo_service.cpp b/qpid/cpp/tests/echo_service.cpp
index 412ffbeb58..ff11a336fe 100644
--- a/qpid/cpp/tests/echo_service.cpp
+++ b/qpid/cpp/tests/echo_service.cpp
@@ -116,7 +116,7 @@ int main(int argc, char** argv){
//Consume from the response queue, logging all echoed message to console:
LoggingListener listener;
std::string tag;
- channel.getBasic().consume(response, tag, &listener);
+ channel.consume(response, tag, &listener);
//Process incoming requests on a new thread
channel.start();
@@ -129,7 +129,7 @@ int main(int argc, char** argv){
Message msg;
msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
msg.setData(text);
- channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
+ channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
std::cout << "Enter text to send:" << std::endl;
}
@@ -158,10 +158,10 @@ int main(int argc, char** argv){
//Consume from the request queue, echoing back all messages received to the client that sent them
EchoServer server(&channel);
std::string tag = "server_tag";
- channel.getBasic().consume(request, tag, &server);
+ channel.consume(request, tag, &server);
//Process incoming requests on the main thread
- channel.getBasic().run();
+ channel.run();
connection.close();
} catch(qpid::QpidError error) {
@@ -184,7 +184,7 @@ void EchoServer::received(Message& message)
std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
//'echo' the message back:
- channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
+ channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
}
}
diff --git a/qpid/cpp/tests/topic_listener.cpp b/qpid/cpp/tests/topic_listener.cpp
index 5f5500f7b9..5928dac49a 100644
--- a/qpid/cpp/tests/topic_listener.cpp
+++ b/qpid/cpp/tests/topic_listener.cpp
@@ -119,9 +119,9 @@ int main(int argc, char** argv){
//set up listener
Listener listener(&channel, response.getName(), args.getTransactional());
string tag;
- channel.getBasic().consume(control, tag, &listener, args.getAckMode());
+ channel.consume(control, tag, &listener, args.getAckMode());
cout << "topic_listener: Consuming." << endl;
- channel.getBasic().run();
+ channel.run();
connection.close();
cout << "topic_listener: normal exit" << endl;
return 0;
@@ -166,7 +166,7 @@ void Listener::report(){
<< time/TIME_MSEC << " ms.";
Message msg(reportstr.str());
msg.getHeaders().setString("TYPE", "REPORT");
- channel->getBasic().publish(msg, string(), responseQueue);
+ channel->publish(msg, string(), responseQueue);
if(transactional){
channel->commit();
}
diff --git a/qpid/cpp/tests/topic_publisher.cpp b/qpid/cpp/tests/topic_publisher.cpp
index 0e6c63ab35..2fd1e6b810 100644
--- a/qpid/cpp/tests/topic_publisher.cpp
+++ b/qpid/cpp/tests/topic_publisher.cpp
@@ -129,7 +129,7 @@ int main(int argc, char** argv) {
//set up listener
Publisher publisher(&channel, "topic_control", args.getTransactional());
std::string tag("mytag");
- channel.getBasic().consume(response, tag, &publisher, args.getAckMode());
+ channel.consume(response, tag, &publisher, args.getAckMode());
channel.start();
int batchSize(args.getBatches());
@@ -187,12 +187,13 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
- channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(
+ msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
}
//send report request
Message reportRequest;
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}
@@ -216,7 +217,7 @@ void Publisher::terminate(){
//send termination request
Message terminationRequest;
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}