diff options
Diffstat (limited to 'cpp/test')
-rw-r--r-- | cpp/test/client/client_test.cpp | 97 | ||||
-rw-r--r-- | cpp/test/client/topic_listener.cpp | 180 | ||||
-rw-r--r-- | cpp/test/client/topic_publisher.cpp | 253 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ChannelTest.cpp | 162 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ExchangeTest.cpp | 65 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/HeadersExchangeTest.cpp | 112 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/Makefile | 20 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/MessageTest.cpp | 54 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/QueueRegistryTest.cpp | 76 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/QueueTest.cpp | 176 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/RouterTest.cpp | 86 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/TopicExchangeTest.cpp | 184 | ||||
-rw-r--r-- | cpp/test/unit/qpid/broker/ValueTest.cpp | 96 | ||||
-rw-r--r-- | cpp/test/unit/qpid/framing/BodyHandlerTest.cpp | 103 | ||||
-rw-r--r-- | cpp/test/unit/qpid/framing/Makefile | 21 | ||||
-rw-r--r-- | cpp/test/unit/qpid/framing/field_table_test.cpp | 52 | ||||
-rw-r--r-- | cpp/test/unit/qpid/framing/framing_test.cpp | 144 | ||||
-rw-r--r-- | cpp/test/unit/qpid/framing/header_test.cpp | 141 |
18 files changed, 2022 insertions, 0 deletions
diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp new file mode 100644 index 0000000000..b899d494d7 --- /dev/null +++ b/cpp/test/client/client_test.cpp @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> + +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "FieldTable.h" +#include "Message.h" +#include "MessageListener.h" + +#include "MonitorImpl.h" + + +using namespace qpid::client; +using namespace qpid::concurrent; + +class SimpleListener : public virtual MessageListener{ + Monitor* monitor; + +public: + inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} + + inline virtual void received(Message& /*msg*/){ + std::cout << "Received message " /**<< msg **/<< std::endl; + monitor->acquire(); + monitor->notify(); + monitor->release(); + } +}; + +int main(int argc, char**) +{ + try{ + Connection con(argc > 1); + Channel channel; + Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); + Queue queue("MyQueue", true); + + string host("localhost"); + + con.open(host); + std::cout << "Opened connection." << std::endl; + con.openChannel(&channel); + std::cout << "Opened channel." << std::endl; + channel.declareExchange(exchange); + std::cout << "Declared exchange." << std::endl; + channel.declareQueue(queue); + std::cout << "Declared queue." << std::endl; + qpid::framing::FieldTable args; + channel.bind(exchange, queue, "MyTopic", args); + std::cout << "Bound queue to exchange." << std::endl; + + //set up a message listener + MonitorImpl monitor; + SimpleListener listener(&monitor); + string tag("MyTag"); + channel.consume(queue, tag, &listener); + channel.start(); + std::cout << "Registered consumer." << std::endl; + + Message msg; + string data("MyMessage"); + msg.setData(data); + channel.publish(msg, exchange, "MyTopic"); + std::cout << "Published message." << std::endl; + + monitor.acquire(); + monitor.wait(); + monitor.release(); + + + con.closeChannel(&channel); + std::cout << "Closed channel." << std::endl; + con.close(); + std::cout << "Closed connection." << std::endl; + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + return 1; + } + return 0; +} diff --git a/cpp/test/client/topic_listener.cpp b/cpp/test/client/topic_listener.cpp new file mode 100644 index 0000000000..a1b8e383a0 --- /dev/null +++ b/cpp/test/client/topic_listener.cpp @@ -0,0 +1,180 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <sstream> +#include "apr_time.h" +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "Exchange.h" +#include "MessageListener.h" +#include "Queue.h" + +using namespace qpid::client; + +class Listener : public MessageListener{ + Channel* const channel; + const std::string responseQueue; + const bool transactional; + bool init; + int count; + apr_time_t start; + + void shutdown(); + void report(); +public: + Listener(Channel* channel, const std::string& reponseQueue, bool tx); + virtual void received(Message& msg); +}; + +class Args{ + string host; + int port; + int ackMode; + bool transactional; + int prefetch; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){} + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline int getAckMode(){ return ackMode; } + inline bool getTransactional() const { return transactional; } + inline int getPrefetch(){ return prefetch; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + }else{ + try{ + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(&channel); + + //declare exchange, queue and bind them: + Queue response("response"); + channel.declareQueue(response); + + Queue control; + channel.declareQueue(control); + qpid::framing::FieldTable bindArgs; + channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs); + //set up listener + Listener listener(&channel, response.getName(), args.getTransactional()); + std::string tag; + channel.consume(control, tag, &listener, args.getAckMode()); + channel.run(); + connection.close(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } +} + +Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) : + channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ + if(!init){ + start = apr_time_as_msec(apr_time_now()); + count = 0; + init = true; + } + std::string type(message.getHeaders().getString("TYPE")); + + if(type == "TERMINATION_REQUEST"){ + shutdown(); + }else if(type == "REPORT_REQUEST"){ + //send a report: + report(); + init = false; + }else if (++count % 100 == 0){ + std::cout <<"Received " << count << " messages." << std::endl; + } +} + +void Listener::shutdown(){ + channel->close(); +} + +void Listener::report(){ + apr_time_t finish = apr_time_as_msec(apr_time_now()); + apr_time_t time = finish - start; + std::stringstream reportstr; + reportstr << "Received " << count << " messages in " << time << " ms."; + Message msg; + msg.setData(reportstr.str()); + channel->publish(msg, string(), responseQueue); + if(transactional){ + channel->commit(); + } +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = atoi(argv[++i]); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -ack_mode <mode>" << std::endl; + std::cout << " Sets the acknowledgement mode" << std::endl; + std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; + std::cout << " -transactional" << std::endl; + std::cout << " Indicates the client should use transactions" << std::endl; + std::cout << " -prefetch <count>" << std::endl; + std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp new file mode 100644 index 0000000000..fc6b7f3b30 --- /dev/null +++ b/cpp/test/client/topic_publisher.cpp @@ -0,0 +1,253 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <cstdlib> +#include "unistd.h" +#include "apr_time.h" +#include "MonitorImpl.h" +#include "QpidError.h" +#include "Channel.h" +#include "Connection.h" +#include "Exchange.h" +#include "MessageListener.h" +#include "Queue.h" + +using namespace qpid::client; +using namespace qpid::concurrent; + +class Publisher : public MessageListener{ + Channel* const channel; + const std::string controlTopic; + const bool transactional; + MonitorImpl monitor; + int count; + + void waitForCompletion(int msgs); + string generateData(int size); + +public: + Publisher(Channel* channel, const std::string& controlTopic, bool tx); + virtual void received(Message& msg); + apr_time_t publish(int msgs, int listeners, int size); + void terminate(); +}; + +class Args{ + string host; + int port; + int messages; + int subscribers; + int ackMode; + bool transactional; + int prefetch; + int batches; + int delay; + int size; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1), + ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1), + delay(0), size(256), trace(false), help(false){} + + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline int getMessages() const { return messages; } + inline int getSubscribers() const { return subscribers; } + inline int getAckMode(){ return ackMode; } + inline bool getTransactional() const { return transactional; } + inline int getPrefetch(){ return prefetch; } + inline int getBatches(){ return batches; } + inline int getDelay(){ return delay; } + inline int getSize(){ return size; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + }else{ + try{ + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(&channel); + + //declare queue (relying on default binding): + Queue response("response"); + channel.declareQueue(response); + + //set up listener + Publisher publisher(&channel, "topic_control", args.getTransactional()); + std::string tag("mytag"); + channel.consume(response, tag, &publisher, args.getAckMode()); + channel.start(); + + int batchSize(args.getBatches()); + apr_time_t max(0); + apr_time_t min(0); + apr_time_t sum(0); + for(int i = 0; i < batchSize; i++){ + if(i > 0 && args.getDelay()) sleep(args.getDelay()); + apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); + if(!max || time > max) max = time; + if(!min || time < min) min = time; + sum += time; + std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl; + } + publisher.terminate(); + apr_time_t avg = sum / batchSize; + if(batchSize > 1){ + std::cout << batchSize << " batches completed. avg=" << avg << + ", max=" << max << ", min=" << min << std::endl; + } + channel.close(); + connection.close(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } +} + +Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) : + channel(_channel), controlTopic(_controlTopic), transactional(tx){} + +void Publisher::received(Message& msg){ + //count responses and when all are received end the current batch + monitor.acquire(); + if(--count == 0){ + monitor.notify(); + } + std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl; + monitor.release(); +} + +void Publisher::waitForCompletion(int msgs){ + count = msgs; + monitor.wait(); +} + +apr_time_t Publisher::publish(int msgs, int listeners, int size){ + monitor.acquire(); + Message msg; + msg.setData(generateData(size)); + apr_time_t start(apr_time_as_msec(apr_time_now())); + for(int i = 0; i < msgs; i++){ + channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + } + //send report request + Message reportRequest; + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } + + waitForCompletion(listeners); + monitor.release(); + apr_time_t finish(apr_time_as_msec(apr_time_now())); + + return finish - start; +} + +string Publisher::generateData(int size){ + string data; + for(int i = 0; i < size; i++){ + data += ('A' + (i / 26)); + } + return data; +} + +void Publisher::terminate(){ + //send termination request + Message terminationRequest; + terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); + channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } +} + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-messages" == name){ + messages = atoi(argv[++i]); + }else if("-subscribers" == name){ + subscribers = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = atoi(argv[++i]); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-batches" == name){ + batches = atoi(argv[++i]); + }else if("-delay" == name){ + delay = atoi(argv[++i]); + }else if("-size" == name){ + size = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -messages <count>" << std::endl; + std::cout << " Specifies how many messages to send" << std::endl; + std::cout << " -subscribers <count>" << std::endl; + std::cout << " Specifies how many subscribers to expect reports from" << std::endl; + std::cout << " -ack_mode <mode>" << std::endl; + std::cout << " Sets the acknowledgement mode" << std::endl; + std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; + std::cout << " -transactional" << std::endl; + std::cout << " Indicates the client should use transactions" << std::endl; + std::cout << " -prefetch <count>" << std::endl; + std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; + std::cout << " -batches <count>" << std::endl; + std::cout << " Specifies how many batches to run" << std::endl; + std::cout << " -delay <seconds>" << std::endl; + std::cout << " Causes a delay between each batch" << std::endl; + std::cout << " -size <bytes>" << std::endl; + std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/test/unit/qpid/broker/ChannelTest.cpp b/cpp/test/unit/qpid/broker/ChannelTest.cpp new file mode 100644 index 0000000000..8d54208f77 --- /dev/null +++ b/cpp/test/unit/qpid/broker/ChannelTest.cpp @@ -0,0 +1,162 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../src/qpid/broker/Channel.h" +#include "../../src/qpid/broker/Message.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <memory> + +using namespace std::tr1; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +struct DummyRouter{ + Message::shared_ptr last; + + void operator()(Message::shared_ptr& msg){ + last = msg; + } +}; + +struct DummyHandler : OutputHandler{ + std::vector<AMQFrame*> frames; + + virtual void send(AMQFrame* frame){ + frames.push_back(frame); + } +}; + + +class ChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ChannelTest); + CPPUNIT_TEST(testIncoming); + CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testDeliveryNoAck); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testIncoming(){ + Channel channel(0, 0, 10000); + string routingKey("my_routing_key"); + channel.handlePublish(new Message(0, "test", routingKey, false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + string data1("abcdefg"); + string data2("hijklmn"); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last); + CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last); + DummyRouter router = channel.handleContent(part2, DummyRouter()); + CPPUNIT_ASSERT(router.last); + CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey()); + } + + void testConsumerMgmt(){ + Queue::shared_ptr queue(new Queue("my_queue")); + Channel channel(0, 0, 0); + CPPUNIT_ASSERT(!channel.exists("my_consumer")); + + ConnectionToken* owner; + string tag("my_consumer"); + channel.consume(tag, queue, false, false, owner); + string tagA; + string tagB; + channel.consume(tagA, queue, false, false, owner); + channel.consume(tagB, queue, false, false, owner); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.cancel(tagA); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(!channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.close(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); + } + + void testDeliveryNoAck(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); + } + + void testDeliveryAndRecovery(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner; + string tag("ack"); + channel.consume(tag, queue, true, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); diff --git a/cpp/test/unit/qpid/broker/ExchangeTest.cpp b/cpp/test/unit/qpid/broker/ExchangeTest.cpp new file mode 100644 index 0000000000..3d21cb66f0 --- /dev/null +++ b/cpp/test/unit/qpid/broker/ExchangeTest.cpp @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../../src/qpid/broker/DirectExchange.h" +#include "../../src/qpid/broker/Exchange.h" +#include "../../src/qpid/broker/Queue.h" +#include "../../src/qpid/broker/TopicExchange.h" +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::concurrent; + +class ExchangeTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ExchangeTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + // TODO aconway 2006-09-12: Need more detailed tests. + + void testMe() + { + Queue::shared_ptr queue(new Queue("queue", true, true)); + Queue::shared_ptr queue2(new Queue("queue2", true, true)); + + TopicExchange topic("topic"); + topic.bind(queue, "abc", 0); + topic.bind(queue2, "abc", 0); + + DirectExchange direct("direct"); + direct.bind(queue, "abc", 0); + direct.bind(queue2, "abc", 0); + + queue.reset(); + queue2.reset(); + + Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true)); + topic.route(msg, "abc", 0); + direct.route(msg, "abc", 0); + + // TODO aconway 2006-09-12: TODO Why no assertions? + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest); diff --git a/cpp/test/unit/qpid/broker/HeadersExchangeTest.cpp b/cpp/test/unit/qpid/broker/HeadersExchangeTest.cpp new file mode 100644 index 0000000000..d8fe71350e --- /dev/null +++ b/cpp/test/unit/qpid/broker/HeadersExchangeTest.cpp @@ -0,0 +1,112 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../../src/qpid/broker/HeadersExchange.h" +#include "../../src/qpid/framing/FieldTable.h" +#include "../../src/qpid/framing/Value.h" +#include <qpid_test_plugin.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +class HeadersExchangeTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(HeadersExchangeTest); + CPPUNIT_TEST(testMatchAll); + CPPUNIT_TEST(testMatchAny); + CPPUNIT_TEST(testMatchEmptyValue); + CPPUNIT_TEST(testMatchEmptyArgs); + CPPUNIT_TEST(testMatchNoXMatch); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMatchAll() + { + FieldTable b, m; + b.setString("x-match", "all"); + b.setString("foo", "FOO"); + b.setInt("n", 42); + m.setString("foo", "FOO"); + m.setInt("n", 42); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + + // Ignore extras. + m.setString("extra", "x"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + + // Fail mismatch, wrong value. + m.setString("foo", "NotFoo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + + // Fail mismatch, missing value + m.erase("foo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + void testMatchAny() + { + FieldTable b, m; + b.setString("x-match", "any"); + b.setString("foo", "FOO"); + b.setInt("n", 42); + m.setString("foo", "FOO"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + m.erase("foo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + m.setInt("n", 42); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + } + + void testMatchEmptyValue() + { + FieldTable b, m; + b.setString("x-match", "all"); + b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue()); + b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue()); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + m.setString("foo", "blah"); + m.setInt("n", 123); + } + + void testMatchEmptyArgs() + { + FieldTable b, m; + m.setString("foo", "FOO"); + + b.setString("x-match", "all"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + b.setString("x-match", "any"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + + void testMatchNoXMatch() + { + FieldTable b, m; + b.setString("foo", "FOO"); + m.setString("foo", "FOO"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + +}; + +// make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest); diff --git a/cpp/test/unit/qpid/broker/Makefile b/cpp/test/unit/qpid/broker/Makefile new file mode 100644 index 0000000000..172ce564bf --- /dev/null +++ b/cpp/test/unit/qpid/broker/Makefile @@ -0,0 +1,20 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_HOME = ../../.. +LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) $(BROKER_LIB) +include ${QPID_HOME}/cpp/test_plugins.mk + diff --git a/cpp/test/unit/qpid/broker/MessageTest.cpp b/cpp/test/unit/qpid/broker/MessageTest.cpp new file mode 100644 index 0000000000..f6586f721a --- /dev/null +++ b/cpp/test/unit/qpid/broker/MessageTest.cpp @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../src/qpid/concurrent/APRBase.h" +#include "../../src/qpid/broker/Message.h" +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +class MessageTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(MessageTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + // TODO aconway 2006-09-12: Need more detailed tests, + // need tests to assert something! + // + void testMe() + { + APRBase::increment(); + const int size(10); + for(int i = 0; i < size; i++){ + Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true)); + msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody())); + msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody())); + msg.reset(); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest); + diff --git a/cpp/test/unit/qpid/broker/QueueRegistryTest.cpp b/cpp/test/unit/qpid/broker/QueueRegistryTest.cpp new file mode 100644 index 0000000000..96fbc88069 --- /dev/null +++ b/cpp/test/unit/qpid/broker/QueueRegistryTest.cpp @@ -0,0 +1,76 @@ +#include "../../src/qpid/broker/QueueRegistry.h" +#include <qpid_test_plugin.h> +#include <string> + +using namespace qpid::broker; + +class QueueRegistryTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueueRegistryTest); + CPPUNIT_TEST(testDeclare); + CPPUNIT_TEST(testDeclareTmp); + CPPUNIT_TEST(testFind); + CPPUNIT_TEST(testDestroy); + CPPUNIT_TEST_SUITE_END(); + + private: + std::string foo, bar; + QueueRegistry reg; + std::pair<Queue::shared_ptr, bool> qc; + + public: + void setUp() { + foo = "foo"; + bar = "bar"; + } + + void testDeclare() { + qc = reg.declare(foo, false, 0, 0); + Queue::shared_ptr q = qc.first; + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT(qc.second); // New queue + CPPUNIT_ASSERT_EQUAL(foo, q->getName()); + + qc = reg.declare(foo, false, 0, 0); + CPPUNIT_ASSERT_EQUAL(q, qc.first); + CPPUNIT_ASSERT(!qc.second); + + qc = reg.declare(bar, false, 0, 0); + q = qc.first; + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT_EQUAL(true, qc.second); + CPPUNIT_ASSERT_EQUAL(bar, q->getName()); + } + + void testDeclareTmp() + { + qc = reg.declare(std::string(), false, 0, 0); + CPPUNIT_ASSERT(qc.second); + CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName()); + } + + void testFind() { + CPPUNIT_ASSERT(reg.find(foo) == 0); + + reg.declare(foo, false, 0, 0); + reg.declare(bar, false, 0, 0); + Queue::shared_ptr q = reg.find(bar); + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT_EQUAL(bar, q->getName()); + } + + void testDestroy() { + qc = reg.declare(foo, false, 0, 0); + reg.destroy(foo); + // Queue is gone from the registry. + CPPUNIT_ASSERT(reg.find(foo) == 0); + // Queue is not actually destroyed till we drop our reference. + CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName()); + // We shoud be the only reference. + CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest); diff --git a/cpp/test/unit/qpid/broker/QueueTest.cpp b/cpp/test/unit/qpid/broker/QueueTest.cpp new file mode 100644 index 0000000000..fa6344c265 --- /dev/null +++ b/cpp/test/unit/qpid/broker/QueueTest.cpp @@ -0,0 +1,176 @@ + /* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../src/qpid/broker/Queue.h" +#include "../../src/qpid/broker/QueueRegistry.h" +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::concurrent; + + +class TestBinding : public virtual Binding{ + bool cancelled; + +public: + TestBinding(); + virtual void cancel(); + bool isCancelled(); +}; + +class TestConsumer : public virtual Consumer{ +public: + Message::shared_ptr last; + + virtual bool deliver(Message::shared_ptr& msg); +}; + + +class QueueTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueueTest); + CPPUNIT_TEST(testConsumers); + CPPUNIT_TEST(testBinding); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testDequeue); + CPPUNIT_TEST_SUITE_END(); + + public: + void testConsumers(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + + //Test adding consumers: + TestConsumer c1; + TestConsumer c2; + queue->consume(&c1); + queue->consume(&c2); + + CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); + + //Test basic delivery: + Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + + queue->deliver(msg1); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + + queue->deliver(msg2); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); + + queue->deliver(msg3); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); + + //Test cancellation: + queue->cancel(&c1); + CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); + queue->cancel(&c2); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); + } + + void testBinding(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + //Test bindings: + TestBinding a; + TestBinding b; + queue->bound(&a); + queue->bound(&b); + + queue.reset(); + + CPPUNIT_ASSERT(a.isCancelled()); + CPPUNIT_ASSERT(b.isCancelled()); + } + + void testRegistry(){ + //Test use of queues in registry: + QueueRegistry registry; + registry.declare("queue1", true, true); + registry.declare("queue2", true, true); + registry.declare("queue3", true, true); + + CPPUNIT_ASSERT(registry.find("queue1")); + CPPUNIT_ASSERT(registry.find("queue2")); + CPPUNIT_ASSERT(registry.find("queue3")); + + registry.destroy("queue1"); + registry.destroy("queue2"); + registry.destroy("queue3"); + + CPPUNIT_ASSERT(!registry.find("queue1")); + CPPUNIT_ASSERT(!registry.find("queue2")); + CPPUNIT_ASSERT(!registry.find("queue3")); + } + + void testDequeue(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + + Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr received; + + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount()); + + TestConsumer consumer; + queue->consume(&consumer); + queue->dispatch(); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT(!received); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); + +//TestBinding +TestBinding::TestBinding() : cancelled(false) {} + +void TestBinding::cancel(){ + CPPUNIT_ASSERT(!cancelled); + cancelled = true; +} + +bool TestBinding::isCancelled(){ + return cancelled; +} + +//TestConsumer +bool TestConsumer::deliver(Message::shared_ptr& msg){ + last = msg; + return true; +} + diff --git a/cpp/test/unit/qpid/broker/RouterTest.cpp b/cpp/test/unit/qpid/broker/RouterTest.cpp new file mode 100644 index 0000000000..7b77162905 --- /dev/null +++ b/cpp/test/unit/qpid/broker/RouterTest.cpp @@ -0,0 +1,86 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../src/qpid/broker/Channel.h" +#include "../../src/qpid/broker/Exchange.h" +#include "../../src/qpid/broker/ExchangeRegistry.h" +#include "../../src/qpid/broker/Message.h" +#include "../../src/qpid/broker/Router.h" +#include <qpid_test_plugin.h> +#include <iostream> +#include <memory> + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::concurrent; + +struct TestExchange : public Exchange{ + Message::shared_ptr msg; + string routingKey; + FieldTable* args; + + TestExchange() : Exchange("test"), args(0) {} + + void bind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){} + + void unbind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){ } + + void route(Message::shared_ptr& _msg, const string& _routingKey, FieldTable* _args){ + msg = _msg; + routingKey = _routingKey; + args = _args; + } +}; + +class RouterTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(RouterTest); + CPPUNIT_TEST(test); + CPPUNIT_TEST_SUITE_END(); + + public: + + void test() + { + ExchangeRegistry registry; + TestExchange* exchange = new TestExchange(); + registry.declare(exchange); + + string routingKey("my_routing_key"); + string name("name"); + string value("value"); + Message::shared_ptr msg(new Message(0, "test", routingKey, false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + + dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value); + msg->setHeader(header); + + Router router(registry); + router(msg); + + CPPUNIT_ASSERT(exchange->msg); + CPPUNIT_ASSERT_EQUAL(msg, exchange->msg); + CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey); + CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest); + diff --git a/cpp/test/unit/qpid/broker/TopicExchangeTest.cpp b/cpp/test/unit/qpid/broker/TopicExchangeTest.cpp new file mode 100644 index 0000000000..4066a4709f --- /dev/null +++ b/cpp/test/unit/qpid/broker/TopicExchangeTest.cpp @@ -0,0 +1,184 @@ +#include "../../src/qpid/broker/TopicExchange.h" +#include <qpid_test_plugin.h> + +using namespace qpid::broker; + +Tokens makeTokens(char** begin, char** end) +{ + Tokens t; + t.insert(t.end(), begin, end); + return t; +} + +// Calculate size of an array. +#define LEN(a) (sizeof(a)/sizeof(a[0])) + +// Convert array to token vector +#define TOKENS(a) makeTokens(a, a + LEN(a)) + +// Allow CPPUNIT_EQUALS to print a Tokens. +// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib. +// +CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v) +{ + out << "[ "; + for (Tokens::const_iterator i = v.begin(); + i != v.end(); ++i) + { + out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", "); + } + return out; +} + + +class TokensTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TokensTest); + CPPUNIT_TEST(testTokens); + CPPUNIT_TEST_SUITE_END(); + + public: + void testTokens() + { + Tokens tokens("hello.world"); + char* expect[] = {"hello", "world"}; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens); + + tokens = "a.b.c"; + char* expect2[] = { "a", "b", "c" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens); + + tokens = ""; + CPPUNIT_ASSERT(tokens.empty()); + + tokens = "x"; + char* expect3[] = { "x" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens); + + tokens = (".x"); + char* expect4[] = { "", "x" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens); + + tokens = ("x."); + char* expect5[] = { "x", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens); + + tokens = ("."); + char* expect6[] = { "", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens); + + tokens = (".."); + char* expect7[] = { "", "", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens); + } + +}; + +#define ASSERT_NORMALIZED(expect, pattern) \ + CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) +class TopicPatternTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TopicPatternTest); + CPPUNIT_TEST(testNormalize); + CPPUNIT_TEST(testPlain); + CPPUNIT_TEST(testStar); + CPPUNIT_TEST(testHash); + CPPUNIT_TEST(testMixed); + CPPUNIT_TEST(testCombo); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testNormalize() + { + CPPUNIT_ASSERT(TopicPattern("").empty()); + ASSERT_NORMALIZED("a.b.c", "a.b.c"); + ASSERT_NORMALIZED("a.*.c", "a.*.c"); + ASSERT_NORMALIZED("#", "#"); + ASSERT_NORMALIZED("#", "#.#.#.#"); + ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*"); + ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#"); + ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*"); + } + + void testPlain() { + TopicPattern p("ab.cd.e"); + CPPUNIT_ASSERT(p.match("ab.cd.e")); + CPPUNIT_ASSERT(!p.match("abx.cd.e")); + CPPUNIT_ASSERT(!p.match("ab.cd")); + CPPUNIT_ASSERT(!p.match("ab.cd..e.")); + CPPUNIT_ASSERT(!p.match("ab.cd.e.")); + CPPUNIT_ASSERT(!p.match(".ab.cd.e")); + + p = ""; + CPPUNIT_ASSERT(p.match("")); + + p = "."; + CPPUNIT_ASSERT(p.match(".")); + } + + + void testStar() + { + TopicPattern p("a.*.b"); + CPPUNIT_ASSERT(p.match("a.xx.b")); + CPPUNIT_ASSERT(!p.match("a.b")); + + p = "*.x"; + CPPUNIT_ASSERT(p.match("y.x")); + CPPUNIT_ASSERT(p.match(".x")); + CPPUNIT_ASSERT(!p.match("x")); + + p = "x.x.*"; + CPPUNIT_ASSERT(p.match("x.x.y")); + CPPUNIT_ASSERT(p.match("x.x.")); + CPPUNIT_ASSERT(!p.match("x.x")); + CPPUNIT_ASSERT(!p.match("q.x.y")); + } + + void testHash() + { + TopicPattern p("a.#.b"); + CPPUNIT_ASSERT(p.match("a.b")); + CPPUNIT_ASSERT(p.match("a.x.b")); + CPPUNIT_ASSERT(p.match("a..x.y.zz.b")); + CPPUNIT_ASSERT(!p.match("a.b.")); + CPPUNIT_ASSERT(!p.match("q.x.b")); + + p = "a.#"; + CPPUNIT_ASSERT(p.match("a")); + CPPUNIT_ASSERT(p.match("a.b")); + CPPUNIT_ASSERT(p.match("a.b.c")); + + p = "#.a"; + CPPUNIT_ASSERT(p.match("a")); + CPPUNIT_ASSERT(p.match("x.y.a")); + } + + void testMixed() + { + TopicPattern p("*.x.#.y"); + CPPUNIT_ASSERT(p.match("a.x.y")); + CPPUNIT_ASSERT(p.match("a.x.p.qq.y")); + CPPUNIT_ASSERT(!p.match("a.a.x.y")); + CPPUNIT_ASSERT(!p.match("aa.x.b.c")); + + p = "a.#.b.*"; + CPPUNIT_ASSERT(p.match("a.b.x")); + CPPUNIT_ASSERT(p.match("a.x.x.x.b.x")); + } + + void testCombo() { + TopicPattern p("*.#.#.*.*.#"); + CPPUNIT_ASSERT(p.match("x.y.z")); + CPPUNIT_ASSERT(p.match("x.y.z.a.b.c")); + CPPUNIT_ASSERT(!p.match("x.y")); + CPPUNIT_ASSERT(!p.match("x")); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest); +CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest); diff --git a/cpp/test/unit/qpid/broker/ValueTest.cpp b/cpp/test/unit/qpid/broker/ValueTest.cpp new file mode 100644 index 0000000000..89444c38cf --- /dev/null +++ b/cpp/test/unit/qpid/broker/ValueTest.cpp @@ -0,0 +1,96 @@ +#include "../../src/qpid/framing/Value.h" +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + + +// Allow CPPUNIT_EQUALS to print a Tokens. +// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib. +// +template <class T> +CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, + const ValueOps<T>& v) +{ + out << v.getValue(); + return out; +} + + +class ValueTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ValueTest); + CPPUNIT_TEST(testStringValueEquals); + CPPUNIT_TEST(testIntegerValueEquals); + CPPUNIT_TEST(testDecimalValueEquals); + CPPUNIT_TEST(testFieldTableValueEquals); + CPPUNIT_TEST_SUITE_END(); + + StringValue s; + IntegerValue i; + DecimalValue d; + FieldTableValue ft; + EmptyValue e; + + public: + ValueTest() : + s("abc"), + i(42), + d(1234,2) + + { + ft.getValue().setString("foo", "FOO"); + ft.getValue().setInt("magic", 7); + } + + void testStringValueEquals() + { + + CPPUNIT_ASSERT(StringValue("abc") == s); + CPPUNIT_ASSERT(s != StringValue("foo")); + CPPUNIT_ASSERT(s != e); + CPPUNIT_ASSERT(e != d); + CPPUNIT_ASSERT(e != ft); + } + + void testIntegerValueEquals() + { + CPPUNIT_ASSERT(IntegerValue(42) == i); + CPPUNIT_ASSERT(IntegerValue(5) != i); + CPPUNIT_ASSERT(i != e); + CPPUNIT_ASSERT(i != d); + } + + void testDecimalValueEquals() + { + CPPUNIT_ASSERT(DecimalValue(1234, 2) == d); + CPPUNIT_ASSERT(DecimalValue(12345, 2) != d); + CPPUNIT_ASSERT(DecimalValue(1234, 3) != d); + CPPUNIT_ASSERT(d != s); + } + + + void testFieldTableValueEquals() + { + CPPUNIT_ASSERT_EQUAL(std::string("FOO"), + ft.getValue().getString("foo")); + CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic")); + + FieldTableValue f2; + CPPUNIT_ASSERT(ft != f2); + f2.getValue().setString("foo", "FOO"); + CPPUNIT_ASSERT(ft != f2); + f2.getValue().setInt("magic", 7); + CPPUNIT_ASSERT_EQUAL(ft,f2); + CPPUNIT_ASSERT(ft == f2); + f2.getValue().setString("foo", "BAR"); + CPPUNIT_ASSERT(ft != f2); + CPPUNIT_ASSERT(ft != i); + } + +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest); + diff --git a/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp new file mode 100644 index 0000000000..6b011743c0 --- /dev/null +++ b/cpp/test/unit/qpid/framing/BodyHandlerTest.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "../../src/qpid/framing/amqp_framing.h" +#include "qpid_test_plugin.h" +using namespace qpid::framing; + +class BodyHandlerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(BodyHandlerTest); + CPPUNIT_TEST(testMethod); + CPPUNIT_TEST(testHeader); + CPPUNIT_TEST(testContent); + CPPUNIT_TEST(testHeartbeat); + CPPUNIT_TEST_SUITE_END(); +private: + + class TestBodyHandler : public BodyHandler{ + AMQMethodBody* const method; + AMQHeaderBody* const header; + AMQContentBody* const content; + AMQHeartbeatBody* const heartbeat; + + public: + + TestBodyHandler(AMQMethodBody* _method) : method(_method), header(0), content(0), heartbeat(0){} + TestBodyHandler(AMQHeaderBody* _header) : method(0), header(_header), content(0), heartbeat(0){} + TestBodyHandler(AMQContentBody* _content) : method(0), header(0), content(_content), heartbeat(0){} + TestBodyHandler(AMQHeartbeatBody* _heartbeat) : method(0), header(0), content(0), heartbeat(_heartbeat){} + + virtual void handleMethod(AMQMethodBody::shared_ptr body){ + CPPUNIT_ASSERT(method); + CPPUNIT_ASSERT_EQUAL(method, body.get()); + } + virtual void handleHeader(AMQHeaderBody::shared_ptr body){ + CPPUNIT_ASSERT(header); + CPPUNIT_ASSERT_EQUAL(header, body.get()); + } + virtual void handleContent(AMQContentBody::shared_ptr body){ + CPPUNIT_ASSERT(content); + CPPUNIT_ASSERT_EQUAL(content, body.get()); + } + virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ + CPPUNIT_ASSERT(heartbeat); + CPPUNIT_ASSERT_EQUAL(heartbeat, body.get()); + } + }; + +public: + + void testMethod() + { + AMQMethodBody* method = new QueueDeclareBody(); + AMQFrame frame(0, method); + TestBodyHandler handler(method); + handler.handleBody(frame.getBody()); + } + + void testHeader() + { + AMQHeaderBody* header = new AMQHeaderBody(); + AMQFrame frame(0, header); + TestBodyHandler handler(header); + handler.handleBody(frame.getBody()); + } + + void testContent() + { + AMQContentBody* content = new AMQContentBody(); + AMQFrame frame(0, content); + TestBodyHandler handler(content); + handler.handleBody(frame.getBody()); + } + + void testHeartbeat() + { + AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody(); + AMQFrame frame(0, heartbeat); + TestBodyHandler handler(heartbeat); + handler.handleBody(frame.getBody()); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(BodyHandlerTest); + diff --git a/cpp/test/unit/qpid/framing/Makefile b/cpp/test/unit/qpid/framing/Makefile new file mode 100644 index 0000000000..487b8d537b --- /dev/null +++ b/cpp/test/unit/qpid/framing/Makefile @@ -0,0 +1,21 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_HOME = ../../../.. +LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) +INCLUDES=$(TEST_INCLUDES) -I ../generated +include ${QPID_HOME}/cpp/test_plugins.mk + diff --git a/cpp/test/unit/qpid/framing/field_table_test.cpp b/cpp/test/unit/qpid/framing/field_table_test.cpp new file mode 100644 index 0000000000..8f2fe61611 --- /dev/null +++ b/cpp/test/unit/qpid/framing/field_table_test.cpp @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "../../src/qpid/framing/amqp_framing.h" +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + +class FieldTableTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FieldTableTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + FieldTable ft; + ft.setString("A", "BCDE"); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A")); + + Buffer buffer(100); + buffer.putFieldTable(ft); + buffer.flip(); + FieldTable ft2; + buffer.getFieldTable(ft2); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A")); + + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FieldTableTest); + diff --git a/cpp/test/unit/qpid/framing/framing_test.cpp b/cpp/test/unit/qpid/framing/framing_test.cpp new file mode 100644 index 0000000000..40ce3d97ed --- /dev/null +++ b/cpp/test/unit/qpid/framing/framing_test.cpp @@ -0,0 +1,144 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../src/qpid/framing/amqp_framing.h" +#include "ConnectionRedirectBody.h" +#include <iostream> +#include <sstream> +#include <qpid_test_plugin.h> +#include <typeinfo> + +using namespace qpid::framing; + +// TODO aconway 2006-09-12: Why do we need explicit qpid::framing:: below? + +template <class T> +std::string tostring(const T& x) +{ + std::ostringstream out; + out << x; + return out.str(); +} + +class FramingTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FramingTest); + CPPUNIT_TEST(testBasicQosBody); + CPPUNIT_TEST(testConnectionSecureBody); + CPPUNIT_TEST(testConnectionRedirectBody); + CPPUNIT_TEST(testAccessRequestBody); + CPPUNIT_TEST(testBasicConsumeBody); + CPPUNIT_TEST(ConnectionRedirectBody); + CPPUNIT_TEST(BasicConsumeOkBody); + CPPUNIT_TEST_SUITE_END(); + + private: + Buffer buffer; + + public: + + FramingTest() : buffer(100) {} + + void testBasicQosBody() + { + BasicQosBody in(0xCAFEBABE, 0xABBA, true); + in.encodeContent(buffer); + buffer.flip(); + BasicQosBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionSecureBody() + { + std::string s = "security credential"; + ConnectionSecureBody in(s); + in.encodeContent(buffer); + buffer.flip(); + ConnectionSecureBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionRedirectBody() + { + std::string a = "hostA"; + std::string b = "hostB"; + qpid::framing::ConnectionRedirectBody in(a, b); + in.encodeContent(buffer); + buffer.flip(); + qpid::framing::ConnectionRedirectBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testAccessRequestBody() + { + std::string s = "text"; + AccessRequestBody in(s, true, false, true, false, true); + in.encodeContent(buffer); + buffer.flip(); + AccessRequestBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testBasicConsumeBody() + { + std::string q = "queue"; + std::string t = "tag"; + BasicConsumeBody in(0, q, t, false, true, false, false); + in.encodeContent(buffer); + buffer.flip(); + BasicConsumeBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + + void ConnectionRedirectBody() + { + std::string a = "hostA"; + std::string b = "hostB"; + AMQFrame in(999, new qpid::framing::ConnectionRedirectBody(a, b)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void BasicConsumeOkBody() + { + std::string s = "hostA"; + AMQFrame in(999, new qpid::framing::BasicConsumeOkBody(s)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + for(int i = 0; i < 5; i++){ + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); + + + diff --git a/cpp/test/unit/qpid/framing/header_test.cpp b/cpp/test/unit/qpid/framing/header_test.cpp new file mode 100644 index 0000000000..9268c54272 --- /dev/null +++ b/cpp/test/unit/qpid/framing/header_test.cpp @@ -0,0 +1,141 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "../../src/qpid/framing/amqp_framing.h" +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + +class HeaderTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(HeaderTest); + CPPUNIT_TEST(testGenericProperties); + CPPUNIT_TEST(testAllSpecificProperties); + CPPUNIT_TEST(testSomeSpecificProperties); + CPPUNIT_TEST_SUITE_END(); + +public: + + // TODO aconway 2006-09-12: Need more detailed tests, + // need tests to assert something! + // + void testGenericProperties() + { + AMQHeaderBody body(BASIC); + dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); + Buffer buffer(100); + + body.encode(buffer); + buffer.flip(); + AMQHeaderBody body2; + body2.decode(buffer, body.size()); + BasicHeaderProperties* props = + dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), + props->getHeaders().getString("A")); + } + + void testAllSpecificProperties(){ + string contentType("text/html"); + string contentEncoding("UTF8"); + u_int8_t deliveryMode(2); + u_int8_t priority(3); + string correlationId("abc"); + string replyTo("no-address"); + string expiration("why is this a string?"); + string messageId("xyz"); + u_int64_t timestamp(0xabcd); + string type("eh?"); + string userId("guest"); + string appId("just testing"); + string clusterId("no clustering required"); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->getHeaders().setString("A", "BCDE"); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setCorrelationId(correlationId); + properties->setReplyTo(replyTo); + properties->setExpiration(expiration); + properties->setMessageId(messageId); + properties->setTimestamp(timestamp); + properties->setType(type); + properties->setUserId(userId); + properties->setAppId(appId); + properties->setClusterId(clusterId); + + Buffer buffer(10000); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); + CPPUNIT_ASSERT_EQUAL(deliveryMode, properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL(priority, properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(correlationId, properties->getCorrelationId()); + CPPUNIT_ASSERT_EQUAL(replyTo, properties->getReplyTo()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(messageId, properties->getMessageId()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(type, properties->getType()); + CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); + CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); + CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); + } + + void testSomeSpecificProperties(){ + string contentType("application/octet-stream"); + u_int8_t deliveryMode(5); + u_int8_t priority(6); + string expiration("Z"); + u_int64_t timestamp(0xabe4a34a); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setExpiration(expiration); + properties->setTimestamp(timestamp); + + Buffer buffer(100); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL((int) priority, (int) properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeaderTest); + |