summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-06-06 16:39:03 +0000
committerGordon Sim <gsim@apache.org>2007-06-06 16:39:03 +0000
commit70a3cdf33b3e38ee26ee2840a55f83ebd26589b4 (patch)
tree07c3dab5cb7d97158737c36efa1caa8d9254c266 /cpp/src/tests
parent480e99cfc6071f15bc7135895cf2b60d0dd9c981 (diff)
downloadqpid-python-70a3cdf33b3e38ee26ee2840a55f83ebd26589b4.tar.gz
Merged in channel.flow implementation and interoperability tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@544879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BasicP2PTest.cpp66
-rw-r--r--cpp/src/tests/BasicP2PTest.h46
-rw-r--r--cpp/src/tests/BasicPubSubTest.cpp121
-rw-r--r--cpp/src/tests/BasicPubSubTest.h51
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp37
-rw-r--r--cpp/src/tests/Makefile.am14
-rw-r--r--cpp/src/tests/SimpleTestCaseBase.cpp87
-rw-r--r--cpp/src/tests/SimpleTestCaseBase.h88
-rw-r--r--cpp/src/tests/TestCase.h64
-rw-r--r--cpp/src/tests/TestOptions.h69
-rw-r--r--cpp/src/tests/interop_runner.cpp252
11 files changed, 894 insertions, 1 deletions
diff --git a/cpp/src/tests/BasicP2PTest.cpp b/cpp/src/tests/BasicP2PTest.cpp
new file mode 100644
index 0000000000..b202f88ca6
--- /dev/null
+++ b/cpp/src/tests/BasicP2PTest.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BasicP2PTest.h"
+
+using namespace qpid;
+using namespace qpid::client;
+
+class BasicP2PTest::Receiver : public Worker, public MessageListener
+{
+ const std::string queue;
+ std::string tag;
+public:
+ Receiver(TestOptions& options, const std::string& _queue, const int _messages)
+ : Worker(options, _messages), queue(_queue){}
+ void init()
+ {
+ Queue q(queue, true);
+ channel.declareQueue(q);
+ framing::FieldTable args;
+ channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args);
+ channel.consume(q, tag, this);
+ channel.start();
+ }
+
+ void start()
+ {
+ }
+
+ void received(Message&)
+ {
+ count++;
+ }
+};
+
+void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+{
+ std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
+ int messages = params.getInt("P2P_NUM_MESSAGES");
+ if (role == "SENDER") {
+ worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
+ } else if(role == "RECEIVER"){
+ worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages));
+ } else {
+ throw Exception("unrecognised role");
+ }
+ worker->init();
+}
diff --git a/cpp/src/tests/BasicP2PTest.h b/cpp/src/tests/BasicP2PTest.h
new file mode 100644
index 0000000000..b80fff1171
--- /dev/null
+++ b/cpp/src/tests/BasicP2PTest.h
@@ -0,0 +1,46 @@
+#ifndef _BasicP2PTest_
+#define _BasicP2PTest_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "SimpleTestCaseBase.h"
+
+
+namespace qpid {
+
+class BasicP2PTest : public SimpleTestCaseBase
+{
+ class Receiver;
+public:
+ void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+};
+
+}
+
+#endif
diff --git a/cpp/src/tests/BasicPubSubTest.cpp b/cpp/src/tests/BasicPubSubTest.cpp
new file mode 100644
index 0000000000..623194d331
--- /dev/null
+++ b/cpp/src/tests/BasicPubSubTest.cpp
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BasicPubSubTest.h"
+
+using namespace qpid;
+
+class BasicPubSubTest::Receiver : public Worker, public MessageListener
+{
+ const Exchange& exchange;
+ const std::string queue;
+ const std::string key;
+ std::string tag;
+public:
+ Receiver(TestOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages)
+ : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){}
+
+ void init()
+ {
+ Queue q(queue, true);
+ channel.declareQueue(q);
+ framing::FieldTable args;
+ channel.bind(exchange, q, key, args);
+ channel.consume(q, tag, this);
+ channel.start();
+ }
+
+ void start(){
+ }
+
+ void received(Message&)
+ {
+ count++;
+ }
+};
+
+class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener
+{
+ typedef boost::ptr_vector<Receiver> ReceiverList;
+ ReceiverList receivers;
+
+public:
+ MultiReceiver(TestOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount)
+ : Worker(options, _messages)
+ {
+ for (int i = 0; i != receiverCount; i++) {
+ std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str();
+ receivers.push_back(new Receiver(options, exchange, queue, key, _messages));
+ }
+ }
+
+ void init()
+ {
+ for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+ receivers[i].init();
+ }
+ }
+
+ void start()
+ {
+ for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+ receivers[i].start();
+ }
+ }
+
+ void received(Message& msg)
+ {
+ for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+ receivers[i].received(msg);
+ }
+ }
+
+ virtual int getCount()
+ {
+ count = 0;
+ for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+ count += receivers[i].getCount();
+ }
+ return count;
+ }
+ virtual void stop()
+ {
+ for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+ receivers[i].stop();
+ }
+ }
+};
+
+void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+{
+ std::string key = params.getString("PUBSUB_KEY");
+ int messages = params.getInt("PUBSUB_NUM_MESSAGES");
+ int receivers = params.getInt("PUBSUB_NUM_RECEIVERS");
+ if (role == "SENDER") {
+ worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages));
+ } else if(role == "RECEIVER"){
+ worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers));
+ } else {
+ throw Exception("unrecognised role");
+ }
+ worker->init();
+}
+
diff --git a/cpp/src/tests/BasicPubSubTest.h b/cpp/src/tests/BasicPubSubTest.h
new file mode 100644
index 0000000000..b7ccba1a81
--- /dev/null
+++ b/cpp/src/tests/BasicPubSubTest.h
@@ -0,0 +1,51 @@
+#ifndef _BasicPubSubTest_
+#define _BasicPubSubTest_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "SimpleTestCaseBase.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/format.hpp>
+
+
+namespace qpid {
+
+using namespace qpid::client;
+
+class BasicPubSubTest : public SimpleTestCaseBase
+{
+ class Receiver;
+ class MultiReceiver;
+public:
+ void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+};
+
+}
+
+#endif
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 3b2119be13..19841dc18d 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -57,6 +57,7 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_TEST(testDeliveryAndRecovery);
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
+ CPPUNIT_TEST(testFlow);
CPPUNIT_TEST_SUITE_END();
Broker::shared_ptr broker;
@@ -333,6 +334,42 @@ class BrokerChannelTest : public CppUnit::TestCase
store.check();
}
+ void testFlow(){
+ Channel channel(connection, 7, 10000);
+ channel.open();
+ //there will always be a connection-start frame
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
+ CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+ handler.frames[0].getBody().get()));
+
+ const string data("abcdefghijklmn");
+
+ Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ addContent(msg, data);
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner(0);
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+ channel.flow(false);
+ queue->deliver(msg);
+ //ensure no more frames have been delivered
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());
+ channel.flow(true);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
+ }
+
Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
BasicMessage* msg = new BasicMessage(
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 699a2f073c..c351408988 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -63,7 +63,7 @@ testprogs = \
topic_listener \
topic_publisher
-check_PROGRAMS = $(UNIT_TESTS) $(testprogs)
+check_PROGRAMS = $(UNIT_TESTS) $(testprogs) interop_runner
# FIXME aconway 2007-05-30: TESTS_ENVIRONMENT should have ./run_test
# as below to run valgrind on all test programs.
@@ -137,3 +137,15 @@ check-unit:
CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp
MAINTAINERCLEANFILES=gen.mk
+
+interop_runner_SOURCES = \
+ interop_runner.cpp \
+ SimpleTestCaseBase.cpp \
+ BasicP2PTest.cpp \
+ BasicPubSubTest.cpp \
+ SimpleTestCaseBase.h \
+ BasicP2PTest.h \
+ BasicPubSubTest.h \
+ TestCase.h \
+ TestOptions.h
+interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)
diff --git a/cpp/src/tests/SimpleTestCaseBase.cpp b/cpp/src/tests/SimpleTestCaseBase.cpp
new file mode 100644
index 0000000000..5f37fd0eb3
--- /dev/null
+++ b/cpp/src/tests/SimpleTestCaseBase.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SimpleTestCaseBase.h"
+
+using namespace qpid;
+
+void SimpleTestCaseBase::start()
+{
+ if (worker.get()) {
+ worker->start();
+ }
+}
+
+void SimpleTestCaseBase::stop()
+{
+ if (worker.get()) {
+ worker->stop();
+ }
+}
+
+void SimpleTestCaseBase::report(client::Message& report)
+{
+ if (worker.get()) {
+ report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount());
+ //add number of messages sent or received
+ std::stringstream reportstr;
+ reportstr << worker->getCount();
+ report.setData(reportstr.str());
+ }
+}
+
+SimpleTestCaseBase::Sender::Sender(TestOptions& options,
+ const Exchange& _exchange,
+ const std::string& _key,
+ const int _messages)
+ : Worker(options, _messages), exchange(_exchange), key(_key) {}
+
+void SimpleTestCaseBase::Sender::init()
+{
+ channel.start();
+}
+
+void SimpleTestCaseBase::Sender::start(){
+ Message msg;
+ while (count < messages) {
+ channel.publish(msg, exchange, key);
+ count++;
+ }
+ stop();
+}
+
+SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) :
+ connection(options.trace), messages(_messages), count(0)
+{
+ connection.open(options.broker, options.port);
+ connection.openChannel(channel);
+}
+
+void SimpleTestCaseBase::Worker::stop()
+{
+ channel.close();
+ connection.close();
+}
+
+int SimpleTestCaseBase::Worker::getCount()
+{
+ return count;
+}
+
diff --git a/cpp/src/tests/SimpleTestCaseBase.h b/cpp/src/tests/SimpleTestCaseBase.h
new file mode 100644
index 0000000000..ee1742a7f7
--- /dev/null
+++ b/cpp/src/tests/SimpleTestCaseBase.h
@@ -0,0 +1,88 @@
+#ifndef _SimpleTestCaseBase_
+#define _SimpleTestCaseBase_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "TestCase.h"
+
+
+namespace qpid {
+
+using namespace qpid::client;
+
+class SimpleTestCaseBase : public TestCase
+{
+protected:
+ class Worker
+ {
+ protected:
+ client::Connection connection;
+ client::Channel channel;
+ const int messages;
+ int count;
+
+ public:
+
+ Worker(TestOptions& options, const int messages);
+ virtual ~Worker(){}
+
+ virtual void stop();
+ virtual int getCount();
+ virtual void init() = 0;
+ virtual void start() = 0;
+ };
+
+ class Sender : public Worker
+ {
+ const Exchange& exchange;
+ const std::string key;
+ public:
+ Sender(TestOptions& options,
+ const Exchange& exchange,
+ const std::string& key,
+ const int messages);
+ void init();
+ void start();
+ };
+
+ std::auto_ptr<Worker> worker;
+
+public:
+ virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+
+ virtual ~SimpleTestCaseBase() {}
+
+ void start();
+ void stop();
+ void report(client::Message& report);
+};
+
+}
+
+#endif
diff --git a/cpp/src/tests/TestCase.h b/cpp/src/tests/TestCase.h
new file mode 100644
index 0000000000..71e1d1118c
--- /dev/null
+++ b/cpp/src/tests/TestCase.h
@@ -0,0 +1,64 @@
+#ifndef _TestCase_
+#define _TestCase_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientMessage.h"
+#include "TestOptions.h"
+
+
+namespace qpid {
+
+/**
+ * Interface to be implemented by test cases for use with the test
+ * runner.
+ */
+class TestCase
+{
+public:
+ /**
+ * Directs the test case to act in a particular role. Some roles
+ * may be 'activated' at this stage others may require an explicit
+ * start request.
+ */
+ virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+ /**
+ * Each test will be started on its own thread, which should block
+ * until the test completes (this may or may not require an
+ * explicit stop() request).
+ */
+ virtual void start() = 0;
+ /**
+ * Requests that the test be stopped if still running.
+ */
+ virtual void stop() = 0;
+ /**
+ * Allows the test to fill in details on the final report
+ * message. Will be called only after start has returned.
+ */
+ virtual void report(client::Message& report) = 0;
+
+ virtual ~TestCase() {}
+};
+
+}
+
+#endif
diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h
new file mode 100644
index 0000000000..45575ba450
--- /dev/null
+++ b/cpp/src/tests/TestOptions.h
@@ -0,0 +1,69 @@
+#ifndef _TestOptions_
+#define _TestOptions_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonOptions.h"
+
+namespace qpid {
+
+struct TestOptions : public qpid::CommonOptions
+{
+ TestOptions() : desc("Options"), broker("localhost"), virtualhost(""), clientid("cpp"), help(false)
+ {
+ using namespace qpid::program_options;
+ using namespace boost::program_options;
+ CommonOptions::addTo(desc);
+ desc.add_options()
+ ("broker,b", optValue(broker, "HOSTNAME"), "the hostname to connect to")
+ ("virtualhost,v", optValue(virtualhost, "VIRTUAL_HOST"), "virtual host")
+ ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
+ ("help,h", optValue(help), "print this usage statement");
+ }
+
+ void parse(int argc, char** argv)
+ {
+ using namespace boost::program_options;
+ try {
+ variables_map vm;
+ store(parse_command_line(argc, argv, desc), vm);
+ notify(vm);
+ } catch(const error& e) {
+ std::cerr << "Error: " << e.what() << std::endl
+ << "Specify '--help' for usage." << std::endl;
+ }
+ }
+
+ void usage()
+ {
+ std::cout << desc << std::endl;
+ }
+
+ boost::program_options::options_description desc;
+ std::string broker;
+ std::string virtualhost;
+ std::string clientid;
+ bool help;
+};
+
+}
+
+#endif
diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp
new file mode 100644
index 0000000000..26639ede17
--- /dev/null
+++ b/cpp/src/tests/interop_runner.cpp
@@ -0,0 +1,252 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonOptions.h"
+#include "qpid/Exception.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ClientExchange.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/ClientQueue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include <iostream>
+#include <memory>
+#include "BasicP2PTest.h"
+#include "BasicPubSubTest.h"
+#include "TestCase.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+/**
+ * Framework for interop tests.
+ *
+ * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details].
+ */
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using qpid::TestCase;
+using qpid::TestOptions;
+using qpid::framing::FieldTable;
+using std::string;
+
+class DummyRun : public TestCase
+{
+public:
+ DummyRun() {}
+ void assign(const std::string&, FieldTable&, TestOptions&) {}
+ void start() {}
+ void stop() {}
+ void report(qpid::client::Message&) {}
+};
+
+string parse_next_word(const string& input, const string& delims, string::size_type& position);
+
+/**
+ */
+class Listener : public MessageListener, private Runnable{
+ typedef boost::ptr_map<std::string, TestCase> TestMap;
+
+ Channel& channel;
+ TestOptions& options;
+ TestMap tests;
+ const string name;
+ const string topic;
+ TestMap::iterator test;
+ std::auto_ptr<Thread> runner;
+ string reportTo;
+ string reportCorrelator;
+
+ void shutdown();
+ bool invite(const string& name);
+ void run();
+
+ void sendResponse(Message& response, string replyTo);
+ void sendResponse(Message& response, Message& request);
+ void sendSimpleResponse(const string& type, Message& request);
+ void sendReport();
+public:
+ Listener(Channel& channel, TestOptions& options);
+ void received(Message& msg);
+ void bindAndConsume();
+ void registerTest(std::string name, TestCase* test);
+};
+
+/**
+ */
+int main(int argc, char** argv){
+ TestOptions options;
+ options.parse(argc, argv);
+
+ if (options.help) {
+ options.usage();
+ } else {
+ try{
+ Connection connection(options.trace);
+ connection.open(options.broker, options.port, "guest", "guest", options.virtualhost);
+
+ Channel channel;
+ connection.openChannel(channel);
+
+ Listener listener(channel, options);
+ listener.registerTest("TC1_DummyRun", new DummyRun());
+ listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
+ listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
+
+ listener.bindAndConsume();
+
+ channel.run();
+ connection.close();
+ } catch(qpid::Exception error) {
+ std::cout << error.what() << std::endl;
+ }
+ }
+}
+
+Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
+{}
+
+void Listener::registerTest(std::string name, TestCase* test)
+{
+ tests.insert(name, test);
+}
+
+void Listener::bindAndConsume()
+{
+ Queue control(name, true);
+ channel.declareQueue(control);
+ qpid::framing::FieldTable bindArgs;
+ //replace these separate binds with a wildcard once that is supported on java broker
+ channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs);
+ channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs);
+
+ std::string tag;
+ channel.consume(control, tag, this);
+}
+
+void Listener::sendSimpleResponse(const string& type, Message& request)
+{
+ Message response;
+ response.getHeaders().setString("CONTROL_TYPE", type);
+ response.getHeaders().setString("CLIENT_NAME", name);
+ response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
+ response.setCorrelationId(request.getCorrelationId());
+ sendResponse(response, request);
+}
+
+void Listener::sendResponse(Message& response, Message& request)
+{
+ sendResponse(response, request.getReplyTo());
+}
+
+void Listener::sendResponse(Message& response, string replyTo)
+{
+ //Exchange and routing key need to be extracted from the reply-to
+ //field. Format is assumed to be:
+ //
+ // <exchange type>://<exchange name>/<routing key>?<options>
+ //
+ //and all we need is the exchange name and routing key
+ //
+ if (replyTo.empty()) throw qpid::Exception("Reply address not set!");
+ const string delims(":/?=");
+
+ string::size_type start = replyTo.find(':');//skip exchange type
+ string exchange = parse_next_word(replyTo, delims, start);
+ string routingKey = parse_next_word(replyTo, delims, start);
+ channel.publish(response, exchange, routingKey);
+}
+
+void Listener::received(Message& message)
+{
+ std::string type(message.getHeaders().getString("CONTROL_TYPE"));
+
+ if (type == "INVITE") {
+ std::string name(message.getHeaders().getString("TEST_NAME"));
+ if (name.empty() || invite(name)) {
+ sendSimpleResponse("ENLIST", message);
+ } else {
+ std::cout << "Can't take part in '" << name << "'" << std::endl;
+ }
+ } else if (type == "ASSIGN_ROLE") {
+ test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
+ sendSimpleResponse("ACCEPT_ROLE", message);
+ } else if (type == "START") {
+ reportTo = message.getReplyTo();
+ reportCorrelator = message.getCorrelationId();
+ runner = std::auto_ptr<Thread>(new Thread(this));
+ } else if (type == "STATUS_REQUEST") {
+ reportTo = message.getReplyTo();
+ reportCorrelator = message.getCorrelationId();
+ test->stop();
+ sendReport();
+ } else if (type == "TERMINATE") {
+ if (test != tests.end()) test->stop();
+ shutdown();
+ } else {
+ std::cerr <<"ERROR!: Received unknown control message: " << type << std::endl;
+ shutdown();
+ }
+}
+
+void Listener::shutdown()
+{
+ channel.close();
+}
+
+bool Listener::invite(const string& name)
+{
+ test = tests.find(name);
+ return test != tests.end();
+}
+
+void Listener::run()
+{
+ //NB: this method will be called in its own thread
+ //start test and when start returns...
+ test->start();
+ sendReport();
+}
+
+void Listener::sendReport()
+{
+ Message report;
+ report.getHeaders().setString("CONTROL_TYPE", "REPORT");
+ test->report(report);
+ report.setCorrelationId(reportCorrelator);
+ sendResponse(report, reportTo);
+}
+
+string parse_next_word(const string& input, const string& delims, string::size_type& position)
+{
+ string::size_type start = input.find_first_not_of(delims, position);
+ if (start == string::npos) {
+ return "";
+ } else {
+ string::size_type end = input.find_first_of(delims, start);
+ if (end == string::npos) {
+ end = input.length();
+ }
+ position = end;
+ return input.substr(start, end - start);
+ }
+}