diff options
author | Gordon Sim <gsim@apache.org> | 2007-06-06 16:39:03 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-06-06 16:39:03 +0000 |
commit | 70a3cdf33b3e38ee26ee2840a55f83ebd26589b4 (patch) | |
tree | 07c3dab5cb7d97158737c36efa1caa8d9254c266 | |
parent | 480e99cfc6071f15bc7135895cf2b60d0dd9c981 (diff) | |
download | qpid-python-70a3cdf33b3e38ee26ee2840a55f83ebd26589b4.tar.gz |
Merged in channel.flow implementation and interoperability tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@544879 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/BasicP2PTest.cpp | 66 | ||||
-rw-r--r-- | cpp/src/tests/BasicP2PTest.h | 46 | ||||
-rw-r--r-- | cpp/src/tests/BasicPubSubTest.cpp | 121 | ||||
-rw-r--r-- | cpp/src/tests/BasicPubSubTest.h | 51 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 37 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 14 | ||||
-rw-r--r-- | cpp/src/tests/SimpleTestCaseBase.cpp | 87 | ||||
-rw-r--r-- | cpp/src/tests/SimpleTestCaseBase.h | 88 | ||||
-rw-r--r-- | cpp/src/tests/TestCase.h | 64 | ||||
-rw-r--r-- | cpp/src/tests/TestOptions.h | 69 | ||||
-rw-r--r-- | cpp/src/tests/interop_runner.cpp | 252 | ||||
-rw-r--r-- | python/tests_0-9/broker.py | 17 |
16 files changed, 932 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 3c742b8d2d..592995f10f 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -100,7 +100,11 @@ void BrokerAdapter::ChannelHandlerImpl::open( std::string()/* ID */, context.getRequestId()); } -void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){ + channel.flow(active); + client.flowOk(active, context.getRequestId()); +} + void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} void BrokerAdapter::ChannelHandlerImpl::close( diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 0c06350c02..e256566d35 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -66,6 +66,7 @@ Channel::Channel( store(_store), messageBuilder(this, _store, _stagingThreshold), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened + flowActive(true), adapter(new BrokerAdapter(*this, con, con.broker)) { outstanding.reset(); @@ -221,7 +222,7 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ + if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ blocked = true; }else{ blocked = false; @@ -396,3 +397,14 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } + +void Channel::flow(bool active) +{ + Mutex::ScopedLock locker(deliveryLock); + bool requestDelivery(!flowActive && active); + flowActive = active; + if (requestDelivery) { + //there may be messages that can be now be delivered + std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + } +} diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index a2f17f85f4..1fbfc2063e 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -97,6 +97,7 @@ class Channel : public framing::ChannelAdapter, MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message bool opened; + bool flowActive; boost::scoped_ptr<BrokerAdapter> adapter; // completion handler for MessageBuilder @@ -147,6 +148,7 @@ class Channel : public framing::ChannelAdapter, void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); void recover(bool requeue); + void flow(bool active); void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag); void handlePublish(Message* msg); void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index ee1a913a96..3521e63444 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -78,10 +78,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){ if(consumers.empty()){ return false; }else if(exclusive){ - if(!exclusive->deliver(msg)){ - QPID_LOG(warning, "Dropping undeliverable message from queue with exclusive consumer."); - } - return true; + return exclusive->deliver(msg); }else{ //deliver to next consumer next = next % consumers.size(); diff --git a/cpp/src/tests/BasicP2PTest.cpp b/cpp/src/tests/BasicP2PTest.cpp new file mode 100644 index 0000000000..b202f88ca6 --- /dev/null +++ b/cpp/src/tests/BasicP2PTest.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BasicP2PTest.h" + +using namespace qpid; +using namespace qpid::client; + +class BasicP2PTest::Receiver : public Worker, public MessageListener +{ + const std::string queue; + std::string tag; +public: + Receiver(TestOptions& options, const std::string& _queue, const int _messages) + : Worker(options, _messages), queue(_queue){} + void init() + { + Queue q(queue, true); + channel.declareQueue(q); + framing::FieldTable args; + channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args); + channel.consume(q, tag, this); + channel.start(); + } + + void start() + { + } + + void received(Message&) + { + count++; + } +}; + +void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options) +{ + std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME"); + int messages = params.getInt("P2P_NUM_MESSAGES"); + if (role == "SENDER") { + worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages)); + } else if(role == "RECEIVER"){ + worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages)); + } else { + throw Exception("unrecognised role"); + } + worker->init(); +} diff --git a/cpp/src/tests/BasicP2PTest.h b/cpp/src/tests/BasicP2PTest.h new file mode 100644 index 0000000000..b80fff1171 --- /dev/null +++ b/cpp/src/tests/BasicP2PTest.h @@ -0,0 +1,46 @@ +#ifndef _BasicP2PTest_ +#define _BasicP2PTest_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <sstream> + +#include "qpid/Exception.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientMessage.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "SimpleTestCaseBase.h" + + +namespace qpid { + +class BasicP2PTest : public SimpleTestCaseBase +{ + class Receiver; +public: + void assign(const std::string& role, framing::FieldTable& params, TestOptions& options); +}; + +} + +#endif diff --git a/cpp/src/tests/BasicPubSubTest.cpp b/cpp/src/tests/BasicPubSubTest.cpp new file mode 100644 index 0000000000..623194d331 --- /dev/null +++ b/cpp/src/tests/BasicPubSubTest.cpp @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BasicPubSubTest.h" + +using namespace qpid; + +class BasicPubSubTest::Receiver : public Worker, public MessageListener +{ + const Exchange& exchange; + const std::string queue; + const std::string key; + std::string tag; +public: + Receiver(TestOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) + : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){} + + void init() + { + Queue q(queue, true); + channel.declareQueue(q); + framing::FieldTable args; + channel.bind(exchange, q, key, args); + channel.consume(q, tag, this); + channel.start(); + } + + void start(){ + } + + void received(Message&) + { + count++; + } +}; + +class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener +{ + typedef boost::ptr_vector<Receiver> ReceiverList; + ReceiverList receivers; + +public: + MultiReceiver(TestOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) + : Worker(options, _messages) + { + for (int i = 0; i != receiverCount; i++) { + std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str(); + receivers.push_back(new Receiver(options, exchange, queue, key, _messages)); + } + } + + void init() + { + for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { + receivers[i].init(); + } + } + + void start() + { + for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { + receivers[i].start(); + } + } + + void received(Message& msg) + { + for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { + receivers[i].received(msg); + } + } + + virtual int getCount() + { + count = 0; + for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { + count += receivers[i].getCount(); + } + return count; + } + virtual void stop() + { + for (ReceiverList::size_type i = 0; i != receivers.size(); i++) { + receivers[i].stop(); + } + } +}; + +void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options) +{ + std::string key = params.getString("PUBSUB_KEY"); + int messages = params.getInt("PUBSUB_NUM_MESSAGES"); + int receivers = params.getInt("PUBSUB_NUM_RECEIVERS"); + if (role == "SENDER") { + worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages)); + } else if(role == "RECEIVER"){ + worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers)); + } else { + throw Exception("unrecognised role"); + } + worker->init(); +} + diff --git a/cpp/src/tests/BasicPubSubTest.h b/cpp/src/tests/BasicPubSubTest.h new file mode 100644 index 0000000000..b7ccba1a81 --- /dev/null +++ b/cpp/src/tests/BasicPubSubTest.h @@ -0,0 +1,51 @@ +#ifndef _BasicPubSubTest_ +#define _BasicPubSubTest_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <sstream> + +#include "qpid/Exception.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientMessage.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "SimpleTestCaseBase.h" +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/format.hpp> + + +namespace qpid { + +using namespace qpid::client; + +class BasicPubSubTest : public SimpleTestCaseBase +{ + class Receiver; + class MultiReceiver; +public: + void assign(const std::string& role, framing::FieldTable& params, TestOptions& options); +}; + +} + +#endif diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 3b2119be13..19841dc18d 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -57,6 +57,7 @@ class BrokerChannelTest : public CppUnit::TestCase CPPUNIT_TEST(testDeliveryAndRecovery); CPPUNIT_TEST(testStaging); CPPUNIT_TEST(testQueuePolicy); + CPPUNIT_TEST(testFlow); CPPUNIT_TEST_SUITE_END(); Broker::shared_ptr broker; @@ -333,6 +334,42 @@ class BrokerChannelTest : public CppUnit::TestCase store.check(); } + void testFlow(){ + Channel channel(connection, 7, 10000); + channel.open(); + //there will always be a connection-start frame + CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); + CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>( + handler.frames[0].getBody().get())); + + const string data("abcdefghijklmn"); + + Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); + addContent(msg, data); + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner(0); + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + channel.flow(false); + queue->deliver(msg); + //ensure no more frames have been delivered + CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount()); + channel.flow(true); + CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(data, contentBody->getData()); + } + Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) { BasicMessage* msg = new BasicMessage( diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 699a2f073c..c351408988 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -63,7 +63,7 @@ testprogs = \ topic_listener \ topic_publisher -check_PROGRAMS = $(UNIT_TESTS) $(testprogs) +check_PROGRAMS = $(UNIT_TESTS) $(testprogs) interop_runner # FIXME aconway 2007-05-30: TESTS_ENVIRONMENT should have ./run_test # as below to run valgrind on all test programs. @@ -137,3 +137,15 @@ check-unit: CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp MAINTAINERCLEANFILES=gen.mk + +interop_runner_SOURCES = \ + interop_runner.cpp \ + SimpleTestCaseBase.cpp \ + BasicP2PTest.cpp \ + BasicPubSubTest.cpp \ + SimpleTestCaseBase.h \ + BasicP2PTest.h \ + BasicPubSubTest.h \ + TestCase.h \ + TestOptions.h +interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs) diff --git a/cpp/src/tests/SimpleTestCaseBase.cpp b/cpp/src/tests/SimpleTestCaseBase.cpp new file mode 100644 index 0000000000..5f37fd0eb3 --- /dev/null +++ b/cpp/src/tests/SimpleTestCaseBase.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SimpleTestCaseBase.h" + +using namespace qpid; + +void SimpleTestCaseBase::start() +{ + if (worker.get()) { + worker->start(); + } +} + +void SimpleTestCaseBase::stop() +{ + if (worker.get()) { + worker->stop(); + } +} + +void SimpleTestCaseBase::report(client::Message& report) +{ + if (worker.get()) { + report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount()); + //add number of messages sent or received + std::stringstream reportstr; + reportstr << worker->getCount(); + report.setData(reportstr.str()); + } +} + +SimpleTestCaseBase::Sender::Sender(TestOptions& options, + const Exchange& _exchange, + const std::string& _key, + const int _messages) + : Worker(options, _messages), exchange(_exchange), key(_key) {} + +void SimpleTestCaseBase::Sender::init() +{ + channel.start(); +} + +void SimpleTestCaseBase::Sender::start(){ + Message msg; + while (count < messages) { + channel.publish(msg, exchange, key); + count++; + } + stop(); +} + +SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) : + connection(options.trace), messages(_messages), count(0) +{ + connection.open(options.broker, options.port); + connection.openChannel(channel); +} + +void SimpleTestCaseBase::Worker::stop() +{ + channel.close(); + connection.close(); +} + +int SimpleTestCaseBase::Worker::getCount() +{ + return count; +} + diff --git a/cpp/src/tests/SimpleTestCaseBase.h b/cpp/src/tests/SimpleTestCaseBase.h new file mode 100644 index 0000000000..ee1742a7f7 --- /dev/null +++ b/cpp/src/tests/SimpleTestCaseBase.h @@ -0,0 +1,88 @@ +#ifndef _SimpleTestCaseBase_ +#define _SimpleTestCaseBase_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <sstream> + +#include "qpid/Exception.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientMessage.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "TestCase.h" + + +namespace qpid { + +using namespace qpid::client; + +class SimpleTestCaseBase : public TestCase +{ +protected: + class Worker + { + protected: + client::Connection connection; + client::Channel channel; + const int messages; + int count; + + public: + + Worker(TestOptions& options, const int messages); + virtual ~Worker(){} + + virtual void stop(); + virtual int getCount(); + virtual void init() = 0; + virtual void start() = 0; + }; + + class Sender : public Worker + { + const Exchange& exchange; + const std::string key; + public: + Sender(TestOptions& options, + const Exchange& exchange, + const std::string& key, + const int messages); + void init(); + void start(); + }; + + std::auto_ptr<Worker> worker; + +public: + virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0; + + virtual ~SimpleTestCaseBase() {} + + void start(); + void stop(); + void report(client::Message& report); +}; + +} + +#endif diff --git a/cpp/src/tests/TestCase.h b/cpp/src/tests/TestCase.h new file mode 100644 index 0000000000..71e1d1118c --- /dev/null +++ b/cpp/src/tests/TestCase.h @@ -0,0 +1,64 @@ +#ifndef _TestCase_ +#define _TestCase_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/ClientMessage.h" +#include "TestOptions.h" + + +namespace qpid { + +/** + * Interface to be implemented by test cases for use with the test + * runner. + */ +class TestCase +{ +public: + /** + * Directs the test case to act in a particular role. Some roles + * may be 'activated' at this stage others may require an explicit + * start request. + */ + virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0; + /** + * Each test will be started on its own thread, which should block + * until the test completes (this may or may not require an + * explicit stop() request). + */ + virtual void start() = 0; + /** + * Requests that the test be stopped if still running. + */ + virtual void stop() = 0; + /** + * Allows the test to fill in details on the final report + * message. Will be called only after start has returned. + */ + virtual void report(client::Message& report) = 0; + + virtual ~TestCase() {} +}; + +} + +#endif diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h new file mode 100644 index 0000000000..45575ba450 --- /dev/null +++ b/cpp/src/tests/TestOptions.h @@ -0,0 +1,69 @@ +#ifndef _TestOptions_ +#define _TestOptions_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonOptions.h" + +namespace qpid { + +struct TestOptions : public qpid::CommonOptions +{ + TestOptions() : desc("Options"), broker("localhost"), virtualhost(""), clientid("cpp"), help(false) + { + using namespace qpid::program_options; + using namespace boost::program_options; + CommonOptions::addTo(desc); + desc.add_options() + ("broker,b", optValue(broker, "HOSTNAME"), "the hostname to connect to") + ("virtualhost,v", optValue(virtualhost, "VIRTUAL_HOST"), "virtual host") + ("clientname,n", optValue(clientid, "ID"), "unique client identifier") + ("help,h", optValue(help), "print this usage statement"); + } + + void parse(int argc, char** argv) + { + using namespace boost::program_options; + try { + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + notify(vm); + } catch(const error& e) { + std::cerr << "Error: " << e.what() << std::endl + << "Specify '--help' for usage." << std::endl; + } + } + + void usage() + { + std::cout << desc << std::endl; + } + + boost::program_options::options_description desc; + std::string broker; + std::string virtualhost; + std::string clientid; + bool help; +}; + +} + +#endif diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp new file mode 100644 index 0000000000..26639ede17 --- /dev/null +++ b/cpp/src/tests/interop_runner.cpp @@ -0,0 +1,252 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonOptions.h" +#include "qpid/Exception.h" +#include "qpid/QpidError.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include <iostream> +#include <memory> +#include "BasicP2PTest.h" +#include "BasicPubSubTest.h" +#include "TestCase.h" +#include <boost/ptr_container/ptr_map.hpp> + +/** + * Framework for interop tests. + * + * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details]. + */ + +using namespace qpid::client; +using namespace qpid::sys; +using qpid::TestCase; +using qpid::TestOptions; +using qpid::framing::FieldTable; +using std::string; + +class DummyRun : public TestCase +{ +public: + DummyRun() {} + void assign(const std::string&, FieldTable&, TestOptions&) {} + void start() {} + void stop() {} + void report(qpid::client::Message&) {} +}; + +string parse_next_word(const string& input, const string& delims, string::size_type& position); + +/** + */ +class Listener : public MessageListener, private Runnable{ + typedef boost::ptr_map<std::string, TestCase> TestMap; + + Channel& channel; + TestOptions& options; + TestMap tests; + const string name; + const string topic; + TestMap::iterator test; + std::auto_ptr<Thread> runner; + string reportTo; + string reportCorrelator; + + void shutdown(); + bool invite(const string& name); + void run(); + + void sendResponse(Message& response, string replyTo); + void sendResponse(Message& response, Message& request); + void sendSimpleResponse(const string& type, Message& request); + void sendReport(); +public: + Listener(Channel& channel, TestOptions& options); + void received(Message& msg); + void bindAndConsume(); + void registerTest(std::string name, TestCase* test); +}; + +/** + */ +int main(int argc, char** argv){ + TestOptions options; + options.parse(argc, argv); + + if (options.help) { + options.usage(); + } else { + try{ + Connection connection(options.trace); + connection.open(options.broker, options.port, "guest", "guest", options.virtualhost); + + Channel channel; + connection.openChannel(channel); + + Listener listener(channel, options); + listener.registerTest("TC1_DummyRun", new DummyRun()); + listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest()); + listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest()); + + listener.bindAndConsume(); + + channel.run(); + connection.close(); + } catch(qpid::Exception error) { + std::cout << error.what() << std::endl; + } + } +} + +Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name) +{} + +void Listener::registerTest(std::string name, TestCase* test) +{ + tests.insert(name, test); +} + +void Listener::bindAndConsume() +{ + Queue control(name, true); + channel.declareQueue(control); + qpid::framing::FieldTable bindArgs; + //replace these separate binds with a wildcard once that is supported on java broker + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs); + + std::string tag; + channel.consume(control, tag, this); +} + +void Listener::sendSimpleResponse(const string& type, Message& request) +{ + Message response; + response.getHeaders().setString("CONTROL_TYPE", type); + response.getHeaders().setString("CLIENT_NAME", name); + response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic); + response.setCorrelationId(request.getCorrelationId()); + sendResponse(response, request); +} + +void Listener::sendResponse(Message& response, Message& request) +{ + sendResponse(response, request.getReplyTo()); +} + +void Listener::sendResponse(Message& response, string replyTo) +{ + //Exchange and routing key need to be extracted from the reply-to + //field. Format is assumed to be: + // + // <exchange type>://<exchange name>/<routing key>?<options> + // + //and all we need is the exchange name and routing key + // + if (replyTo.empty()) throw qpid::Exception("Reply address not set!"); + const string delims(":/?="); + + string::size_type start = replyTo.find(':');//skip exchange type + string exchange = parse_next_word(replyTo, delims, start); + string routingKey = parse_next_word(replyTo, delims, start); + channel.publish(response, exchange, routingKey); +} + +void Listener::received(Message& message) +{ + std::string type(message.getHeaders().getString("CONTROL_TYPE")); + + if (type == "INVITE") { + std::string name(message.getHeaders().getString("TEST_NAME")); + if (name.empty() || invite(name)) { + sendSimpleResponse("ENLIST", message); + } else { + std::cout << "Can't take part in '" << name << "'" << std::endl; + } + } else if (type == "ASSIGN_ROLE") { + test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options); + sendSimpleResponse("ACCEPT_ROLE", message); + } else if (type == "START") { + reportTo = message.getReplyTo(); + reportCorrelator = message.getCorrelationId(); + runner = std::auto_ptr<Thread>(new Thread(this)); + } else if (type == "STATUS_REQUEST") { + reportTo = message.getReplyTo(); + reportCorrelator = message.getCorrelationId(); + test->stop(); + sendReport(); + } else if (type == "TERMINATE") { + if (test != tests.end()) test->stop(); + shutdown(); + } else { + std::cerr <<"ERROR!: Received unknown control message: " << type << std::endl; + shutdown(); + } +} + +void Listener::shutdown() +{ + channel.close(); +} + +bool Listener::invite(const string& name) +{ + test = tests.find(name); + return test != tests.end(); +} + +void Listener::run() +{ + //NB: this method will be called in its own thread + //start test and when start returns... + test->start(); + sendReport(); +} + +void Listener::sendReport() +{ + Message report; + report.getHeaders().setString("CONTROL_TYPE", "REPORT"); + test->report(report); + report.setCorrelationId(reportCorrelator); + sendResponse(report, reportTo); +} + +string parse_next_word(const string& input, const string& delims, string::size_type& position) +{ + string::size_type start = input.find_first_not_of(delims, position); + if (start == string::npos) { + return ""; + } else { + string::size_type end = input.find_first_of(delims, start); + if (end == string::npos) { + end = input.length(); + } + position = end; + return input.substr(start, end - start); + } +} diff --git a/python/tests_0-9/broker.py b/python/tests_0-9/broker.py index a978993891..30eda03d48 100644 --- a/python/tests_0-9/broker.py +++ b/python/tests_0-9/broker.py @@ -114,3 +114,20 @@ class BrokerTests(TestBase): self.assertEqual(reply.method.klass.name, "channel") self.assertEqual(reply.method.name, "ok") #todo: provide a way to get notified of incoming pongs... + + def test_channel_flow(self): + channel = self.channel + channel.queue_declare(queue="flow_test_queue", exclusive=True) + channel.message_consume(destination="my-tag", queue="flow_test_queue") + incoming = self.client.queue("my-tag") + + channel.channel_flow(active=False) + channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz") + try: + incoming.get(timeout=1) + self.fail("Received message when flow turned off.") + except Empty: None + + channel.channel_flow(active=True) + msg = incoming.get(timeout=1) + self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body) |