diff options
author | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2007-04-02 11:40:48 +0000 |
commit | 16e203a0d32df9829bcf4fb738ef89fc94404155 (patch) | |
tree | b5dbb15f4a238ca377236ce16140443e20ed3e4a /cpp/src/tests | |
parent | fb410c63d08e87019b3d2a8d85820ae809758f62 (diff) | |
download | qpid-python-16e203a0d32df9829bcf4fb738ef89fc94404155.tar.gz |
Fix for the most disruptive items in QPID-243.
* All #include lines now use '""' rather than '<>' where appropriate.
* #include lines within the qpid project use relative includes so that
the same path will work in /usr/include when installed as part of the
client libraries.
* All the source code has now been rearranged to be under src in a directory
analogous to the namespace of the classes in it.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@524769 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
49 files changed, 5894 insertions, 0 deletions
diff --git a/cpp/src/tests/.vg-supp b/cpp/src/tests/.vg-supp new file mode 100644 index 0000000000..b5abdf1385 --- /dev/null +++ b/cpp/src/tests/.vg-supp @@ -0,0 +1,18 @@ +{ + <insert a suppression name here> + Memcheck:Leak + fun:_Znwj + fun:_ZN4qpid6broker17ReferenceRegistry4openERKSs + fun:_ZN13ReferenceTestC1Ev + fun:_ZN7CppUnit25ConcretTestFixtureFactoryI13ReferenceTestE11makeFixtureEv + fun:_ZNK7CppUnit27TestSuiteBuilderContextBase15makeTestFixtureEv + fun:_ZNK7CppUnit23TestSuiteBuilderContextI13ReferenceTestE11makeFixtureEv + fun:_ZN13ReferenceTest15addTestsToSuiteERN7CppUnit27TestSuiteBuilderContextBaseE + fun:_ZN13ReferenceTest5suiteEv + fun:_ZN7CppUnit16TestSuiteFactoryI13ReferenceTestE8makeTestEv + fun:_ZN7CppUnit19TestFactoryRegistry14addTestToSuiteEPNS_9TestSuiteE + fun:_ZN7CppUnit19TestFactoryRegistry8makeTestEv + obj:/usr/bin/DllPlugInTester + obj:/usr/bin/DllPlugInTester + fun:(below main) +} diff --git a/cpp/src/tests/APRBaseTest.cpp b/cpp/src/tests/APRBaseTest.cpp new file mode 100644 index 0000000000..5ed8bf1918 --- /dev/null +++ b/cpp/src/tests/APRBaseTest.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../sys/apr/APRBase.h" +#include "qpid_test_plugin.h" +#include <iostream> + +using namespace qpid::sys; + +class APRBaseTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(APRBaseTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + APRBase::increment(); + APRBase::increment(); + APRBase::decrement(); + APRBase::decrement(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); + diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp new file mode 100644 index 0000000000..56870209fe --- /dev/null +++ b/cpp/src/tests/AccumulatedAckTest.cpp @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/AccumulatedAck.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <list> + +using std::list; +using namespace qpid::broker; + +class AccumulatedAckTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(AccumulatedAckTest); + CPPUNIT_TEST(testGeneral); + CPPUNIT_TEST(testCovers); + CPPUNIT_TEST(testUpdateAndConsolidate); + CPPUNIT_TEST_SUITE_END(); + + public: + void testGeneral() + { + AccumulatedAck ack(0); + ack.clear(); + ack.update(3,3); + ack.update(7,7); + ack.update(9,9); + ack.update(1,2); + ack.update(4,5); + ack.update(6,6); + + for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); + CPPUNIT_ASSERT(ack.covers(9)); + + CPPUNIT_ASSERT(!ack.covers(8)); + CPPUNIT_ASSERT(!ack.covers(10)); + + ack.consolidate(); + + for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); + CPPUNIT_ASSERT(ack.covers(9)); + + CPPUNIT_ASSERT(!ack.covers(8)); + CPPUNIT_ASSERT(!ack.covers(10)); + } + + void testCovers() + { + AccumulatedAck ack(5); + ack.individual.push_back(7); + ack.individual.push_back(9); + + CPPUNIT_ASSERT(ack.covers(1)); + CPPUNIT_ASSERT(ack.covers(2)); + CPPUNIT_ASSERT(ack.covers(3)); + CPPUNIT_ASSERT(ack.covers(4)); + CPPUNIT_ASSERT(ack.covers(5)); + CPPUNIT_ASSERT(ack.covers(7)); + CPPUNIT_ASSERT(ack.covers(9)); + + CPPUNIT_ASSERT(!ack.covers(6)); + CPPUNIT_ASSERT(!ack.covers(8)); + CPPUNIT_ASSERT(!ack.covers(10)); + } + + void testUpdateAndConsolidate() + { + AccumulatedAck ack(0); + ack.update(1, 1); + ack.update(3, 3); + ack.update(10, 10); + ack.update(8, 8); + ack.update(6, 6); + ack.update(3, 3); + ack.update(2, 2); + ack.update(0, 5); + ack.consolidate(); + CPPUNIT_ASSERT_EQUAL((uint64_t) 6, ack.range); + CPPUNIT_ASSERT_EQUAL((size_t) 2, ack.individual.size()); + list<uint64_t>::iterator i = ack.individual.begin(); + CPPUNIT_ASSERT_EQUAL((uint64_t) 8, *i); + i++; + CPPUNIT_ASSERT_EQUAL((uint64_t) 10, *i); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(AccumulatedAckTest); + diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp new file mode 100644 index 0000000000..006391bbd2 --- /dev/null +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -0,0 +1,357 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/BrokerChannel.h" +#include "../broker/BrokerMessage.h" +#include "../broker/BrokerQueue.h" +#include "../broker/FanOutExchange.h" +#include "../broker/NullMessageStore.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <memory> +#include "AMQP_HighestVersion.h" +#include "../framing/AMQFrame.h" +#include "MockChannel.h" +#include "../broker/Connection.h" +#include "../framing/ProtocolInitiation.h" +#include <boost/ptr_container/ptr_vector.hpp> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; +using std::string; +using std::queue; + +struct MockHandler : ConnectionOutputHandler{ + boost::ptr_vector<AMQFrame> frames; + + void send(AMQFrame* frame){ frames.push_back(frame); } + + void close() {}; +}; + + +class BrokerChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(BrokerChannelTest); + CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testDeliveryNoAck); + CPPUNIT_TEST(testDeliveryAndRecovery); + CPPUNIT_TEST(testStaging); + CPPUNIT_TEST(testQueuePolicy); + CPPUNIT_TEST_SUITE_END(); + + Broker::shared_ptr broker; + Connection connection; + MockHandler handler; + + class MockMessageStore : public NullMessageStore + { + struct MethodCall + { + const string name; + PersistableMessage* msg; + const string data;//only needed for appendContent + + void check(const MethodCall& other) const + { + CPPUNIT_ASSERT_EQUAL(name, other.name); + CPPUNIT_ASSERT_EQUAL(msg, other.msg); + CPPUNIT_ASSERT_EQUAL(data, other.data); + } + }; + + queue<MethodCall> expected; + bool expectMode;//true when setting up expected calls + + void handle(const MethodCall& call) + { + if (expectMode) { + expected.push(call); + } else { + call.check(expected.front()); + expected.pop(); + } + } + + void handle(const string& name, PersistableMessage* msg, const string& data) + { + MethodCall call = {name, msg, data}; + handle(call); + } + + public: + + MockMessageStore() : expectMode(false) {} + + void stage(PersistableMessage& msg) + { + if(!expectMode) msg.setPersistenceId(1); + MethodCall call = {"stage", &msg, ""}; + handle(call); + } + + void appendContent(PersistableMessage& msg, const string& data) + { + MethodCall call = {"appendContent", &msg, data}; + handle(call); + } + + // Don't hide overloads. + using NullMessageStore::destroy; + + void destroy(PersistableMessage& msg) + { + MethodCall call = {"destroy", &msg, ""}; + handle(call); + } + + void expect() + { + expectMode = true; + } + + void test() + { + expectMode = false; + } + + void check() + { + CPPUNIT_ASSERT(expected.empty()); + } + }; + + + public: + + BrokerChannelTest() : + broker(Broker::create()), + connection(&handler, *broker) + { + connection.initiated(ProtocolInitiation()); + } + + + void testConsumerMgmt(){ + Queue::shared_ptr queue(new Queue("my_queue")); + Channel channel(connection, 0, 0, 0); + channel.open(); + CPPUNIT_ASSERT(!channel.exists("my_consumer")); + + ConnectionToken* owner = 0; + string tag("my_consumer"); + channel.consume(tag, queue, false, false, owner); + string tagA; + string tagB; + channel.consume(tagA, queue, false, false, owner); + channel.consume(tagB, queue, false, false, owner); + CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.cancel(tagA); + CPPUNIT_ASSERT_EQUAL((uint32_t) 2, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(!channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.close(); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); + } + + void testDeliveryNoAck(){ + Channel channel(connection, 7, 10000); + channel.open(); + const string data("abcdefghijklmn"); + Message::shared_ptr msg( + createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0].getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1].getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2].getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3].getBody().get()); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + + void testDeliveryAndRecovery(){ + Channel channel(connection, 7, 10000); + channel.open(); + const string data("abcdefghijklmn"); + + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("ack"); + channel.consume(tag, queue, true, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[1].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[2].getChannel()); + CPPUNIT_ASSERT_EQUAL(ChannelId(7), handler.frames[3].getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0].getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<BasicDeliverBody*>( + handler.frames[1].getBody().get())); + CPPUNIT_ASSERT(dynamic_cast<AMQHeaderBody*>( + handler.frames[2].getBody().get())); + AMQContentBody* contentBody = dynamic_cast<AMQContentBody*>( + handler.frames[3].getBody().get()); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + + void testStaging(){ + MockMessageStore store; + Channel channel( + connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + const string data[] = {"abcde", "fghij", "klmno"}; + + Message* msg = new BasicMessage( + 0, "my_exchange", "my_routing_key", false, false, + MockChannel::basicGetBody()); + + store.expect(); + store.stage(*msg); + for (int i = 0; i < 3; i++) { + store.appendContent(*msg, data[i]); + } + store.destroy(*msg); + store.test(); + + Exchange::shared_ptr exchange = + broker->getExchanges().declare("my_exchange", "fanout").first; + Queue::shared_ptr queue(new Queue("my_queue")); + exchange->bind(queue, "", 0); + + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + uint64_t contentSize(0); + for (int i = 0; i < 3; i++) { + contentSize += data[i].size(); + } + header->setContentSize(contentSize); + channel.handlePublish(msg); + channel.handleHeader(header); + + for (int i = 0; i < 3; i++) { + AMQContentBody::shared_ptr body(new AMQContentBody(data[i])); + channel.handleContent(body); + } + Message::shared_ptr msg2 = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg, msg2.get()); + msg2.reset();//should trigger destroy call + + store.check(); + } + + + //NOTE: strictly speaking this should/could be part of QueueTest, + //but as it can usefully use the same utility classes as this + //class it is defined here for simpllicity + void testQueuePolicy() + { + MockMessageStore store; + {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content + const string data1("abcd"); + const string data2("efghijk"); + const string data3("lmnopqrstuvwxyz"); + Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size())); + Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size())); + Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size())); + addContent(msg1, data1); + addContent(msg2, data2); + addContent(msg3, data3); + + QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded + FieldTable settings; + policy.update(settings); + + store.expect(); + store.stage(*msg3); + store.destroy(*msg3); + store.test(); + + Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); + queue->configure(settings);//set policy + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + Message::shared_ptr next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1, next); + CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize()); + next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2, next); + CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), next->encodedContentSize()); + next = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg3, next); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize()); + + next.reset(); + msg1.reset(); + msg2.reset(); + msg3.reset();//must clear all references to messages to allow them to be destroyed + + } + store.check(); + } + + Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) + { + BasicMessage* msg = new BasicMessage( + 0, exchange, routingKey, false, false, + MockChannel::basicGetBody()); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(contentSize); + msg->setHeader(header); + msg->getHeaderProperties()->setMessageId(messageId); + return msg; + } + + void addContent(Message::shared_ptr msg, const string& data) + { + AMQContentBody::shared_ptr body(new AMQContentBody(data)); + msg->addContent(body); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(BrokerChannelTest); diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp new file mode 100644 index 0000000000..458931c4f4 --- /dev/null +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "../client/ClientChannel.h" +#include "../client/ClientMessage.h" +#include "../client/ClientQueue.h" +#include "../client/ClientExchange.h" +#include "../client/MessageListener.h" + +using namespace std; +using namespace boost; +using namespace qpid::client; +using namespace qpid::sys; +using namespace qpid::framing; + +/// Small frame size so we can create fragmented messages. +const size_t FRAME_MAX = 256; + + +/** + * Test client API using an in-process broker. + */ +class ClientChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ClientChannelTest); + CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST(testGetNoContent); + CPPUNIT_TEST(testConsumeCancel); + CPPUNIT_TEST(testConsumePublished); + CPPUNIT_TEST(testGetFragmentedMessage); + CPPUNIT_TEST(testConsumeFragmentedMessage); + CPPUNIT_TEST_SUITE_END(); + + struct Listener: public qpid::client::MessageListener { + vector<Message> messages; + Monitor monitor; + void received(Message& msg) { + Mutex::ScopedLock l(monitor); + messages.push_back(msg); + monitor.notifyAll(); + } + }; + + InProcessBrokerClient connection; // client::connection + local broker + Channel channel; + const std::string qname; + const std::string data; + Queue queue; + Exchange exchange; + Listener listener; + + public: + + ClientChannelTest() + : connection(FRAME_MAX), + qname("testq"), data("hello"), + queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) + { + connection.openChannel(channel); + CPPUNIT_ASSERT(channel.getId() != 0); + channel.declareQueue(queue); + } + + void testPublishGet() { + Message pubMsg(data); + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT_EQUAL(data, getMsg.getData()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue + } + + void testGetNoContent() { + Message pubMsg; + pubMsg.getHeaders().setString("hello", "world"); + channel.publish(pubMsg, exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + CPPUNIT_ASSERT(getMsg.getData().empty()); + CPPUNIT_ASSERT_EQUAL(string("world"), + getMsg.getHeaders().getString("hello")); + } + + void testConsumeCancel() { + string tag; // Broker assigned + channel.consume(queue, tag, &listener); + channel.start(); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.publish(Message("a"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + Time deadline(now() + 1*TIME_SEC); + while (listener.messages.size() != 1) { + CPPUNIT_ASSERT(listener.monitor.wait(deadline)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData()); + + channel.publish(Message("b"), exchange, qname); + channel.publish(Message("c"), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 3) { + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + } + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData()); + CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData()); + + channel.cancel(tag); + channel.publish(Message("d"), exchange, qname); + CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size()); + { + Mutex::ScopedLock l(listener.monitor); + CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2)); + } + Message msg; + CPPUNIT_ASSERT(channel.get(msg, queue)); + CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData()); + } + + // Consume already-published messages + void testConsumePublished() { + Message pubMsg("x"); + pubMsg.getHeaders().setString("y", "z"); + channel.publish(pubMsg, exchange, qname); + string tag; + channel.consume(queue, tag, &listener); + CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size()); + channel.start(); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 1) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(string("z"), + listener.messages[0].getHeaders().getString("y")); + } + + void testGetFragmentedMessage() { + string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size. + channel.publish(Message(longStr), exchange, qname); + Message getMsg; + CPPUNIT_ASSERT(channel.get(getMsg, queue)); + } + + void testConsumeFragmentedMessage() { + string xx(FRAME_MAX*2, 'x'); + channel.publish(Message(xx), exchange, qname); + channel.start(); + string tag; + channel.consume(queue, tag, &listener); + string yy(FRAME_MAX*2, 'y'); + channel.publish(Message(yy), exchange, qname); + { + Mutex::ScopedLock l(listener.monitor); + while (listener.messages.size() != 2) + CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC)); + } + CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData()); + CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest); diff --git a/cpp/src/tests/ConfigurationTest.cpp b/cpp/src/tests/ConfigurationTest.cpp new file mode 100644 index 0000000000..ecaa2865ce --- /dev/null +++ b/cpp/src/tests/ConfigurationTest.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/Configuration.h" +#include "qpid_test_plugin.h" +#include <iostream> + +using namespace std; +using namespace qpid::broker; + +class ConfigurationTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ConfigurationTest); + CPPUNIT_TEST(testIsHelp); + CPPUNIT_TEST(testPortLongForm); + CPPUNIT_TEST(testPortShortForm); + CPPUNIT_TEST(testStore); + CPPUNIT_TEST(testStagingThreshold); + CPPUNIT_TEST(testVarious); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testIsHelp() + { + Configuration conf; + char* argv[] = {"ignore", "--help"}; + conf.parse("ignore", 2, argv); + CPPUNIT_ASSERT(conf.isHelp()); + } + + void testPortLongForm() + { + Configuration conf; + char* argv[] = {"ignore", "--port", "6789"}; + conf.parse("ignore", 3, argv); + CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); + } + + void testPortShortForm() + { + Configuration conf; + char* argv[] = {"ignore", "-p", "6789"}; + conf.parse("ignore", 3, argv); + CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); + } + + void testStore() + { + Configuration conf; + char* argv[] = {"ignore", "--store", "my-store-module.so"}; + conf.parse("ignore", 3, argv); + std::string expected("my-store-module.so"); + CPPUNIT_ASSERT_EQUAL(expected, conf.getStore()); + } + + void testStagingThreshold() + { + Configuration conf; + char* argv[] = {"ignore", "--staging-threshold", "123456789"}; + conf.parse("ignore", 3, argv); + long expected = 123456789; + CPPUNIT_ASSERT_EQUAL(expected, conf.getStagingThreshold()); + } + + void testVarious() + { + Configuration conf; + char* argv[] = {"ignore", "-t", "--worker-threads", "10"}; + conf.parse("ignore", 4, argv); + CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default + CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); + CPPUNIT_ASSERT(conf.isTrace()); + CPPUNIT_ASSERT(!conf.isHelp()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ConfigurationTest); + diff --git a/cpp/src/tests/EventChannelConnectionTest.cpp b/cpp/src/tests/EventChannelConnectionTest.cpp new file mode 100644 index 0000000000..24cd492441 --- /dev/null +++ b/cpp/src/tests/EventChannelConnectionTest.cpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> +#include <boost/bind.hpp> +#include "../framing/AMQHeartbeatBody.h" +#include "../framing/AMQFrame.h" +#include "../sys/posix/EventChannelConnection.h" +#include "../sys/ConnectionInputHandler.h" +#include "../sys/ConnectionInputHandlerFactory.h" +#include "../sys/Socket.h" +#include "qpid_test_plugin.h" +#include "MockConnectionInputHandler.h" + +using namespace qpid::sys; +using namespace qpid::framing; +using namespace std; + +class EventChannelConnectionTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelConnectionTest); + CPPUNIT_TEST(testSendReceive); + CPPUNIT_TEST(testCloseExternal); + CPPUNIT_TEST(testCloseException); + CPPUNIT_TEST_SUITE_END(); + + public: + + void setUp() { + threads = EventChannelThreads::create(); + CPPUNIT_ASSERT_EQUAL(0, ::pipe(pipe)); + connection.reset( + new EventChannelConnection(threads, factory, pipe[0], pipe[1])); + CPPUNIT_ASSERT(factory.handler != 0); + } + + void tearDown() { + threads->shutdown(); + threads->join(); + } + + void testSendReceive() + { + // Send a protocol initiation. + Buffer buf(1024); + ProtocolInitiation(4,2).encode(buf); + buf.flip(); + ssize_t n = write(pipe[1], buf.start(), buf.available()); + CPPUNIT_ASSERT_EQUAL(ssize_t(buf.available()), n); + + // Verify session handler got the protocol init. + ProtocolInitiation init = factory.handler->waitForProtocolInit(); + CPPUNIT_ASSERT_EQUAL(int(4), int(init.getMajor())); + CPPUNIT_ASSERT_EQUAL(int(2), int(init.getMinor())); + + // Send a heartbeat frame, verify connection got it. + connection->send(new AMQFrame(42, new AMQHeartbeatBody())); + AMQFrame frame = factory.handler->waitForFrame(); + CPPUNIT_ASSERT_EQUAL(uint16_t(42), frame.getChannel()); + CPPUNIT_ASSERT_EQUAL(uint8_t(HEARTBEAT_BODY), + frame.getBody()->type()); + threads->shutdown(); + } + + // Make sure the handler is closed if the connection is closed. + void testCloseExternal() { + connection->close(); + factory.handler->waitForClosed(); + } + + // Make sure the handler is closed if the connection closes or fails. + // TODO aconway 2006-12-18: logs exception message in test output. + void testCloseException() { + ::close(pipe[0]); + ::close(pipe[1]); + // TODO aconway 2006-12-18: Shouldn't this be failing? + connection->send(new AMQFrame(42, new AMQHeartbeatBody())); + factory.handler->waitForClosed(); + } + + private: + EventChannelThreads::shared_ptr threads; + int pipe[2]; + std::auto_ptr<EventChannelConnection> connection; + MockConnectionInputHandlerFactory factory; +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelConnectionTest); + diff --git a/cpp/src/tests/EventChannelTest.cpp b/cpp/src/tests/EventChannelTest.cpp new file mode 100644 index 0000000000..45229ce20f --- /dev/null +++ b/cpp/src/tests/EventChannelTest.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../sys/posix/EventChannel.h" +#include "../sys/posix/check.h" +#include "../sys/Runnable.h" +#include "../sys/Socket.h" +#include "../sys/Thread.h" +#include "qpid_test_plugin.h" + +#include <sys/socket.h> +#include <signal.h> +#include <netinet/in.h> +#include <netdb.h> +#include <iostream> + +using namespace qpid::sys; + + +const char hello[] = "hello"; +const size_t size = sizeof(hello); + +struct RunMe : public Runnable +{ + bool ran; + RunMe() : ran(false) {} + void run() { ran = true; } +}; + +class EventChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelTest); + CPPUNIT_TEST(testEvent); + CPPUNIT_TEST(testRead); + CPPUNIT_TEST(testFailedRead); + CPPUNIT_TEST(testWrite); + CPPUNIT_TEST(testFailedWrite); + CPPUNIT_TEST(testReadWrite); + CPPUNIT_TEST(testAccept); + CPPUNIT_TEST_SUITE_END(); + + private: + EventChannel::shared_ptr ec; + int pipe[2]; + char readBuf[size]; + + public: + + void setUp() + { + memset(readBuf, size, 0); + ec = EventChannel::create(); + if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno); + // Ignore SIGPIPE, otherwise we will crash writing to broken pipe. + signal(SIGPIPE, SIG_IGN); + } + + // Verify that calling getEvent returns event. + template <class T> bool isNextEvent(T& event) + { + return &event == dynamic_cast<T*>(ec->getEvent()); + } + + template <class T> bool isNextEventOk(T& event) + { + Event* next = ec->getEvent(); + if (next) next->throwIfError(); + return &event == next; + } + + void testEvent() + { + RunMe runMe; + CPPUNIT_ASSERT(!runMe.ran); + // Instances of Event just pass thru the channel immediately. + Event e(runMe.functor()); + ec->postEvent(e); + CPPUNIT_ASSERT(isNextEventOk(e)); + e.dispatch(); + CPPUNIT_ASSERT(runMe.ran); + } + + void testRead() { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); + CPPUNIT_ASSERT(isNextEventOk(re)); + CPPUNIT_ASSERT_EQUAL(size, re.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedRead() + { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + + // EOF before all data read. + ::close(pipe[1]); + CPPUNIT_ASSERT(isNextEvent(re)); + CPPUNIT_ASSERT(re.hasError()); + try { + re.throwIfError(); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + + // Bad file descriptor. Note in this case we fail + // in postEvent and throw immediately. + try { + ReadEvent bad; + ec->postEvent(bad); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + } + + void testWrite() { + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEventOk(wr)); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedWrite() { + WriteEvent wr(pipe[1], hello, size); + ::close(pipe[0]); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEvent(wr)); + CPPUNIT_ASSERT(wr.hasError()); + } + + void testReadWrite() + { + ReadEvent re(pipe[0], readBuf, size); + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(re); + ec->postEvent(wr); + ec->getEvent(); + ec->getEvent(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testAccept() { + Socket s = Socket::createTcp(); + int port = s.listen(0, 10); + CPPUNIT_ASSERT(port != 0); + + AcceptEvent ae(s.fd()); + ec->postEvent(ae); + Socket client = Socket::createTcp(); + client.connect("localhost", port); + CPPUNIT_ASSERT(isNextEvent(ae)); + ae.dispatch(); + + // Verify client writes are read by the accepted descriptor. + char readBuf[size]; + ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); + CPPUNIT_ASSERT(isNextEvent(re)); + re.dispatch(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest); + diff --git a/cpp/src/tests/EventChannelThreadsTest.cpp b/cpp/src/tests/EventChannelThreadsTest.cpp new file mode 100644 index 0000000000..ee1e2859c4 --- /dev/null +++ b/cpp/src/tests/EventChannelThreadsTest.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <boost/bind.hpp> + +#include "../sys/Socket.h" +#include "../sys/posix/EventChannelThreads.h" +#include "qpid_test_plugin.h" + + +using namespace std; + +using namespace qpid::sys; + +const int nConnections = 5; +const int nMessages = 10; // Messages read/written per connection. + + +// Accepts + reads + writes. +const int totalEvents = nConnections+2*nConnections*nMessages; + +/** + * Messages are numbered 0..nMessages. + * We count the total number of events, and the + * number of reads and writes for each message number. + */ +class TestResults : public Monitor { + public: + TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} + + void countEvent() { + if (--nEventsRemaining == 0) + shutdown(); + } + + void countRead(int messageNo) { + ++reads[messageNo]; + countEvent(); + } + + void countWrite(int messageNo) { + ++writes[messageNo]; + countEvent(); + } + + void shutdown(const std::string& exceptionMsg = std::string()) { + ScopedLock lock(*this); + exception = exceptionMsg; + isShutdown = true; + notifyAll(); + } + + void wait() { + ScopedLock lock(*this); + Time deadline = now() + 10*TIME_SEC; + while (!isShutdown) { + CPPUNIT_ASSERT(Monitor::wait(deadline)); + } + } + + bool isShutdown; + std::string exception; + AtomicCount reads[nMessages]; + AtomicCount writes[nMessages]; + AtomicCount nEventsRemaining; +}; + +TestResults results; + +EventChannelThreads::shared_ptr threads; + +// Functor to wrap callbacks in try/catch. +class SafeCallback { + public: + SafeCallback(Runnable& r) : callback(r.functor()) {} + SafeCallback(Event::Callback cb) : callback(cb) {} + + void operator()() { + std::string exception; + try { + callback(); + return; + } + catch (const std::exception& e) { + exception = e.what(); + } + catch (...) { + exception = "Unknown exception."; + } + results.shutdown(exception); + } + + private: + Event::Callback callback; +}; + +/** Repost an event N times. */ +class Repost { + public: + Repost(int n) : count (n) {} + virtual ~Repost() {} + + void repost(Event* event) { + if (--count==0) { + delete event; + } else { + threads->postEvent(event); + } + } + private: + int count; +}; + + + +/** Repeating read event. */ +class TestReadEvent : public ReadEvent, public Runnable, private Repost { + public: + explicit TestReadEvent(int fd=-1) : + ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize()); + CPPUNIT_ASSERT(0 <= value); + CPPUNIT_ASSERT(value < nMessages); + results.countRead(value); + repost(this); + } + + private: + int value; + ReadEvent original; +}; + + +/** Fire and forget write event */ +class TestWriteEvent : public WriteEvent, public Runnable, private Repost { + public: + TestWriteEvent(int fd=-1) : + WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages), + value(0) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); + results.countWrite(value++); + repost(this); + } + + private: + int value; +}; + +/** Fire-and-forget Accept event, posts reads on the accepted connection. */ +class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { + public: + TestAcceptEvent(int fd=-1) : + AcceptEvent(fd, SafeCallback(*this)), + Repost(nConnections) + {} + + void run() { + threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); + results.countEvent(); + repost(this); + } +}; + +class EventChannelThreadsTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelThreadsTest); + CPPUNIT_TEST(testThreads); + CPPUNIT_TEST_SUITE_END(); + + public: + + void setUp() { + threads = EventChannelThreads::create(EventChannel::create()); + } + + void tearDown() { + threads.reset(); + } + + void testThreads() + { + Socket listener = Socket::createTcp(); + int port = listener.listen(); + + // Post looping accept events, will repost nConnections times. + // The accept event will automatically post read events. + threads->postEvent(new TestAcceptEvent(listener.fd())); + + // Make connections. + Socket connections[nConnections]; + for (int i = 0; i < nConnections; ++i) { + connections[i] = Socket::createTcp(); + connections[i].connect("localhost", port); + } + + // Post looping write events. + for (int i = 0; i < nConnections; ++i) { + threads->postEvent(new TestWriteEvent(connections[i].fd())); + } + + // Wait for all events to be dispatched. + results.wait(); + + if (!results.exception.empty()) CPPUNIT_FAIL(results.exception); + CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining)); + + // Expect a read and write for each messageNo from each connection. + for (int messageNo = 0; messageNo < nMessages; ++messageNo) { + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo])); + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo])); + } + + threads->shutdown(); + threads->join(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest); + diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp new file mode 100644 index 0000000000..97846cf527 --- /dev/null +++ b/cpp/src/tests/ExchangeTest.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "../broker/DeliverableMessage.h" +#include "../broker/DirectExchange.h" +#include "../broker/BrokerExchange.h" +#include "../broker/BrokerQueue.h" +#include "../broker/TopicExchange.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include "BasicGetBody.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +class ExchangeTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ExchangeTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + Queue::shared_ptr queue(new Queue("queue", true)); + Queue::shared_ptr queue2(new Queue("queue2", true)); + + TopicExchange topic("topic"); + topic.bind(queue, "abc", 0); + topic.bind(queue2, "abc", 0); + + DirectExchange direct("direct"); + direct.bind(queue, "abc", 0); + direct.bind(queue2, "abc", 0); + + queue.reset(); + queue2.reset(); + + Message::shared_ptr msgPtr( + new BasicMessage( + 0, "e", "A", true, true, + AMQMethodBody::shared_ptr( + new BasicGetBody(ProtocolVersion())))); + DeliverableMessage msg(msgPtr); + topic.route(msg, "abc", 0); + direct.route(msg, "abc", 0); + + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest); diff --git a/cpp/src/tests/FieldTableTest.cpp b/cpp/src/tests/FieldTableTest.cpp new file mode 100644 index 0000000000..f485ca187e --- /dev/null +++ b/cpp/src/tests/FieldTableTest.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "../framing/amqp_framing.h" +#include "qpid_test_plugin.h" + +using namespace qpid::framing; + +class FieldTableTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FieldTableTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + FieldTable ft; + ft.setString("A", "BCDE"); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A")); + + Buffer buffer(100); + buffer.putFieldTable(ft); + buffer.flip(); + FieldTable ft2; + buffer.getFieldTable(ft2); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A")); + + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FieldTableTest); + diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp new file mode 100644 index 0000000000..89a559d0bb --- /dev/null +++ b/cpp/src/tests/FramingTest.cpp @@ -0,0 +1,381 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <memory> +#include <boost/lexical_cast.hpp> + +#include "ConnectionRedirectBody.h" +#include "../framing/ProtocolVersion.h" +#include "../framing/amqp_framing.h" +#include <iostream> +#include "qpid_test_plugin.h" +#include <sstream> +#include <typeinfo> +#include "../QpidError.h" +#include "AMQP_HighestVersion.h" +#include "../framing/AMQRequestBody.h" +#include "../framing/AMQResponseBody.h" +#include "../framing/Requester.h" +#include "../framing/Responder.h" +#include "InProcessBroker.h" +#include "../client/Connection.h" +#include "../client/ClientExchange.h" +#include "../client/ClientQueue.h" + +using namespace qpid; +using namespace qpid::framing; +using namespace std; + +template <class T> +std::string tostring(const T& x) +{ + std::ostringstream out; + out << x; + return out.str(); +} + +class FramingTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FramingTest); + CPPUNIT_TEST(testBasicQosBody); + CPPUNIT_TEST(testConnectionSecureBody); + CPPUNIT_TEST(testConnectionRedirectBody); + CPPUNIT_TEST(testAccessRequestBody); + CPPUNIT_TEST(testBasicConsumeBody); + CPPUNIT_TEST(testConnectionRedirectBodyFrame); + CPPUNIT_TEST(testBasicConsumeOkBodyFrame); + CPPUNIT_TEST(testRequestBodyFrame); + CPPUNIT_TEST(testResponseBodyFrame); + CPPUNIT_TEST(testRequester); + CPPUNIT_TEST(testResponder); + CPPUNIT_TEST(testInlineContent); + CPPUNIT_TEST(testContentReference); + CPPUNIT_TEST(testContentValidation); + CPPUNIT_TEST(testRequestResponseRoundtrip); + CPPUNIT_TEST_SUITE_END(); + + private: + Buffer buffer; + ProtocolVersion version; + AMQP_MethodVersionMap versionMap; + + public: + + FramingTest() : buffer(1024), version(highestProtocolVersion) {} + + void testBasicQosBody() + { + BasicQosBody in(version, 0xCAFEBABE, 0xABBA, true); + in.encodeContent(buffer); + buffer.flip(); + BasicQosBody out(version); + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionSecureBody() + { + std::string s = "security credential"; + ConnectionSecureBody in(version, s); + in.encodeContent(buffer); + buffer.flip(); + ConnectionSecureBody out(version); + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionRedirectBody() + { + std::string a = "hostA"; + std::string b = "hostB"; + ConnectionRedirectBody in(version, 0, a, b); + in.encodeContent(buffer); + buffer.flip(); + ConnectionRedirectBody out(version); + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testAccessRequestBody() + { + std::string s = "text"; + AccessRequestBody in(version, s, true, false, true, false, true); + in.encodeContent(buffer); + buffer.flip(); + AccessRequestBody out(version); + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testBasicConsumeBody() + { + std::string q = "queue"; + std::string t = "tag"; + BasicConsumeBody in(version, 0, q, t, false, true, false, false, + FieldTable()); + in.encodeContent(buffer); + buffer.flip(); + BasicConsumeBody out(version); + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + + void testConnectionRedirectBodyFrame() + { + std::string a = "hostA"; + std::string b = "hostB"; + AMQFrame in(version, 999, + new ConnectionRedirectBody(version, 0, a, b)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testBasicConsumeOkBodyFrame() + { + std::string s = "hostA"; + AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + for(int i = 0; i < 5; i++){ + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + } + + void testRequestBodyFrame() { + std::string testing("testing"); + AMQBody::shared_ptr request(new ChannelOpenBody(version, testing)); + AMQFrame in(version, 999, request); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + out.decode(buffer); + ChannelOpenBody* decoded = + dynamic_cast<ChannelOpenBody*>(out.getBody().get()); + CPPUNIT_ASSERT(decoded); + CPPUNIT_ASSERT_EQUAL(testing, decoded->getOutOfBand()); + } + + void testResponseBodyFrame() { + AMQBody::shared_ptr response(new ChannelOkBody(version)); + AMQFrame in(version, 999, response); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + out.decode(buffer); + ChannelOkBody* decoded = + dynamic_cast<ChannelOkBody*>(out.getBody().get()); + CPPUNIT_ASSERT(decoded); + } + + void testInlineContent() { + Content content(INLINE, "MyData"); + CPPUNIT_ASSERT(content.isInline()); + content.encode(buffer); + buffer.flip(); + Content recovered; + recovered.decode(buffer); + CPPUNIT_ASSERT(recovered.isInline()); + CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); + } + + void testContentReference() { + Content content(REFERENCE, "MyRef"); + CPPUNIT_ASSERT(content.isReference()); + content.encode(buffer); + buffer.flip(); + Content recovered; + recovered.decode(buffer); + CPPUNIT_ASSERT(recovered.isReference()); + CPPUNIT_ASSERT_EQUAL(content.getValue(), recovered.getValue()); + } + + void testContentValidation() { + try { + Content content(REFERENCE, ""); + CPPUNIT_ASSERT(false);//fail, expected exception + } catch (QpidError& e) { + CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); + CPPUNIT_ASSERT_EQUAL(string("Reference cannot be empty"), e.msg); + } + + try { + Content content(2, "Blah"); + CPPUNIT_ASSERT(false);//fail, expected exception + } catch (QpidError& e) { + CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); + CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); + } + + try { + buffer.putOctet(2); + buffer.putLongString("blah, blah"); + buffer.flip(); + Content content; + content.decode(buffer); + CPPUNIT_ASSERT(false);//fail, expected exception + } catch (QpidError& e) { + CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); + CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); + } + + } + + void testRequester() { + Requester r; + AMQRequestBody::Data q; + AMQResponseBody::Data p; + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark); + + // Now process a response + p.responseId = 1; + p.requestId = 2; + r.processed(AMQResponseBody::Data(1, 2)); + + r.sending(q); + CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark); + + try { + r.processed(p); // Already processed this response. + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + + try { + p.requestId = 50; + r.processed(p); // No such request + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + + r.sending(q); // reqId=4 + r.sending(q); // reqId=5 + r.sending(q); // reqId=6 + p.responseId++; + p.requestId = 4; + p.batchOffset = 2; + r.processed(p); + r.sending(q); + CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark); + + p.responseId++; + p.requestId = 1; // Out of order + p.batchOffset = 0; + r.processed(p); + r.sending(q); + CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId); + CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark); + } + + void testResponder() { + Responder r; + AMQRequestBody::Data q; + AMQResponseBody::Data p; + + q.requestId = 1; + q.responseMark = 0; + r.received(q); + p.requestId = q.requestId; + r.sending(p); + CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId); + CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId); + CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); + CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark()); + + q.requestId++; + q.responseMark = 1; + r.received(q); + r.sending(p); + CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId); + CPPUNIT_ASSERT_EQUAL(0U, p.batchOffset); + CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark()); + + try { + // Response mark higher any request ID sent. + q.responseMark = 3; + r.received(q); + } catch(...) {} + + try { + // Response mark lower than previous response mark. + q.responseMark = 0; + r.received(q); + } catch(...) {} + + // TODO aconway 2007-01-14: Test for batching when supported. + + } + + // expect may contain null chars so use string(ptr,size) constructor + // Use sizeof(expect)-1 to strip the trailing null. +#define ASSERT_FRAME(expect, frame) \ + CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) + + void testRequestResponseRoundtrip() { + broker::InProcessBroker ibroker(version); + client::Connection clientConnection; + clientConnection.setConnector(ibroker); + clientConnection.open(""); + client::Channel c; + clientConnection.openChannel(c); + + client::Exchange exchange( + "MyExchange", client::Exchange::TOPIC_EXCHANGE); + client::Queue queue("MyQueue", true); + c.declareExchange(exchange); + c.declareQueue(queue); + c.bind(exchange, queue, "MyTopic", framing::FieldTable()); + broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=9; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=100; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: channelId=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; passive=0; durable=0; autoDelete=0; internal=0; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=2,batch=0): ExchangeDeclareOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=2): QueueDeclare: ticket=0; queue=MyQueue; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=3,request=3,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=3): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; response(id=4,request=4,batch=0): QueueBindOk: ]", *i++); + } + }; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); + + + diff --git a/cpp/src/tests/HeaderTest.cpp b/cpp/src/tests/HeaderTest.cpp new file mode 100644 index 0000000000..29e2ddee3d --- /dev/null +++ b/cpp/src/tests/HeaderTest.cpp @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "../framing/amqp_framing.h" +#include "qpid_test_plugin.h" + +using namespace qpid::framing; + +class HeaderTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(HeaderTest); + CPPUNIT_TEST(testGenericProperties); + CPPUNIT_TEST(testAllSpecificProperties); + CPPUNIT_TEST(testSomeSpecificProperties); + CPPUNIT_TEST_SUITE_END(); + +public: + + void testGenericProperties() + { + AMQHeaderBody body(BASIC); + dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); + Buffer buffer(100); + + body.encode(buffer); + buffer.flip(); + AMQHeaderBody body2; + body2.decode(buffer, body.size()); + BasicHeaderProperties* props = + dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), + props->getHeaders().getString("A")); + } + + void testAllSpecificProperties(){ + string contentType("text/html"); + string contentEncoding("UTF8"); + DeliveryMode deliveryMode(PERSISTENT); + uint8_t priority(3); + string correlationId("abc"); + string replyTo("no-address"); + string expiration("why is this a string?"); + string messageId("xyz"); + uint64_t timestamp(0xabcd); + string type("eh?"); + string userId("guest"); + string appId("just testing"); + string clusterId("no clustering required"); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->getHeaders().setString("A", "BCDE"); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setCorrelationId(correlationId); + properties->setReplyTo(replyTo); + properties->setExpiration(expiration); + properties->setMessageId(messageId); + properties->setTimestamp(timestamp); + properties->setType(type); + properties->setUserId(userId); + properties->setAppId(appId); + properties->setClusterId(clusterId); + + Buffer buffer(10000); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); + CPPUNIT_ASSERT_EQUAL(deliveryMode, properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL(priority, properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(correlationId, properties->getCorrelationId()); + CPPUNIT_ASSERT_EQUAL(replyTo, properties->getReplyTo()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(messageId, properties->getMessageId()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(type, properties->getType()); + CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); + CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); + CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); + } + + void testSomeSpecificProperties(){ + string contentType("application/octet-stream"); + DeliveryMode deliveryMode(PERSISTENT); + uint8_t priority(6); + string expiration("Z"); + uint64_t timestamp(0xabe4a34a); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setExpiration(expiration); + properties->setTimestamp(timestamp); + + Buffer buffer(100); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL((int) priority, (int) properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeaderTest); + diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp new file mode 100644 index 0000000000..64125f4a0a --- /dev/null +++ b/cpp/src/tests/HeadersExchangeTest.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "../broker/HeadersExchange.h" +#include "../framing/FieldTable.h" +#include "../framing/Value.h" +#include "qpid_test_plugin.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +class HeadersExchangeTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(HeadersExchangeTest); + CPPUNIT_TEST(testMatchAll); + CPPUNIT_TEST(testMatchAny); + CPPUNIT_TEST(testMatchEmptyValue); + CPPUNIT_TEST(testMatchEmptyArgs); + CPPUNIT_TEST(testMatchNoXMatch); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMatchAll() + { + FieldTable b, m; + b.setString("x-match", "all"); + b.setString("foo", "FOO"); + b.setInt("n", 42); + m.setString("foo", "FOO"); + m.setInt("n", 42); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + + // Ignore extras. + m.setString("extra", "x"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + + // Fail mismatch, wrong value. + m.setString("foo", "NotFoo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + + // Fail mismatch, missing value + m.erase("foo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + void testMatchAny() + { + FieldTable b, m; + b.setString("x-match", "any"); + b.setString("foo", "FOO"); + b.setInt("n", 42); + m.setString("foo", "FOO"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + m.erase("foo"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + m.setInt("n", 42); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + } + + void testMatchEmptyValue() + { + FieldTable b, m; + b.setString("x-match", "all"); + b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue()); + b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue()); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + m.setString("foo", "blah"); + m.setInt("n", 123); + } + + void testMatchEmptyArgs() + { + FieldTable b, m; + m.setString("foo", "FOO"); + + b.setString("x-match", "all"); + CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + b.setString("x-match", "any"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + + void testMatchNoXMatch() + { + FieldTable b, m; + b.setString("foo", "FOO"); + m.setString("foo", "FOO"); + CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + } + + +}; + +// make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest); diff --git a/cpp/src/tests/InMemoryContentTest.cpp b/cpp/src/tests/InMemoryContentTest.cpp new file mode 100644 index 0000000000..6c7dd58258 --- /dev/null +++ b/cpp/src/tests/InMemoryContentTest.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/InMemoryContent.h" +#include "qpid_test_plugin.h" +#include "AMQP_HighestVersion.h" +#include <iostream> +#include <list> +#include "../framing/AMQFrame.h" +#include "MockChannel.h" + +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + + +class InMemoryContentTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(InMemoryContentTest); + CPPUNIT_TEST(testRefragmentation); + CPPUNIT_TEST_SUITE_END(); + +public: + void testRefragmentation() + { + {//no remainder + string out[] = {"abcde", "fghij", "klmno", "pqrst"}; + string in[] = {out[0] + out[1], out[2] + out[3]}; + refragment(2, in, 4, out); + } + {//remainder for last frame + string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; + string in[] = {out[0] + out[1], out[2] + out[3] + out[4]}; + refragment(2, in, 5, out); + } + } + + + void refragment(size_t inCount, string* in, size_t outCount, string* out, uint32_t framesize = 5) + { + InMemoryContent content; + MockChannel channel(3); + + addframes(content, inCount, in); + content.send(channel, framesize); + CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); + + for (unsigned int i = 0; i < outCount; i++) { + AMQContentBody::shared_ptr chunk( + dynamic_pointer_cast<AMQContentBody>( + channel.out.frames[i].getBody())); + CPPUNIT_ASSERT(chunk); + CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL( + ChannelId(3), channel.out.frames[i].getChannel()); + } + } + + void addframes(InMemoryContent& content, size_t frameCount, string* frameData) + { + for (unsigned int i = 0; i < frameCount; i++) { + AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i])); + content.add(frame); + } + } + + +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); + diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h new file mode 100644 index 0000000000..ff94ddbe9f --- /dev/null +++ b/cpp/src/tests/InProcessBroker.h @@ -0,0 +1,163 @@ +#ifndef _tests_InProcessBroker_h +#define _tests_InProcessBroker_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQP_HighestVersion.h" +#include "../framing/AMQFrame.h" +#include "../broker/Broker.h" +#include "../broker/Connection.h" +#include "../client/Connector.h" +#include "../client/Connection.h" + +#include <vector> +#include <iostream> +#include <algorithm> + + +namespace qpid { +namespace broker { + +/** + * A broker that implements client::Connector allowing direct + * in-process connection of client to broker. Used to write round-trip + * tests without requiring an external broker process. + * + * Also allows you to "snoop" on frames exchanged between client & broker. + * + * see FramingTest::testRequestResponseRoundtrip() for example of use. + */ +class InProcessBroker : public client::Connector { + public: + enum Sender {CLIENT,BROKER}; + + /** A frame tagged with the sender */ + struct TaggedFrame { + TaggedFrame(Sender e, framing::AMQFrame* f) : frame(f), sender(e) {} + bool fromBroker() const { return sender == BROKER; } + bool fromClient() const { return sender == CLIENT; } + + template <class MethodType> + MethodType* asMethod() { + return dynamic_cast<MethodType*>(frame->getBody().get()); + } + shared_ptr<framing::AMQFrame> frame; + Sender sender; + }; + + typedef std::vector<TaggedFrame> Conversation; + + InProcessBroker(framing::ProtocolVersion ver= + framing::highestProtocolVersion + ) : + Connector(ver), + protocolInit(ver), + broker(broker::Broker::create()), + brokerOut(BROKER, conversation), + brokerConnection(&brokerOut, *broker), + clientOut(CLIENT, conversation, &brokerConnection) + {} + + ~InProcessBroker() { broker->shutdown(); } + + void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(protocolInit); } + void close() {} + + /** Client's input handler. */ + void setInputHandler(framing::InputHandler* handler) { + brokerOut.in = handler; + } + + /** Called by client to send a frame */ + void send(framing::AMQFrame* frame) { + clientOut.send(frame); + } + + /** Entire client-broker conversation is recorded here */ + Conversation conversation; + + private: + /** OutputHandler that forwards data to an InputHandler */ + struct OutputToInputHandler : public sys::ConnectionOutputHandler { + OutputToInputHandler( + Sender sender_, Conversation& conversation_, + framing::InputHandler* ih=0 + ) : sender(sender_), conversation(conversation_), in(ih) {} + + void send(framing::AMQFrame* frame) { + conversation.push_back(TaggedFrame(sender, frame)); + in->received(frame); + } + + void close() {} + + Sender sender; + Conversation& conversation; + framing::InputHandler* in; + }; + + framing::ProtocolInitiation protocolInit; + Broker::shared_ptr broker; + OutputToInputHandler brokerOut; + broker::Connection brokerConnection; + OutputToInputHandler clientOut; +}; + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::TaggedFrame& tf) +{ + return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << *tf.frame; +} + +std::ostream& operator<<( + std::ostream& out, const InProcessBroker::Conversation& conv) +{ + copy(conv.begin(), conv.end(), + std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n")); + return out; +} + +} // namespace broker + + +namespace client { +/** An in-process client+broker all in one. */ +class InProcessBrokerClient : public client::Connection { + public: + broker::InProcessBroker broker; + broker::InProcessBroker::Conversation& conversation; + + /** Constructor creates broker and opens client connection. */ + InProcessBrokerClient( + u_int32_t max_frame_size=65536, + framing::ProtocolVersion version= framing::highestProtocolVersion + ) : client::Connection(false, max_frame_size, version), + broker(version), + conversation(broker.conversation) + { + setConnector(broker); + open(""); + } +}; + + +}} // namespace qpid::client + + +#endif // _tests_InProcessBroker_h diff --git a/cpp/src/tests/LazyLoadedContentTest.cpp b/cpp/src/tests/LazyLoadedContentTest.cpp new file mode 100644 index 0000000000..9d0da2206d --- /dev/null +++ b/cpp/src/tests/LazyLoadedContentTest.cpp @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/LazyLoadedContent.h" +#include "AMQP_HighestVersion.h" +#include "../broker/NullMessageStore.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <list> +#include <sstream> +#include "../framing/AMQFrame.h" +#include "MockChannel.h" +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + + + +class LazyLoadedContentTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(LazyLoadedContentTest); + CPPUNIT_TEST(testFragmented); + CPPUNIT_TEST(testWhole); + CPPUNIT_TEST(testHalved); + CPPUNIT_TEST_SUITE_END(); + + class TestMessageStore : public NullMessageStore + { + const string content; + + public: + TestMessageStore(const string& _content) : content(_content) {} + + void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length) + { + if (offset + length <= content.size()) { + data = content.substr(offset, length); + } else{ + std::stringstream error; + error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); + throw qpid::Exception(error.str()); + } + } + }; + + +public: + void testFragmented() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + uint32_t framesize = 5; + string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; + load(data, 6, out, framesize); + } + + void testWhole() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + uint32_t framesize = 50; + string out[] = {data}; + load(data, 1, out, framesize); + } + + void testHalved() + { + string data = "abcdefghijklmnopqrstuvwxyz"; + uint32_t framesize = 13; + string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; + load(data, 2, out, framesize); + } + + void load(string& in, size_t outCount, string* out, uint32_t framesize) + { + TestMessageStore store(in); + LazyLoadedContent content(&store, 0, in.size()); + MockChannel channel(3); + content.send(channel, framesize); + CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); + + for (unsigned int i = 0; i < outCount; i++) { + AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[i].getBody())); + CPPUNIT_ASSERT(chunk); + CPPUNIT_ASSERT_EQUAL(out[i], chunk->getData()); + CPPUNIT_ASSERT_EQUAL( + ChannelId(3), channel.out.frames[i].getChannel()); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am new file mode 100644 index 0000000000..1b4b65fb4f --- /dev/null +++ b/cpp/src/tests/Makefile.am @@ -0,0 +1,117 @@ +AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) +INCLUDES = \ + -I$(srcdir)/../gen \ + $(APR_CXXFLAGS) + +# Unit tests +broker_tests = \ + AccumulatedAckTest \ + BrokerChannelTest \ + ConfigurationTest \ + ExchangeTest \ + HeadersExchangeTest \ + InMemoryContentTest \ + LazyLoadedContentTest \ + MessageBuilderTest \ + MessageTest \ + ReferenceTest \ + QueueRegistryTest \ + QueueTest \ + QueuePolicyTest \ + TopicExchangeTest \ + TxAckTest \ + TxBufferTest \ + TxPublishTest \ + ValueTest \ + MessageHandlerTest + +client_tests = \ + ClientChannelTest + +framing_tests = \ + FieldTableTest \ + FramingTest \ + HeaderTest + +misc_tests = \ + ProducerConsumerTest + +posix_tests = \ + EventChannelTest \ + EventChannelThreadsTest + +unit_tests = \ + $(broker_tests) \ + $(client_tests) \ + $(framing_tests) \ + $(misc_tests) + +# Executable client tests + +client_exe_tests = \ + client_test \ + echo_service \ + topic_listener \ + topic_publisher + +noinst_PROGRAMS = $(client_exe_tests) + +TESTS_ENVIRONMENT = \ + VALGRIND=$(VALGRIND) \ + abs_builddir='$(abs_builddir)' \ + PATH="$(abs_builddir)/../src$(PATH_SEPARATOR)$$PATH" \ + abs_srcdir='$(abs_srcdir)' + +CLIENT_TESTS = client_test quick_topictest +TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker + +EXTRA_DIST = \ + $(TESTS) \ + .vg-supp \ + InProcessBroker.h \ + MockChannel.h \ + MockConnectionInputHandler.h \ + qpid_test_plugin.h \ + setup \ + topicall \ + topictest \ + APRBaseTest.cpp + +CLEANFILES=qpidd.log +DISTCLEANFILES=gen.mk + +include gen.mk + +check_LTLIBRARIES += libdlclose_noop.la +libdlclose_noop_la_LDFLAGS = -module -rpath /home/aconway/svn/qpid/cpp/tests +libdlclose_noop_la_SOURCES = dlclose_noop.c + + +abs_builddir = @abs_builddir@ +extra_libs = $(CPPUNIT_LIBS) +lib_client = $(abs_builddir)/../client/libqpidclient.la +lib_common = $(abs_builddir)/../libqpidcommon.la +lib_broker = $(abs_builddir)/../broker/libqpidbroker.la + +gen.mk: Makefile.am + ( \ + for i in $(client_exe_tests); do \ + echo $${i}_SOURCES = $$i.cpp; \ + echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \ + done; \ + libs=; \ + for i in $(unit_tests); do \ + libs="$$libs $${i}.la"; \ + echo $${i}_la_SOURCES = $$i.cpp; \ + echo $${i}_la_LIBADD = '$$(lib_common) $$(lib_client)'; \ + echo $${i}_la_LIBADD += '$$(lib_broker) $$(extra_libs)'; \ + echo $${i}_la_LDFLAGS = "-module -rpath `pwd`"; \ + done; \ + echo "check_LTLIBRARIES =$$libs"; \ + ) \ + > $@-t + mv $@-t $@ + +check: $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker) + +# Rule to run unit tests from an individual test module. diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp new file mode 100644 index 0000000000..b660987708 --- /dev/null +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../Exception.h" +#include "../broker/BrokerMessage.h" +#include "../broker/MessageBuilder.h" +#include "../broker/NullMessageStore.h" +#include "../framing/Buffer.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <memory> +#include "MockChannel.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +class MessageBuilderTest : public CppUnit::TestCase +{ + struct MockHandler : CompletionHandler { + Message::shared_ptr msg; + + virtual void complete(Message::shared_ptr _msg){ + msg = _msg; + } + }; + + class TestMessageStore : public NullMessageStore + { + Buffer* header; + Buffer* content; + const uint32_t contentBufferSize; + + public: + + void stage(PersistableMessage& msg) + { + if (msg.getPersistenceId() == 0) { + header = new Buffer(msg.encodedSize()); + msg.encode(*header); + content = new Buffer(contentBufferSize); + msg.setPersistenceId(1); + } else { + throw qpid::Exception("Message already staged!"); + } + } + + void appendContent(PersistableMessage& msg, const string& data) + { + if (msg.getPersistenceId() == 1) { + content->putRawData(data); + } else { + throw qpid::Exception("Invalid message id!"); + } + } + + using NullMessageStore::destroy; + + void destroy(PersistableMessage& msg) + { + CPPUNIT_ASSERT(msg.getPersistenceId()); + } + + BasicMessage::shared_ptr getRestoredMessage() + { + BasicMessage::shared_ptr msg(new BasicMessage()); + if (header) { + header->flip(); + msg->decodeHeader(*header); + delete header; + header = 0; + if (content) { + content->flip(); + msg->decodeContent(*content); + delete content; + content = 0; + } + } + return msg; + } + + //dont care about any of the other methods: + TestMessageStore(uint32_t _contentBufferSize) : NullMessageStore(), header(0), content(0), + contentBufferSize(_contentBufferSize) {} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(MessageBuilderTest); + CPPUNIT_TEST(testHeaderOnly); + CPPUNIT_TEST(test1ContentFrame); + CPPUNIT_TEST(test2ContentFrames); + CPPUNIT_TEST(testStaging); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testHeaderOnly(){ + MockHandler handler; + MessageBuilder builder(&handler); + + Message::shared_ptr message( + new BasicMessage( + 0, "test", "my_routing_key", false, false, + MockChannel::basicGetBody())); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(0); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } + + void test1ContentFrame(){ + MockHandler handler; + MessageBuilder builder(&handler); + + string data1("abcdefg"); + + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + MockChannel::basicGetBody())); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(7); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part1); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } + + void test2ContentFrames(){ + MockHandler handler; + MessageBuilder builder(&handler); + + string data1("abcdefg"); + string data2("hijklmn"); + + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + MockChannel::basicGetBody())); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + builder.initialise(message); + CPPUNIT_ASSERT(!handler.msg); + builder.setHeader(header); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part1); + CPPUNIT_ASSERT(!handler.msg); + builder.addContent(part2); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + } + + void testStaging(){ + //store must be the last thing to be destroyed or destructor + //of Message fails (it uses the store to call destroy if lazy + //loaded content is in use) + TestMessageStore store(14); + { + MockHandler handler; + MessageBuilder builder(&handler, &store, 5); + + string data1("abcdefg"); + string data2("hijklmn"); + + Message::shared_ptr message( + new BasicMessage(0, "test", "my_routing_key", false, false, + MockChannel::basicGetBody())); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); + properties->setMessageId("MyMessage"); + properties->getHeaders().setString("abc", "xyz"); + + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + + builder.initialise(message); + builder.setHeader(header); + builder.addContent(part1); + builder.addContent(part2); + CPPUNIT_ASSERT(handler.msg); + CPPUNIT_ASSERT_EQUAL(message, handler.msg); + + BasicMessage::shared_ptr restored = store.getRestoredMessage(); + CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); + CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"), + restored->getHeaderProperties()->getHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((uint64_t) 14, restored->contentSize()); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest); diff --git a/cpp/src/tests/MessageHandlerTest.cpp b/cpp/src/tests/MessageHandlerTest.cpp new file mode 100644 index 0000000000..277c0fc4b9 --- /dev/null +++ b/cpp/src/tests/MessageHandlerTest.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +//#include <iostream> +//#include <AMQP_HighestVersion.h> +#include "../framing/amqp_framing.h" +#include "qpid_test_plugin.h" + +#include "../broker/BrokerAdapter.h" + +using namespace qpid::framing; +using namespace qpid::broker; + +class MessageHandlerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(MessageHandlerTest); + CPPUNIT_TEST(testOpenMethod); + CPPUNIT_TEST_SUITE_END(); +private: + +public: + + MessageHandlerTest() + { + } + + void testOpenMethod() + { + //AMQFrame frame(highestProtocolVersion, 0, method); + //TestBodyHandler handler(method); + //handler.handleBody(frame.getBody()); + } + +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageHandlerTest); + diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp new file mode 100644 index 0000000000..136c6f2d8d --- /dev/null +++ b/cpp/src/tests/MessageTest.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/BrokerMessage.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include "AMQP_HighestVersion.h" +#include "../framing/AMQFrame.h" +#include "MockChannel.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +class MessageTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(MessageTest); + CPPUNIT_TEST(testEncodeDecode); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testEncodeDecode() + { + string exchange = "MyExchange"; + string routingKey = "MyRoutingKey"; + string messageId = "MyMessage"; + string data1("abcdefg"); + string data2("hijklmn"); + + BasicMessage::shared_ptr msg( + new BasicMessage(0, exchange, routingKey, false, false, + MockChannel::basicGetBody())); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); + AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); + msg->setHeader(header); + msg->addContent(part1); + msg->addContent(part2); + + msg->getHeaderProperties()->setMessageId(messageId); + msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + msg->getHeaderProperties()->getHeaders().setString("abc", "xyz"); + + Buffer buffer(msg->encodedSize()); + msg->encode(buffer); + buffer.flip(); + + msg.reset(new BasicMessage()); + msg->decode(buffer); + CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); + CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); + CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); + CPPUNIT_ASSERT_EQUAL(PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); + CPPUNIT_ASSERT_EQUAL((uint64_t) 14, msg->contentSize()); + + MockChannel channel(1); + msg->deliver(channel, "ignore", 0, 100); + CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(channel.out.frames[2].getBody())); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest); + diff --git a/cpp/src/tests/MockChannel.h b/cpp/src/tests/MockChannel.h new file mode 100644 index 0000000000..e47d591a9e --- /dev/null +++ b/cpp/src/tests/MockChannel.h @@ -0,0 +1,70 @@ +#ifndef _tests_MockChannel_h +#define _tests_MockChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../framing/MethodContext.h" +#include "../framing/ChannelAdapter.h" +#include "../framing/OutputHandler.h" +#include "../framing/AMQFrame.h" +#include "BasicGetBody.h" +#include <boost/shared_ptr.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +/** Mock output handler to collect frames */ +struct MockOutputHandler : public qpid::framing::OutputHandler { + boost::ptr_vector<qpid::framing::AMQFrame> frames; + void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); } +}; + +/** + * Combination mock OutputHandler and ChannelAdapter for tests. + */ +struct MockChannel : public qpid::framing::ChannelAdapter +{ + typedef qpid::framing::BasicGetBody Body; + static Body::shared_ptr basicGetBody() { + return Body::shared_ptr( + new Body(qpid::framing::ProtocolVersion())); + } + + MockOutputHandler out; + + MockChannel(qpid::framing::ChannelId id) { + init(id, out, qpid::framing::ProtocolVersion()); + } + + bool isOpen() const { return true; } + + void handleHeader( + boost::shared_ptr<qpid::framing::AMQHeaderBody> b) { send(b); } + void handleContent( + boost::shared_ptr<qpid::framing::AMQContentBody> b) { send(b); } + void handleHeartbeat( + boost::shared_ptr<qpid::framing::AMQHeartbeatBody> b) { send(b); } + void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context) + { + context.channel->send(method); + }; + +}; + +#endif /*!_tests_MockChannel_h*/ diff --git a/cpp/src/tests/MockConnectionInputHandler.h b/cpp/src/tests/MockConnectionInputHandler.h new file mode 100644 index 0000000000..4503ac33a5 --- /dev/null +++ b/cpp/src/tests/MockConnectionInputHandler.h @@ -0,0 +1,113 @@ +#ifndef _tests_MockConnectionInputHandler_h +#define _tests_MockConnectionInputHandler_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../sys/ConnectionInputHandler.h" +#include "../sys/ConnectionInputHandlerFactory.h" +#include "../sys/Monitor.h" +#include "../framing/ProtocolInitiation.h" + +struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { + + MockConnectionInputHandler() : state(START) {} + + ~MockConnectionInputHandler() {} + + void initiated(const qpid::framing::ProtocolInitiation& pi) { + qpid::sys::Monitor::ScopedLock l(monitor); + init = pi; + setState(GOT_INIT); + } + + void received(qpid::framing::AMQFrame* framep) { + qpid::sys::Monitor::ScopedLock l(monitor); + frame = *framep; + setState(GOT_FRAME); + } + + qpid::framing::ProtocolInitiation waitForProtocolInit() { + waitFor(GOT_INIT); + return init; + } + + qpid::framing::AMQFrame waitForFrame() { + waitFor(GOT_FRAME); + return frame; + } + + void waitForClosed() { + waitFor(CLOSED); + } + + void closed() { + qpid::sys::Monitor::ScopedLock l(monitor); + setState(CLOSED); + } + + void idleOut() {} + void idleIn() {} + + private: + typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State; + + void setState(State s) { + state = s; + monitor.notify(); + } + + void waitFor(State s) { + qpid::sys::Monitor::ScopedLock l(monitor); + qpid::sys::Time deadline = qpid::sys::now() + 10*qpid::sys::TIME_SEC; + while (state != s) + CPPUNIT_ASSERT(monitor.wait(deadline)); + } + + qpid::sys::Monitor monitor; + State state; + qpid::framing::ProtocolInitiation init; + qpid::framing::AMQFrame frame; +}; + + +struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory { + MockConnectionInputHandlerFactory() : handler(0) {} + + qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler*) { + qpid::sys::Monitor::ScopedLock lock(monitor); + handler = new MockConnectionInputHandler(); + monitor.notifyAll(); + return handler; + } + + void waitForHandler() { + qpid::sys::Monitor::ScopedLock lock(monitor); + qpid::sys::Time deadline = + qpid::sys::now() + 500 * qpid::sys::TIME_SEC; + while (handler == 0) + CPPUNIT_ASSERT(monitor.wait(deadline)); + } + + MockConnectionInputHandler* handler; + qpid::sys::Monitor monitor; +}; + + + +#endif /*!_tests_MockConnectionInputHandler_h*/ diff --git a/cpp/src/tests/ProducerConsumerTest.cpp b/cpp/src/tests/ProducerConsumerTest.cpp new file mode 100644 index 0000000000..f2f3ab689a --- /dev/null +++ b/cpp/src/tests/ProducerConsumerTest.cpp @@ -0,0 +1,284 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include <iostream> + +#include <boost/bind.hpp> + +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "../sys/ProducerConsumer.h" +#include "../sys/Thread.h" +#include "AMQP_HighestVersion.h" +#include "../sys/AtomicCount.h" + +using namespace qpid; +using namespace sys; +using namespace framing; +using namespace boost; +using namespace std; + +/** A counter that notifies a monitor when changed */ +class WatchedCounter : public Monitor { + public: + WatchedCounter(int i=0) : count(i) {} + WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {} + + WatchedCounter& operator=(const WatchedCounter& x) { + return *this = int(x); + } + + WatchedCounter& operator=(int i) { + Lock l(*this); + count = i; + return *this; + } + + int operator++() { + Lock l(*this); + notifyAll(); + return ++count; + } + + int operator++(int) { + Lock l(*this); + notifyAll(); + return count++; + } + + bool operator==(int i) const { + Lock l(const_cast<WatchedCounter&>(*this)); + return i == count; + } + + operator int() const { + Lock l(const_cast<WatchedCounter&>(*this)); + return count; + } + + bool waitFor(int i, Time timeout=TIME_SEC) { + Lock l(*this); + Time deadline = timeout+now(); + while (count != i) { + if (!wait(deadline)) + return false; + } + assert(count == i); + return true; + } + + private: + typedef Mutex::ScopedLock Lock; + int count; +}; + +class ProducerConsumerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ProducerConsumerTest); + CPPUNIT_TEST(testProduceConsume); + CPPUNIT_TEST(testTimeout); + CPPUNIT_TEST(testShutdown); + CPPUNIT_TEST(testCancel); + CPPUNIT_TEST_SUITE_END(); + + public: + client::InProcessBrokerClient client; + ProducerConsumer pc; + + WatchedCounter shutdown; + WatchedCounter timeout; + WatchedCounter consumed; + WatchedCounter produced; + + struct ConsumeRunnable : public Runnable { + ProducerConsumerTest& test; + ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {} + void run() { test.consume(); } + }; + + struct ConsumeTimeoutRunnable : public Runnable { + ProducerConsumerTest& test; + Time timeout; + ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t) + : test(test_), timeout(t) {} + void run() { test.consumeTimeout(timeout); } + }; + + + void consumeInternal(ProducerConsumer::ConsumerLock& consumer) { + if (pc.isShutdown()) { + ++shutdown; + return; + } + if (consumer.isTimedOut()) { + ++timeout; + return; + } + CPPUNIT_ASSERT(consumer.isOk()); + CPPUNIT_ASSERT(pc.available() > 0); + consumer.confirm(); + consumed++; + } + + void consume() { + ProducerConsumer::ConsumerLock consumer(pc); + consumeInternal(consumer); + }; + + void consumeTimeout(const Time& timeout) { + ProducerConsumer::ConsumerLock consumer(pc, timeout); + consumeInternal(consumer); + }; + + void produce() { + ProducerConsumer::ProducerLock producer(pc); + CPPUNIT_ASSERT(producer.isOk()); + producer.confirm(); + produced++; + } + + void join(vector<Thread>& threads) { + for_each(threads.begin(), threads.end(), bind(&Thread::join,_1)); + } + + vector<Thread> startThreads(size_t n, Runnable& runnable) { + vector<Thread> threads(n); + while (n > 0) + threads[--n] = Thread(runnable); + return threads; + } + +public: + ProducerConsumerTest() : client() {} + + void testProduceConsume() { + ConsumeRunnable runMe(*this); + produce(); + produce(); + CPPUNIT_ASSERT(produced.waitFor(2)); + vector<Thread> threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + + threads = startThreads(1, runMe); + CPPUNIT_ASSERT(consumed.waitFor(2)); + join(threads); + + threads = startThreads(3, runMe); + produce(); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(4)); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(5)); + join(threads); + CPPUNIT_ASSERT_EQUAL(0, int(shutdown)); + } + + void testTimeout() { + try { + // 0 timeout no items available throws exception + ProducerConsumer::ConsumerLock consumer(pc, 0); + CPPUNIT_FAIL("Expected exception"); + } catch(...){} + + produce(); + CPPUNIT_ASSERT(produced.waitFor(1)); + CPPUNIT_ASSERT_EQUAL(1, int(pc.available())); + { + // 0 timeout succeeds if there's an item available. + ProducerConsumer::ConsumerLock consume(pc, 0); + CPPUNIT_ASSERT(consume.isOk()); + consume.confirm(); + } + CPPUNIT_ASSERT_EQUAL(0, int(pc.available())); + + // Produce an item within the timeout. + ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC); + vector<Thread> threads = startThreads(1, runMe); + produce(); + CPPUNIT_ASSERT(consumed.waitFor(1)); + join(threads); + } + + + void testShutdown() { + ConsumeRunnable runMe(*this); + vector<Thread> threads = startThreads(2, runMe); + while (pc.consumers() != 2) + Thread::yield(); + pc.shutdown(); + CPPUNIT_ASSERT(shutdown.waitFor(2)); + join(threads); + + threads = startThreads(1, runMe); // Should shutdown immediately. + CPPUNIT_ASSERT(shutdown.waitFor(3)); + join(threads); + + // Produce/consume while shutdown should return isShutdown and + // throw on confirm. + try { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(pc.isShutdown()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + try { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(pc.isShutdown()); + CPPUNIT_FAIL("Expected exception"); + } + catch (...) {} // Expected + } + + void testCancel() { + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.cancel(); + } + // Nothing was produced. + CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc, 0); + CPPUNIT_ASSERT(c.isTimedOut()); + } + // Now produce but cancel the consume + { + ProducerConsumer::ProducerLock p(pc); + CPPUNIT_ASSERT(p.isOk()); + p.confirm(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + { + ProducerConsumer::ConsumerLock c(pc); + CPPUNIT_ASSERT(c.isOk()); + c.cancel(); + } + CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available()); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest); + diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp new file mode 100644 index 0000000000..5ccc9417cd --- /dev/null +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -0,0 +1,89 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/QueuePolicy.h" +#include "qpid_test_plugin.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +class QueuePolicyTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueuePolicyTest); + CPPUNIT_TEST(testCount); + CPPUNIT_TEST(testSize); + CPPUNIT_TEST(testBoth); + CPPUNIT_TEST(testSettings); + CPPUNIT_TEST_SUITE_END(); + + public: + void testCount(){ + QueuePolicy policy(5, 0); + CPPUNIT_ASSERT(!policy.limitExceeded()); + for (int i = 0; i < 5; i++) policy.enqueued(10); + CPPUNIT_ASSERT_EQUAL((uint64_t) 0, policy.getMaxSize()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 5, policy.getMaxCount()); + CPPUNIT_ASSERT(!policy.limitExceeded()); + policy.enqueued(10); + CPPUNIT_ASSERT(policy.limitExceeded()); + policy.dequeued(10); + CPPUNIT_ASSERT(!policy.limitExceeded()); + policy.enqueued(10); + CPPUNIT_ASSERT(policy.limitExceeded()); + } + + void testSize(){ + QueuePolicy policy(0, 50); + for (int i = 0; i < 5; i++) policy.enqueued(10); + CPPUNIT_ASSERT(!policy.limitExceeded()); + policy.enqueued(10); + CPPUNIT_ASSERT(policy.limitExceeded()); + policy.dequeued(10); + CPPUNIT_ASSERT(!policy.limitExceeded()); + policy.enqueued(10); + CPPUNIT_ASSERT(policy.limitExceeded()); + } + + void testBoth(){ + QueuePolicy policy(5, 50); + for (int i = 0; i < 5; i++) policy.enqueued(11); + CPPUNIT_ASSERT(policy.limitExceeded()); + policy.dequeued(20); + CPPUNIT_ASSERT(!policy.limitExceeded());//fails + policy.enqueued(5); + policy.enqueued(10); + CPPUNIT_ASSERT(policy.limitExceeded()); + } + + void testSettings(){ + //test reading and writing the policy from/to field table + FieldTable settings; + QueuePolicy a(101, 303); + a.update(settings); + QueuePolicy b(settings); + CPPUNIT_ASSERT_EQUAL(a.getMaxCount(), b.getMaxCount()); + CPPUNIT_ASSERT_EQUAL(a.getMaxSize(), b.getMaxSize()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueuePolicyTest); + diff --git a/cpp/src/tests/QueueRegistryTest.cpp b/cpp/src/tests/QueueRegistryTest.cpp new file mode 100644 index 0000000000..d01fbd0ad4 --- /dev/null +++ b/cpp/src/tests/QueueRegistryTest.cpp @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "../broker/QueueRegistry.h" +#include "qpid_test_plugin.h" +#include <string> + +using namespace qpid::broker; + +class QueueRegistryTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueueRegistryTest); + CPPUNIT_TEST(testDeclare); + CPPUNIT_TEST(testDeclareTmp); + CPPUNIT_TEST(testFind); + CPPUNIT_TEST(testDestroy); + CPPUNIT_TEST_SUITE_END(); + + private: + std::string foo, bar; + QueueRegistry reg; + std::pair<Queue::shared_ptr, bool> qc; + + public: + void setUp() { + foo = "foo"; + bar = "bar"; + } + + void testDeclare() { + qc = reg.declare(foo, false, 0, 0); + Queue::shared_ptr q = qc.first; + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT(qc.second); // New queue + CPPUNIT_ASSERT_EQUAL(foo, q->getName()); + + qc = reg.declare(foo, false, 0, 0); + CPPUNIT_ASSERT_EQUAL(q, qc.first); + CPPUNIT_ASSERT(!qc.second); + + qc = reg.declare(bar, false, 0, 0); + q = qc.first; + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT_EQUAL(true, qc.second); + CPPUNIT_ASSERT_EQUAL(bar, q->getName()); + } + + void testDeclareTmp() + { + qc = reg.declare(std::string(), false, 0, 0); + CPPUNIT_ASSERT(qc.second); + CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName()); + } + + void testFind() { + CPPUNIT_ASSERT(reg.find(foo) == 0); + + reg.declare(foo, false, 0, 0); + reg.declare(bar, false, 0, 0); + Queue::shared_ptr q = reg.find(bar); + CPPUNIT_ASSERT(q); + CPPUNIT_ASSERT_EQUAL(bar, q->getName()); + } + + void testDestroy() { + qc = reg.declare(foo, false, 0, 0); + reg.destroy(foo); + // Queue is gone from the registry. + CPPUNIT_ASSERT(reg.find(foo) == 0); + // Queue is not actually destroyed till we drop our reference. + CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName()); + // We shoud be the only reference. + CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp new file mode 100644 index 0000000000..bb2b424375 --- /dev/null +++ b/cpp/src/tests/QueueTest.cpp @@ -0,0 +1,149 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/BrokerQueue.h" +#include "../broker/QueueRegistry.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include "MockChannel.h" + +using namespace qpid::broker; +using namespace qpid::sys; + + +class TestConsumer : public virtual Consumer{ +public: + Message::shared_ptr last; + + virtual bool deliver(Message::shared_ptr& msg); +}; + + +class QueueTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(QueueTest); + CPPUNIT_TEST(testConsumers); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testDequeue); + CPPUNIT_TEST_SUITE_END(); + + public: + Message::shared_ptr message(std::string exchange, std::string routingKey) { + return Message::shared_ptr( + new BasicMessage(0, exchange, routingKey, true, true, + MockChannel::basicGetBody())); + } + + void testConsumers(){ + Queue::shared_ptr queue(new Queue("my_queue", true)); + + //Test adding consumers: + TestConsumer c1; + TestConsumer c2; + queue->consume(&c1); + queue->consume(&c2); + + CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount()); + + //Test basic delivery: + Message::shared_ptr msg1 = message("e", "A"); + Message::shared_ptr msg2 = message("e", "B"); + Message::shared_ptr msg3 = message("e", "C"); + + queue->deliver(msg1); + CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + + queue->deliver(msg2); + CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); + + queue->deliver(msg3); + CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); + + //Test cancellation: + queue->cancel(&c1); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount()); + queue->cancel(&c2); + CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount()); + } + + void testRegistry(){ + //Test use of queues in registry: + QueueRegistry registry; + registry.declare("queue1", true, true); + registry.declare("queue2", true, true); + registry.declare("queue3", true, true); + + CPPUNIT_ASSERT(registry.find("queue1")); + CPPUNIT_ASSERT(registry.find("queue2")); + CPPUNIT_ASSERT(registry.find("queue3")); + + registry.destroy("queue1"); + registry.destroy("queue2"); + registry.destroy("queue3"); + + CPPUNIT_ASSERT(!registry.find("queue1")); + CPPUNIT_ASSERT(!registry.find("queue2")); + CPPUNIT_ASSERT(!registry.find("queue3")); + } + + void testDequeue(){ + Queue::shared_ptr queue(new Queue("my_queue", true)); + Message::shared_ptr msg1 = message("e", "A"); + Message::shared_ptr msg2 = message("e", "B"); + Message::shared_ptr msg3 = message("e", "C"); + Message::shared_ptr received; + + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); + + TestConsumer consumer; + queue->consume(&consumer); + queue->dispatch(); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); + CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT(!received); + CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); + + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); + +//TestConsumer +bool TestConsumer::deliver(Message::shared_ptr& msg){ + last = msg; + return true; +} + diff --git a/cpp/src/tests/ReferenceTest.cpp b/cpp/src/tests/ReferenceTest.cpp new file mode 100644 index 0000000000..b179ab8fdd --- /dev/null +++ b/cpp/src/tests/ReferenceTest.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <memory> +#include "qpid_test_plugin.h" +#include "../broker/Reference.h" +#include "../broker/BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "../broker/CompletionHandler.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace std; + +class ReferenceTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ReferenceTest); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testReference); + CPPUNIT_TEST_SUITE_END(); + + ProtocolVersion v; + ReferenceRegistry registry; + Reference::shared_ptr r1; + MessageTransferBody::shared_ptr t1, t2; + MessageMessage::shared_ptr m1, m2; + MessageAppendBody::shared_ptr a1, a2; + public: + + ReferenceTest() : + r1(registry.open("bar")), + t1(new MessageTransferBody(v)), + t2(new MessageTransferBody(v)), + m1(new MessageMessage(0, 1, t1, r1)), + m2(new MessageMessage(0, 2, t2, r1)), + a1(new MessageAppendBody(v)), + a2(new MessageAppendBody(v)) + {} + + void testRegistry() { + Reference::shared_ptr ref = registry.open("foo"); + CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId()); + CPPUNIT_ASSERT(ref == registry.get("foo")); + try { + registry.get("none"); + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + try { + registry.open("foo"); + CPPUNIT_FAIL("Expected exception"); + } catch(...) {} + } + + void testReference() { + r1->addMessage(m1); + r1->addMessage(m2); + CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size()); + r1->append(a1); + r1->append(a2); + CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size()); + const vector<MessageMessage::shared_ptr> messages = r1->getMessages(); + r1->close(); + try { + registry.open("bar"); + CPPUNIT_FAIL("Expected exception"); + } catch(...) {} + + CPPUNIT_ASSERT_EQUAL(messages[0], m1); + CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2); + + CPPUNIT_ASSERT_EQUAL(messages[1], m2); + CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2); + } + + +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/cpp/src/tests/TopicExchangeTest.cpp b/cpp/src/tests/TopicExchangeTest.cpp new file mode 100644 index 0000000000..39035c776f --- /dev/null +++ b/cpp/src/tests/TopicExchangeTest.cpp @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "../broker/TopicExchange.h" +#include "qpid_test_plugin.h" + +using namespace qpid::broker; + +Tokens makeTokens(char** begin, char** end) +{ + Tokens t; + t.insert(t.end(), begin, end); + return t; +} + +// Calculate size of an array. +#define LEN(a) (sizeof(a)/sizeof(a[0])) + +// Convert array to token vector +#define TOKENS(a) makeTokens(a, a + LEN(a)) + +// Allow CPPUNIT_EQUALS to print a Tokens. +CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v) +{ + out << "[ "; + for (Tokens::const_iterator i = v.begin(); + i != v.end(); ++i) + { + out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", "); + } + return out; +} + + +class TokensTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TokensTest); + CPPUNIT_TEST(testTokens); + CPPUNIT_TEST_SUITE_END(); + + public: + void testTokens() + { + Tokens tokens("hello.world"); + char* expect[] = {"hello", "world"}; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens); + + tokens = "a.b.c"; + char* expect2[] = { "a", "b", "c" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens); + + tokens = ""; + CPPUNIT_ASSERT(tokens.empty()); + + tokens = "x"; + char* expect3[] = { "x" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens); + + tokens = (".x"); + char* expect4[] = { "", "x" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens); + + tokens = ("x."); + char* expect5[] = { "x", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens); + + tokens = ("."); + char* expect6[] = { "", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens); + + tokens = (".."); + char* expect7[] = { "", "", "" }; + CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens); + } + +}; + +#define ASSERT_NORMALIZED(expect, pattern) \ + CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) +class TopicPatternTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TopicPatternTest); + CPPUNIT_TEST(testNormalize); + CPPUNIT_TEST(testPlain); + CPPUNIT_TEST(testStar); + CPPUNIT_TEST(testHash); + CPPUNIT_TEST(testMixed); + CPPUNIT_TEST(testCombo); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testNormalize() + { + CPPUNIT_ASSERT(TopicPattern("").empty()); + ASSERT_NORMALIZED("a.b.c", "a.b.c"); + ASSERT_NORMALIZED("a.*.c", "a.*.c"); + ASSERT_NORMALIZED("#", "#"); + ASSERT_NORMALIZED("#", "#.#.#.#"); + ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*"); + ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#"); + ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*"); + } + + void testPlain() { + TopicPattern p("ab.cd.e"); + CPPUNIT_ASSERT(p.match("ab.cd.e")); + CPPUNIT_ASSERT(!p.match("abx.cd.e")); + CPPUNIT_ASSERT(!p.match("ab.cd")); + CPPUNIT_ASSERT(!p.match("ab.cd..e.")); + CPPUNIT_ASSERT(!p.match("ab.cd.e.")); + CPPUNIT_ASSERT(!p.match(".ab.cd.e")); + + p = ""; + CPPUNIT_ASSERT(p.match("")); + + p = "."; + CPPUNIT_ASSERT(p.match(".")); + } + + + void testStar() + { + TopicPattern p("a.*.b"); + CPPUNIT_ASSERT(p.match("a.xx.b")); + CPPUNIT_ASSERT(!p.match("a.b")); + + p = "*.x"; + CPPUNIT_ASSERT(p.match("y.x")); + CPPUNIT_ASSERT(p.match(".x")); + CPPUNIT_ASSERT(!p.match("x")); + + p = "x.x.*"; + CPPUNIT_ASSERT(p.match("x.x.y")); + CPPUNIT_ASSERT(p.match("x.x.")); + CPPUNIT_ASSERT(!p.match("x.x")); + CPPUNIT_ASSERT(!p.match("q.x.y")); + } + + void testHash() + { + TopicPattern p("a.#.b"); + CPPUNIT_ASSERT(p.match("a.b")); + CPPUNIT_ASSERT(p.match("a.x.b")); + CPPUNIT_ASSERT(p.match("a..x.y.zz.b")); + CPPUNIT_ASSERT(!p.match("a.b.")); + CPPUNIT_ASSERT(!p.match("q.x.b")); + + p = "a.#"; + CPPUNIT_ASSERT(p.match("a")); + CPPUNIT_ASSERT(p.match("a.b")); + CPPUNIT_ASSERT(p.match("a.b.c")); + + p = "#.a"; + CPPUNIT_ASSERT(p.match("a")); + CPPUNIT_ASSERT(p.match("x.y.a")); + } + + void testMixed() + { + TopicPattern p("*.x.#.y"); + CPPUNIT_ASSERT(p.match("a.x.y")); + CPPUNIT_ASSERT(p.match("a.x.p.qq.y")); + CPPUNIT_ASSERT(!p.match("a.a.x.y")); + CPPUNIT_ASSERT(!p.match("aa.x.b.c")); + + p = "a.#.b.*"; + CPPUNIT_ASSERT(p.match("a.b.x")); + CPPUNIT_ASSERT(p.match("a.x.x.x.b.x")); + } + + void testCombo() { + TopicPattern p("*.#.#.*.*.#"); + CPPUNIT_ASSERT(p.match("x.y.z")); + CPPUNIT_ASSERT(p.match("x.y.z.a.b.c")); + CPPUNIT_ASSERT(!p.match("x.y")); + CPPUNIT_ASSERT(!p.match("x")); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest); +CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest); diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp new file mode 100644 index 0000000000..91e07a3faa --- /dev/null +++ b/cpp/src/tests/TxAckTest.cpp @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/NullMessageStore.h" +#include "../broker/RecoveryManager.h" +#include "../broker/TxAck.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <list> +#include <vector> +#include "MockChannel.h" + +using std::list; +using std::vector; +using namespace qpid::broker; +using namespace qpid::framing; + +class TxAckTest : public CppUnit::TestCase +{ + + class TestMessageStore : public NullMessageStore + { + public: + vector<PersistableMessage*> dequeued; + + void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/) + { + dequeued.push_back(&msg); + } + + TestMessageStore() : NullMessageStore() {} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxAckTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + AccumulatedAck acked; + TestMessageStore store; + Queue::shared_ptr queue; + vector<Message::shared_ptr> messages; + list<DeliveryRecord> deliveries; + TxAck op; + + +public: + + TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) + { + for(int i = 0; i < 10; i++){ + Message::shared_ptr msg( + new BasicMessage(0, "exchange", "routing_key", false, false, + MockChannel::basicGetBody())); + msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + messages.push_back(msg); + deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); + } + + //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) + acked.range = 5; + acked.individual.push_back(7); + acked.individual.push_back(9); + } + + void testPrepare() + { + //ensure acked messages are discarded, i.e. dequeued from store + op.prepare(0); + CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); + CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); + int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; + for (int i = 0; i < 7; i++) { + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]); + } + } + + void testCommit() + { + //emsure acked messages are removed from list + op.commit(); + CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); + list<DeliveryRecord>::iterator i = deliveries.begin(); + CPPUNIT_ASSERT(i->matches(6));//msg 6 + CPPUNIT_ASSERT((++i)->matches(8));//msg 8 + CPPUNIT_ASSERT((++i)->matches(10));//msg 10 + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest); + diff --git a/cpp/src/tests/TxBufferTest.cpp b/cpp/src/tests/TxBufferTest.cpp new file mode 100644 index 0000000000..0d1fe7a04b --- /dev/null +++ b/cpp/src/tests/TxBufferTest.cpp @@ -0,0 +1,269 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/TxBuffer.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <vector> + +using namespace qpid::broker; + +template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ + unsigned int i = 0; + while(i < expected.size() && i < actual.size()){ + CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); + i++; + } + CPPUNIT_ASSERT(i == expected.size()); + CPPUNIT_ASSERT(i == actual.size()); +} + +class TxBufferTest : public CppUnit::TestCase +{ + class MockTxOp : public TxOp{ + enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; + std::vector<int> expected; + std::vector<int> actual; + bool failOnPrepare; + public: + MockTxOp() : failOnPrepare(false) {} + MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} + + bool prepare(TransactionContext*) throw(){ + actual.push_back(PREPARE); + return !failOnPrepare; + } + void commit() throw(){ + actual.push_back(COMMIT); + } + void rollback() throw(){ + actual.push_back(ROLLBACK); + } + MockTxOp& expectPrepare(){ + expected.push_back(PREPARE); + return *this; + } + MockTxOp& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTxOp& expectRollback(){ + expected.push_back(ROLLBACK); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + ~MockTxOp(){} + }; + + class MockTransactionalStore : public TransactionalStore{ + enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; + std::vector<int> expected; + std::vector<int> actual; + + enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3}; + int state; + + class TestTransactionContext : public TransactionContext{ + MockTransactionalStore* store; + public: + TestTransactionContext(MockTransactionalStore* _store) : store(_store) {} + void commit(){ + if(store->state != OPEN) throw "txn already completed"; + store->state = COMMITTED; + } + + void abort(){ + if(store->state != OPEN) throw "txn already completed"; + store->state = ABORTED; + } + ~TestTransactionContext(){} + }; + + + public: + MockTransactionalStore() : state(OPEN){} + + std::auto_ptr<TPCTransactionContext> begin(const std::string&){ + throw "Operation not supported"; + } + void prepare(TPCTransactionContext&){ + throw "Operation not supported"; + } + + std::auto_ptr<TransactionContext> begin(){ + actual.push_back(BEGIN); + std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); + return txn; + } + void commit(TransactionContext& ctxt){ + actual.push_back(COMMIT); + dynamic_cast<TestTransactionContext&>(ctxt).commit(); + } + void abort(TransactionContext& ctxt){ + actual.push_back(ABORT); + dynamic_cast<TestTransactionContext&>(ctxt).abort(); + } + MockTransactionalStore& expectBegin(){ + expected.push_back(BEGIN); + return *this; + } + MockTransactionalStore& expectCommit(){ + expected.push_back(COMMIT); + return *this; + } + MockTransactionalStore& expectAbort(){ + expected.push_back(ABORT); + return *this; + } + void check(){ + assertEqualVector(expected, actual); + } + + bool isCommitted(){ + return state == COMMITTED; + } + + bool isAborted(){ + return state == ABORTED; + } + + bool isOpen() const{ + return state == OPEN; + } + ~MockTransactionalStore(){} + }; + + CPPUNIT_TEST_SUITE(TxBufferTest); + CPPUNIT_TEST(testPrepareAndCommit); + CPPUNIT_TEST(testFailOnPrepare); + CPPUNIT_TEST(testRollback); + CPPUNIT_TEST(testBufferIsClearedAfterRollback); + CPPUNIT_TEST(testBufferIsClearedAfterCommit); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testPrepareAndCommit(){ + MockTransactionalStore store; + store.expectBegin().expectCommit(); + + MockTxOp opA; + opA.expectPrepare().expectCommit(); + MockTxOp opB; + opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order + MockTxOp opC; + opC.expectPrepare().expectCommit(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opB);//opB enlisted twice + buffer.enlist(&opC); + + CPPUNIT_ASSERT(buffer.prepare(&store)); + buffer.commit(); + store.check(); + CPPUNIT_ASSERT(store.isCommitted()); + opA.check(); + opB.check(); + opC.check(); + } + + void testFailOnPrepare(){ + MockTransactionalStore store; + store.expectBegin().expectAbort(); + + MockTxOp opA; + opA.expectPrepare(); + MockTxOp opB(true); + opB.expectPrepare(); + MockTxOp opC;//will never get prepare as b will fail + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opC); + + CPPUNIT_ASSERT(!buffer.prepare(&store)); + store.check(); + CPPUNIT_ASSERT(store.isAborted()); + opA.check(); + opB.check(); + opC.check(); + } + + void testRollback(){ + MockTxOp opA; + opA.expectRollback(); + MockTxOp opB(true); + opB.expectRollback(); + MockTxOp opC; + opC.expectRollback(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + buffer.enlist(&opC); + + buffer.rollback(); + opA.check(); + opB.check(); + opC.check(); + } + + void testBufferIsClearedAfterRollback(){ + MockTxOp opA; + opA.expectRollback(); + MockTxOp opB; + opB.expectRollback(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + + buffer.rollback(); + buffer.commit();//second call should not reach ops + opA.check(); + opB.check(); + } + + void testBufferIsClearedAfterCommit(){ + MockTxOp opA; + opA.expectCommit(); + MockTxOp opB; + opB.expectCommit(); + + TxBuffer buffer; + buffer.enlist(&opA); + buffer.enlist(&opB); + + buffer.commit(); + buffer.rollback();//second call should not reach ops + opA.check(); + opB.check(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest); + diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp new file mode 100644 index 0000000000..84d2666b6c --- /dev/null +++ b/cpp/src/tests/TxPublishTest.cpp @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "../broker/NullMessageStore.h" +#include "../broker/RecoveryManager.h" +#include "../broker/TxPublish.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <list> +#include <vector> +#include "MockChannel.h" + +using std::list; +using std::pair; +using std::vector; +using namespace qpid::broker; +using namespace qpid::framing; + +class TxPublishTest : public CppUnit::TestCase +{ + typedef std::pair<string, PersistableMessage*> msg_queue_pair; + + class TestMessageStore : public NullMessageStore + { + public: + vector<msg_queue_pair> enqueued; + + void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) + { + enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); + } + + //dont care about any of the other methods: + TestMessageStore() : NullMessageStore(false) {} + ~TestMessageStore(){} + }; + + CPPUNIT_TEST_SUITE(TxPublishTest); + CPPUNIT_TEST(testPrepare); + CPPUNIT_TEST(testCommit); + CPPUNIT_TEST_SUITE_END(); + + + TestMessageStore store; + Queue::shared_ptr queue1; + Queue::shared_ptr queue2; + Message::shared_ptr const msg; + TxPublish op; + +public: + + TxPublishTest() : + queue1(new Queue("queue1", false, &store, 0)), + queue2(new Queue("queue2", false, &store, 0)), + msg(new BasicMessage(0, "exchange", "routing_key", false, false, + MockChannel::basicGetBody())), + op(msg) + { + msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); + msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); + op.deliverTo(queue1); + op.deliverTo(queue2); + } + + void testPrepare() + { + //ensure messages are enqueued in store + op.prepare(0); + CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); + CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second); + } + + void testCommit() + { + //ensure messages are delivered to queue + op.commit(); + CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); + + CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount()); + CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest); + diff --git a/cpp/src/tests/ValueTest.cpp b/cpp/src/tests/ValueTest.cpp new file mode 100644 index 0000000000..2d1fc45461 --- /dev/null +++ b/cpp/src/tests/ValueTest.cpp @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "../framing/Value.h" +#include "qpid_test_plugin.h" + +using namespace qpid::framing; + + +class ValueTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ValueTest); + CPPUNIT_TEST(testStringValueEquals); + CPPUNIT_TEST(testIntegerValueEquals); + CPPUNIT_TEST(testDecimalValueEquals); + CPPUNIT_TEST(testFieldTableValueEquals); + CPPUNIT_TEST_SUITE_END(); + + StringValue s; + IntegerValue i; + DecimalValue d; + FieldTableValue ft; + EmptyValue e; + + public: + ValueTest() : + s("abc"), + i(42), + d(1234,2) + + { + ft.getValue().setString("foo", "FOO"); + ft.getValue().setInt("magic", 7); + } + + void testStringValueEquals() + { + + CPPUNIT_ASSERT(StringValue("abc") == s); + CPPUNIT_ASSERT(s != StringValue("foo")); + CPPUNIT_ASSERT(s != e); + CPPUNIT_ASSERT(e != d); + CPPUNIT_ASSERT(e != ft); + } + + void testIntegerValueEquals() + { + CPPUNIT_ASSERT(IntegerValue(42) == i); + CPPUNIT_ASSERT(IntegerValue(5) != i); + CPPUNIT_ASSERT(i != e); + CPPUNIT_ASSERT(i != d); + } + + void testDecimalValueEquals() + { + CPPUNIT_ASSERT(DecimalValue(1234, 2) == d); + CPPUNIT_ASSERT(DecimalValue(12345, 2) != d); + CPPUNIT_ASSERT(DecimalValue(1234, 3) != d); + CPPUNIT_ASSERT(d != s); + } + + + void testFieldTableValueEquals() + { + CPPUNIT_ASSERT_EQUAL(std::string("FOO"), + ft.getValue().getString("foo")); + CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic")); + + FieldTableValue f2; + CPPUNIT_ASSERT(ft != f2); + f2.getValue().setString("foo", "FOO"); + CPPUNIT_ASSERT(ft != f2); + f2.getValue().setInt("magic", 7); + CPPUNIT_ASSERT_EQUAL(ft,f2); + CPPUNIT_ASSERT(ft == f2); + f2.getValue().setString("foo", "BAR"); + CPPUNIT_ASSERT(ft != f2); + CPPUNIT_ASSERT(ft != i); + } + +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest); + diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp new file mode 100644 index 0000000000..5c084302d8 --- /dev/null +++ b/cpp/src/tests/client_test.cpp @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides a simple test (and example) of basic + * functionality including declaring an exchange and a queue, binding + * these together, publishing a message and receiving that message + * asynchronously. + */ + +#include <iostream> + +#include "../QpidError.h" +#include "../client/ClientChannel.h" +#include "../client/Connection.h" +#include "../client/ClientMessage.h" +#include "../client/MessageListener.h" +#include "../sys/Monitor.h" +#include "../framing/FieldTable.h" + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +bool verbose = false; + +/** + * A simple message listener implementation that prints out the + * message content then notifies a montitor allowing the test to + * complete. + */ +class SimpleListener : public virtual MessageListener{ + Monitor* monitor; + +public: + inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} + + inline virtual void received(Message& msg){ + if (verbose) + std::cout << "Received message " << msg.getData() << std::endl; + monitor->notify(); + } +}; + +int main(int argc, char**) +{ + verbose = argc > 1; + try { + //Use a custom exchange + Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); + //Use a named, temporary queue + Queue queue("MyQueue", true); + + + Connection con(verbose); + string host("localhost"); + con.open(host, 5672, "guest", "guest", "/test"); + if (verbose) + std::cout << "Opened connection." << std::endl; + + //Create and open a channel on the connection through which + //most functionality is exposed + Channel channel; + con.openChannel(channel); + if (verbose) std::cout << "Opened channel." << std::endl; + + //'declare' the exchange and the queue, which will create them + //as they don't exist + channel.declareExchange(exchange); + if (verbose) std::cout << "Declared exchange." << std::endl; + channel.declareQueue(queue); + if (verbose) std::cout << "Declared queue." << std::endl; + + //now bind the queue to the exchange + qpid::framing::FieldTable args; + channel.bind(exchange, queue, "MyTopic", args); + if (verbose) std::cout << "Bound queue to exchange." << std::endl; + + //Set up a message listener to receive any messages that + //arrive in our queue on the broker. We only expect one, and + //as it will be received on another thread, we create a + //montior to use to notify the main thread when that message + //is received. + Monitor monitor; + SimpleListener listener(&monitor); + string tag("MyTag"); + channel.consume(queue, tag, &listener); + if (verbose) std::cout << "Registered consumer." << std::endl; + + //we need to enable the message dispatching for this channel + //and we want that to occur on another thread so we call + //start(). + channel.start(); + + //Now we create and publish a message to our exchange with a + //routing key that will cause it to be routed to our queue + Message msg; + string data("MyMessage"); + msg.setData(data); + channel.publish(msg, exchange, "MyTopic"); + if (verbose) std::cout << "Published message: " << data << std::endl; + + { + Monitor::ScopedLock l(monitor); + //now we wait until we receive notification that the + //message was received + monitor.wait(); + } + + //close the channel & connection + channel.close(); + if (verbose) std::cout << "Closed channel." << std::endl; + con.close(); + if (verbose) std::cout << "Closed connection." << std::endl; + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + return 1; +} diff --git a/cpp/src/tests/dlclose_noop.c b/cpp/src/tests/dlclose_noop.c new file mode 100644 index 0000000000..ba2fa75891 --- /dev/null +++ b/cpp/src/tests/dlclose_noop.c @@ -0,0 +1,30 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Loaded via LD_PRELOAD this will turn dlclose into a no-op. + * + * Allows valgrind to generate useful reports from programs that + * dynamically unload libraries before exit, such as CppUnit's + * DllPlugInTester. + * + */ + +#include <stdio.h> +void* dlclose(void* handle) {} + diff --git a/cpp/src/tests/echo_service.cpp b/cpp/src/tests/echo_service.cpp new file mode 100644 index 0000000000..14224eeee5 --- /dev/null +++ b/cpp/src/tests/echo_service.cpp @@ -0,0 +1,230 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This class provides an example of using AMQP for a request-response + * style system. 'Requests' are messages sent to a well known + * destination. A 'service' process consumes these message and + * responds by echoing the message back to the sender on a + * sender-specified private queue. + */ + +#include "../QpidError.h" +#include "../client/ClientChannel.h" +#include "../client/Connection.h" +#include "../client/ClientExchange.h" +#include "../client/MessageListener.h" +#include "../client/ClientQueue.h" +#include "../sys/Time.h" +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + + +/** + * A message listener implementation representing the 'service', this + * will 'echo' any requests received. + */ +class EchoServer : public MessageListener{ + Channel* const channel; +public: + EchoServer(Channel* channel); + virtual void received(Message& msg); +}; + +/** + * A message listener implementation that merely prints received + * messages to the console. Used to report on 'echo' responses. + */ +class LoggingListener : public MessageListener{ +public: + virtual void received(Message& msg); +}; + +/** + * A utility class that manages the command line options needed to run + * the example confirgurably. + */ +class Args{ + string host; + int port; + bool trace; + bool help; + bool client; +public: + inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } + inline bool getClient() const { return client; } +}; + +/** + * The main test path. There are two basic modes: 'client' and + * 'service'. First one or more services are started, then one or more + * clients are started and messages can be sent. + */ +int main(int argc, char** argv){ + const std::string echo_service("echo_service"); + Args args; + args.parse(argc, argv); + if (args.getHelp()) { + args.usage(); + } else if (args.getClient()) { + //we have been started in 'client' mode, i.e. we will send an + //echo requests and print responses received. + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(channel); + + //Setup: declare the private 'response' queue and bind it + //to the direct exchange by its name which will be + //generated by the server + Queue response; + channel.declareQueue(response); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); + + //Consume from the response queue, logging all echoed message to console: + LoggingListener listener; + std::string tag; + channel.consume(response, tag, &listener); + + //Process incoming requests on a new thread + channel.start(); + + //get messages from console and send them: + std::string text; + std::cout << "Enter text to send:" << std::endl; + while (std::getline(std::cin, text)) { + std::cout << "Sending " << text << " to echo server." << std::endl; + Message msg; + msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); + msg.setData(text); + channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); + + std::cout << "Enter text to send:" << std::endl; + } + + connection.close(); + } catch(qpid::QpidError error) { + std::cout << error.what() << std::endl; + } + } else { + // we are in 'service' mode, i.e. we will consume messages + // from the request queue and echo each request back to the + // senders own private response queue. + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(channel); + + //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name + Queue request("request"); + channel.declareQueue(request); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs); + + //Consume from the request queue, echoing back all messages received to the client that sent them + EchoServer server(&channel); + std::string tag = "server_tag"; + channel.consume(request, tag, &server); + + //Process incoming requests on the main thread + channel.run(); + + connection.close(); + } catch(qpid::QpidError error) { + std::cout << error.what() << std::endl; + } + } +} + +EchoServer::EchoServer(Channel* _channel) : channel(_channel){} + +void EchoServer::received(Message& message) +{ + //get name of response queues binding to the default direct exchange: + const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); + + if (name.empty()) { + std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; + } else { + //print message to console: + std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; + + //'echo' the message back: + channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); + } +} + +void LoggingListener::received(Message& message) +{ + //print message to console: + std::cout << "Received echo: " << message.getData() << std::endl; +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else if("-client" == name){ + client = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; + std::cout << " -client" << std::endl; + std::cout << " Run as a client (else will run as a server)" << std::endl; +} diff --git a/cpp/src/tests/examples.Makefile b/cpp/src/tests/examples.Makefile new file mode 100644 index 0000000000..45999f7852 --- /dev/null +++ b/cpp/src/tests/examples.Makefile @@ -0,0 +1,66 @@ +# +# XXX: Edit these locations to suit. +# +BOOST_LOCATION := $(HOME)/local/boost-1.33.1 +APR_LOCATION := $(HOME)/local/apr-1.2.7 + +CXXFLAGS := -DNDEBUG -DUSE_APR -MMD -fpic + +# +# Configure Boost. +# +BOOST_CFLAGS := -I$(BOOST_LOCATION)/include/boost-1_33_1 +CXXFLAGS := $(CXXFLAGS) $(BOOST_CFLAGS) + +# +# Configure APR. +# +APR_CFLAGS := -I$(APR_LOCATION)/include/apr-1 +APR_LDFLAGS := $(shell $(APR_LOCATION)/bin/apr-1-config --libs) -L$(APR_LOCATION)/lib -lapr-1 +CXXFLAGS := $(CXXFLAGS) $(APR_CFLAGS) +LDFLAGS := $(LDFLAGS) $(APR_LDFLAGS) + +# +# Configure Qpid cpp client. +# +QPID_CLIENT_LDFLAGS := ../lib/libcommon.so ../lib/libclient.so +includeDir := ../include +QPID_CLIENT_CFLAGS := \ + -I$(includeDir)/gen \ + -I$(includeDir)/client \ + -I$(includeDir)/broker \ + -I$(includeDir)/common \ + -I$(includeDir)/common/sys \ + -I$(includeDir)/common/framing + +CXXFLAGS := $(CXXFLAGS) $(QPID_CLIENT_CFLAGS) +LDFLAGS := $(LDFLAGS) $(QPID_CLIENT_LDFLAGS) + +CXX := g++ + +# +# Add rule to build examples. +# +.SUFFIX: .cpp +%: %.cpp + $(CXX) $(CXXFLAGS) $(LDFLAGS) $< -o $@ + +# +# Define targets. +# + +EXAMPLES := client_test topic_listener topic_publisher echo_service + +cppFiles := $(wildcard *.cpp) +programs = $(foreach cppFile, $(cppFiles), $(subst .cpp, ,$(cppFile))) + +.PHONY: +all: $(programs) + +debug: + @echo cppFiles = $(cppFiles) + @echo programs = $(programs) + +.PHONY: +clean: + -rm $(EXAMPLES) diff --git a/cpp/src/tests/examples.README b/cpp/src/tests/examples.README new file mode 100644 index 0000000000..65f908c249 --- /dev/null +++ b/cpp/src/tests/examples.README @@ -0,0 +1,18 @@ +Building the examples +--------------------- + +You had better edit the Makefile and provide the locations for APR and boost. + +Then just type 'make'. + + +Running the examples +-------------------- + +Before running the examples ensure that you have setup your LD_LIBRARY_PATH. + +Most of the examples take the following connection parameters for your +AMQP broker: + + -host host + -port port diff --git a/cpp/src/tests/kill_broker b/cpp/src/tests/kill_broker new file mode 100755 index 0000000000..b71ca22ffd --- /dev/null +++ b/cpp/src/tests/kill_broker @@ -0,0 +1,3 @@ +#!/bin/sh +PID=qpidd.pid +if [ -f $PID ] ; then kill -9 `cat $PID` ; rm -f $PID ; fi diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests new file mode 100755 index 0000000000..585c1da913 --- /dev/null +++ b/cpp/src/tests/python_tests @@ -0,0 +1,8 @@ +#!/bin/sh +# Run the python tests. +if test -d ../../../python ; then + cd ../../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt +else + echo Warning: python tests not found. +fi + diff --git a/cpp/src/tests/qpid_test_plugin.h b/cpp/src/tests/qpid_test_plugin.h new file mode 100644 index 0000000000..b2f4a8ffed --- /dev/null +++ b/cpp/src/tests/qpid_test_plugin.h @@ -0,0 +1,43 @@ +#ifndef _qpid_test_plugin_ +#define _qpid_test_plugin_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Convenience to include cppunit headers needed by qpid test plugins and + * workaround for warning from superfluous main() declaration + * in cppunit/TestPlugIn.h + */ + +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> + +// Redefine CPPUNIT_PLUGIN_IMPLEMENT_MAIN to a dummy typedef to avoid warnings. +// +#if defined(CPPUNIT_HAVE_UNIX_DLL_LOADER) || defined(CPPUNIT_HAVE_UNIX_SHL_LOADER) +#undef CPPUNIT_PLUGIN_IMPLEMENT_MAIN +#define CPPUNIT_PLUGIN_IMPLEMENT_MAIN() typedef char __CppUnitPlugInImplementMainDummyTypeDef +#endif + +#endif /*!_qpid_test_plugin_*/ diff --git a/cpp/src/tests/quick_topictest b/cpp/src/tests/quick_topictest new file mode 100755 index 0000000000..9df5b5c84c --- /dev/null +++ b/cpp/src/tests/quick_topictest @@ -0,0 +1,7 @@ +#!/bin/sh +# Quick and quiet topic test for make check. +./topictest -s2 -m2 -b1 > topictest.log 2>&1 || { + echo See topictest.log. + exit 1 +} +rm topictest.log diff --git a/cpp/src/tests/run-python-tests b/cpp/src/tests/run-python-tests new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/cpp/src/tests/run-python-tests diff --git a/cpp/src/tests/run-unit-tests b/cpp/src/tests/run-unit-tests new file mode 100755 index 0000000000..f066a38205 --- /dev/null +++ b/cpp/src/tests/run-unit-tests @@ -0,0 +1,37 @@ +#!/bin/sh +# +# Library names (without path or .so) and CppUnit test paths can be +# specified on the command line or in env var UNIT_TESTS. For example: +# +# Selected test classes: +# ./run-unit-tests ValueTest ClientChannelTest +# +# Individual test method +# ./run-unit-tests ValueTest :ValueTest::testStringValueEquals +# +# Build and run selected tests: +# make check TESTS=run-unit-tests UNIT_TESTS=ClientChannelTest +# + +# Default VALGRIND from the path and $srcdir to . but +# don't override values set by make. +test -z "$VALGRIND" -a -z "$MAKEFLAGS" && VALGRIND=`which valgrind` 2>/dev/null +test -z "$srcdir" && srcdir=. + +rm -f valgrind.out +vg_log=--log-file-exactly=valgrind.out +source $srcdir/setup +for u in $* $UNIT_TESTS ; do + case $u in + :*) TEST_ARGS="$TEST_ARGS $u" ;; # A test path. + *) TEST_ARGS="$TEST_ARGS $pwd/.libs/$u.so" ;; # A test library. + esac +done +# If none specified, run all tests in .libs +test -z "$TEST_ARGS" && TEST_ARGS="$pwd/.libs/*Test.so" +fail=0 + +$vg DllPlugInTester -c -b $TEST_ARGS || fail=1 +vg_check valgrind.out || fail=1 + +exit $fail diff --git a/cpp/src/tests/setup b/cpp/src/tests/setup new file mode 100644 index 0000000000..aaa3afd9b8 --- /dev/null +++ b/cpp/src/tests/setup @@ -0,0 +1,81 @@ +# -*- sh -*- + +test "$VERBOSE" = yes && set -x + +pwd=`pwd` +test -z "$abs_srcdir" && abs_srcdir=$pwd + +t0=`echo "$0"|sed 's,.*/,,'`.tmp; tmp=$t0/$$ +pid=0 +test -z "$TEST_DEBUG" && +trap 's=$?;test $pid = 0||kill -2 $pid;cd "$pwd" && rm -rf $t0 && exit $s' 0 +test -z "$TEST_DEBUG" && trap '(exit $?); exit $?' 1 2 13 15 + +framework_failure=0 +mkdir -p $tmp || framework_failure=1 +cd $tmp || framework_failure=1 + +gen_supp=--gen-suppressions=all +# This option makes valgrind significantly slower. +full_leak_check=--leak-check=full +demangle=--demangle=yes + +vg_options=" + --suppressions=$abs_srcdir/.vg-supp + --num-callers=25 + --track-fds=yes + $demangle + $full_leak_check + $gen_supp + $vg_log + " +# configure tests for the existence of valgrind. +# If it's not available, then make $vg and vg_check no-ops. +if test x$VALGRIND = x; then + vg= +else + vg="libtool --mode=execute $VALGRIND `echo $vg_options` --" + # Suppress dlclose or valgrind traces wont have test library symbols. + vg="env LD_PRELOAD=$pwd/.libs/libdlclose_noop.so $vg" +fi + + +vg_leak_check() +{ + local file=$1 + local fail + # If we detect a leak, dump all output to stderr. + grep -E '^==[0-9]+== +definitely lost: [^0]' $file \ + && { fail=1; cat $file 1>&2; + echo "found memory leaks (see log file, $file); see above" 1>&2; } + test "$fail" = '' +} + + +# Ensure 1) that there is an ERROR SUMMARY line, and +# 2) that the number of errors is 0. +# An offending line looks like this: +# ==29302== ERROR SUMMARY: 4 errors from 2 contexts (suppressed: 16 from 5) +vg_error_check() +{ + local file=$1 + local fail + # If we detect a leak, dump all output to stderr. + grep -E '^==[0-9]+== ERROR SUMMARY:' $file > /dev/null \ + || { fail=1; cat $file 1>&2; + echo "no valgrind ERROR SUMMARY line in $file" 1>&2; } + if test "$fail" = ''; then + grep -E '^==[0-9]+== ERROR SUMMARY: [^0] ' $file \ + && { fail=1; cat $file 1>&2; + echo "valgrind reported errors in $file; see above" 1>&2; } + fi + test "$fail" = '' +} + +vg_check() +{ + local file=$1 + if test x$VALGRIND != x; then + vg_error_check $file && vg_leak_check $file + fi +} diff --git a/cpp/src/tests/start_broker b/cpp/src/tests/start_broker new file mode 100755 index 0000000000..64d26883be --- /dev/null +++ b/cpp/src/tests/start_broker @@ -0,0 +1,14 @@ +#!/bin/sh +set -e + +LOG=`pwd`/qpidd.log +PID=`pwd`/qpidd.pid + +rm -rf $LOG $PID + +# Start the daemon, recording its PID. +../qpidd > $LOG 2>&1 & echo $! > $PID + +# FIXME aconway 2007-01-18: qpidd should not return till it is accepting +# connections, remove arbitrary sleep. +sleep 5 diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp new file mode 100644 index 0000000000..0b7d6c4e86 --- /dev/null +++ b/cpp/src/tests/topic_listener.cpp @@ -0,0 +1,217 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides one half of a test and example of a pub-sub + * style of interaction. See topic_publisher.cpp for the other half, + * in which the logic for publishing is defined. + * + * This file contains the listener logic. A listener will subscribe to + * a logical 'topic'. It will count the number of messages it receives + * and the time elapsed between the first one and the last one. It + * recognises two types of 'special' message that tell it to (a) send + * a report containing this information, (b) shutdown (i.e. stop + * listening). + */ + +#include "../QpidError.h" +#include "../client/ClientChannel.h" +#include "../client/Connection.h" +#include "../client/ClientExchange.h" +#include "../client/MessageListener.h" +#include "../client/ClientQueue.h" +#include "../sys/Time.h" +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + +/** + * A message listener implementation in which the runtime logic is + * defined. + */ +class Listener : public MessageListener{ + Channel* const channel; + const string responseQueue; + const bool transactional; + bool init; + int count; + Time start; + + void shutdown(); + void report(); +public: + Listener(Channel* channel, const string& reponseQueue, bool tx); + virtual void received(Message& msg); +}; + +/** + * A utility class for managing the options passed in. + */ +class Args{ + string host; + int port; + AckMode ackMode; + bool transactional; + int prefetch; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){} + void parse(int argc, char** argv); + void usage(); + + const string& getHost() const { return host;} + int getPort() const { return port; } + AckMode getAckMode(){ return ackMode; } + bool getTransactional() const { return transactional; } + int getPrefetch(){ return prefetch; } + bool getTrace() const { return trace; } + bool getHelp() const { return help; } +}; + +/** + * The main routine creates a Listener instance and sets it up to + * consume from a private queue bound to the exchange with the + * appropriate topic name. + */ +int main(int argc, char** argv){ + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + }else{ + try{ + cout << "topic_listener: Started." << endl; + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test"); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(channel); + + //declare exchange, queue and bind them: + Queue response("response"); + channel.declareQueue(response); + + Queue control; + channel.declareQueue(control); + qpid::framing::FieldTable bindArgs; + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs); + //set up listener + Listener listener(&channel, response.getName(), args.getTransactional()); + string tag; + channel.consume(control, tag, &listener, args.getAckMode()); + cout << "topic_listener: Consuming." << endl; + channel.run(); + connection.close(); + cout << "topic_listener: normal exit" << endl; + return 0; + }catch(qpid::QpidError error){ + cout << "topic_listener: " << error.what() << endl; + } + } + return 1; +} + +Listener::Listener(Channel* _channel, const string& _responseq, bool tx) : + channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ + if(!init){ + start = now(); + count = 0; + init = true; + } + string type(message.getHeaders().getString("TYPE")); + + if(type == "TERMINATION_REQUEST"){ + shutdown(); + }else if(type == "REPORT_REQUEST"){ + //send a report: + report(); + init = false; + }else if (++count % 100 == 0){ + cout <<"Received " << count << " messages." << endl; + } +} + +void Listener::shutdown(){ + channel->close(); +} + +void Listener::report(){ + Time finish = now(); + Time time = finish - start; + stringstream reportstr; + reportstr << "Received " << count << " messages in " + << time/TIME_MSEC << " ms."; + Message msg(reportstr.str()); + msg.getHeaders().setString("TYPE", "REPORT"); + channel->publish(msg, string(), responseQueue); + if(transactional){ + channel->commit(); + } +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = AckMode(atoi(argv[++i])); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + cout << "Warning: unrecognised option " << name << endl; + } + } +} + +void Args::usage(){ + cout << "Options:" << endl; + cout << " -help" << endl; + cout << " Prints this usage message" << endl; + cout << " -host <host>" << endl; + cout << " Specifies host to connect to (default is localhost)" << endl; + cout << " -port <port>" << endl; + cout << " Specifies port to conect to (default is 5762)" << endl; + cout << " -ack_mode <mode>" << endl; + cout << " Sets the acknowledgement mode" << endl; + cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << endl; + cout << " -transactional" << endl; + cout << " Indicates the client should use transactions" << endl; + cout << " -prefetch <count>" << endl; + cout << " Specifies the prefetch count (default is 1000)" << endl; + cout << " -trace" << endl; + cout << " Indicates that the frames sent and received should be logged" << endl; +} diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp new file mode 100644 index 0000000000..e4b39a2966 --- /dev/null +++ b/cpp/src/tests/topic_publisher.cpp @@ -0,0 +1,287 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file provides one half of a test and example of a pub-sub + * style of interaction. See topic_listener.cpp for the other half, in + * which the logic for subscribers is defined. + * + * This file contains the publisher logic. The publisher will send a + * number of messages to the exchange with the appropriate routing key + * for the logical 'topic'. Once it has done this it will then send a + * request that each subscriber report back with the number of message + * it has received and the time that elapsed between receiving the + * first one and receiving the report request. Once the expected + * number of reports are received, it sends out a request that each + * subscriber shutdown. + */ + +#include "../QpidError.h" +#include "../client/ClientChannel.h" +#include "../client/Connection.h" +#include "../client/ClientExchange.h" +#include "../client/MessageListener.h" +#include "../client/ClientQueue.h" +#include "../sys/Monitor.h" +#include <unistd.h> +#include "../sys/Time.h" +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +/** + * The publishing logic is defined in this class. It implements + * message listener and can therfore be used to receive messages sent + * back by the subscribers. + */ +class Publisher : public MessageListener{ + Channel* const channel; + const std::string controlTopic; + const bool transactional; + Monitor monitor; + int count; + + void waitForCompletion(int msgs); + string generateData(int size); + +public: + Publisher(Channel* channel, const std::string& controlTopic, bool tx); + virtual void received(Message& msg); + int64_t publish(int msgs, int listeners, int size); + void terminate(); +}; + +/** + * A utility class for managing the options passed in to the test + */ +class Args{ + string host; + int port; + int messages; + int subscribers; + AckMode ackMode; + bool transactional; + int prefetch; + int batches; + int delay; + int size; + bool trace; + bool help; +public: + inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1), + ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1), + delay(0), size(256), trace(false), help(false){} + + void parse(int argc, char** argv); + void usage(); + + const string& getHost() const { return host;} + int getPort() const { return port; } + int getMessages() const { return messages; } + int getSubscribers() const { return subscribers; } + AckMode getAckMode(){ return ackMode; } + bool getTransactional() const { return transactional; } + int getPrefetch(){ return prefetch; } + int getBatches(){ return batches; } + int getDelay(){ return delay; } + int getSize(){ return size; } + bool getTrace() const { return trace; } + bool getHelp() const { return help; } +}; + +int main(int argc, char** argv) { + Args args; + args.parse(argc, argv); + if(args.getHelp()){ + args.usage(); + } else { + try{ + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort(), "guest", "guest", "/test"); + Channel channel(args.getTransactional(), args.getPrefetch()); + connection.openChannel(channel); + + //declare queue (relying on default binding): + Queue response("response"); + channel.declareQueue(response); + + //set up listener + Publisher publisher(&channel, "topic_control", args.getTransactional()); + std::string tag("mytag"); + channel.consume(response, tag, &publisher, args.getAckMode()); + channel.start(); + + int batchSize(args.getBatches()); + int64_t max(0); + int64_t min(0); + int64_t sum(0); + for(int i = 0; i < batchSize; i++){ + if(i > 0 && args.getDelay()) sleep(args.getDelay()); + int64_t msecs = + publisher.publish(args.getMessages(), + args.getSubscribers(), + args.getSize()) / TIME_MSEC; + if(!max || msecs > max) max = msecs; + if(!min || msecs < min) min = msecs; + sum += msecs; + std::cout << "Completed " << (i+1) << " of " << batchSize + << " in " << msecs << "ms" << std::endl; + } + publisher.terminate(); + int64_t avg = sum / batchSize; + if(batchSize > 1){ + std::cout << batchSize << " batches completed. avg=" << avg << + ", max=" << max << ", min=" << min << std::endl; + } + channel.close(); + connection.close(); + return 0; + }catch(qpid::QpidError error) { + std::cout << error.what() << std::endl; + } + } + return 1; +} + +Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) : + channel(_channel), controlTopic(_controlTopic), transactional(tx){} + +void Publisher::received(Message& ){ + //count responses and when all are received end the current batch + Monitor::ScopedLock l(monitor); + if(--count == 0){ + monitor.notify(); + } +} + +void Publisher::waitForCompletion(int msgs){ + count = msgs; + monitor.wait(); +} + +int64_t Publisher::publish(int msgs, int listeners, int size){ + Message msg; + msg.setData(generateData(size)); + Time start = now(); + { + Monitor::ScopedLock l(monitor); + for(int i = 0; i < msgs; i++){ + channel->publish( + msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + } + //send report request + Message reportRequest; + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } + + waitForCompletion(listeners); + } + + Time finish = now(); + return finish - start; +} + +string Publisher::generateData(int size){ + string data; + for(int i = 0; i < size; i++){ + data += ('A' + (i / 26)); + } + return data; +} + +void Publisher::terminate(){ + //send termination request + Message terminationRequest; + terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); + channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } +} + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-messages" == name){ + messages = atoi(argv[++i]); + }else if("-subscribers" == name){ + subscribers = atoi(argv[++i]); + }else if("-ack_mode" == name){ + ackMode = AckMode(atoi(argv[++i])); + }else if("-transactional" == name){ + transactional = true; + }else if("-prefetch" == name){ + prefetch = atoi(argv[++i]); + }else if("-batches" == name){ + batches = atoi(argv[++i]); + }else if("-delay" == name){ + delay = atoi(argv[++i]); + }else if("-size" == name){ + size = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -messages <count>" << std::endl; + std::cout << " Specifies how many messages to send" << std::endl; + std::cout << " -subscribers <count>" << std::endl; + std::cout << " Specifies how many subscribers to expect reports from" << std::endl; + std::cout << " -ack_mode <mode>" << std::endl; + std::cout << " Sets the acknowledgement mode" << std::endl; + std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; + std::cout << " -transactional" << std::endl; + std::cout << " Indicates the client should use transactions" << std::endl; + std::cout << " -prefetch <count>" << std::endl; + std::cout << " Specifies the prefetch count (default is 1000)" << std::endl; + std::cout << " -batches <count>" << std::endl; + std::cout << " Specifies how many batches to run" << std::endl; + std::cout << " -delay <seconds>" << std::endl; + std::cout << " Causes a delay between each batch" << std::endl; + std::cout << " -size <bytes>" << std::endl; + std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/src/tests/topictest b/cpp/src/tests/topictest new file mode 100755 index 0000000000..92e40b2c37 --- /dev/null +++ b/cpp/src/tests/topictest @@ -0,0 +1,39 @@ +#!/bin/bash +# Run the C++ topic test + +# Clean up old log files +rm -f subscriber_*.log + +# Defaults values +SUBSCRIBERS=10 +MESSAGES=2000 +BATCHES=10 + +while getopts "s:m:b:" opt ; do + case $opt in + s) SUBSCRIBERS=$OPTARG ;; + m) MESSAGES=$OPTARG ;; + b) BATCHES=$OPTARG ;; + ?) + echo "Usage: %0 [-s <subscribers>] [-m <messages.] [-b <batches>]" + exit 1 + ;; + esac +done + +subscribe() { + echo Start subscriber $1 + LOG="subscriber_$1.log" + ./topic_listener > $LOG 2>&1 && rm -f $LOG +} + +publish() { + ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS +} + +for ((i=$SUBSCRIBERS ; i--; )); do + subscribe $i & +done +# FIXME aconway 2007-03-27: Hack around startup race. Fix topic test. +sleep 1 +publish 2>&1 || exit 1 |