summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/interop_runner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/interop_runner.cpp')
-rw-r--r--qpid/cpp/src/tests/interop_runner.cpp240
1 files changed, 240 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp
new file mode 100644
index 0000000000..824af7f3b7
--- /dev/null
+++ b/qpid/cpp/src/tests/interop_runner.cpp
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/ptr_map.h"
+#include "qpid/Exception.h"
+#include "qpid/client/Channel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Exchange.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Queue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include <iostream>
+#include <memory>
+#include "BasicP2PTest.h"
+#include "BasicPubSubTest.h"
+#include "TestCase.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+/**
+ * Framework for interop tests.
+ *
+ * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details].
+ */
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using qpid::TestCase;
+using qpid::TestOptions;
+using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
+using namespace std;
+
+class DummyRun : public TestCase
+{
+public:
+ DummyRun() {}
+ void assign(const string&, FieldTable&, TestOptions&) {}
+ void start() {}
+ void stop() {}
+ void report(qpid::client::Message&) {}
+};
+
+string parse_next_word(const string& input, const string& delims, string::size_type& position);
+
+/**
+ */
+class Listener : public MessageListener, private Runnable{
+ typedef boost::ptr_map<string, TestCase> TestMap;
+
+ Channel& channel;
+ TestOptions& options;
+ TestMap tests;
+ const string name;
+ const string topic;
+ TestCase* test;
+ auto_ptr<Thread> runner;
+ ReplyTo reportTo;
+ string reportCorrelator;
+
+ void shutdown();
+ bool invite(const string& name);
+ void run();
+
+ void sendResponse(Message& response, ReplyTo replyTo);
+ void sendResponse(Message& response, Message& request);
+ void sendSimpleResponse(const string& type, Message& request);
+ void sendReport();
+public:
+ Listener(Channel& channel, TestOptions& options);
+ void received(Message& msg);
+ void bindAndConsume();
+ void registerTest(string name, TestCase* test);
+};
+
+int main(int argc, char** argv) {
+ try {
+ TestOptions options;
+ options.parse(argc, argv);
+ if (options.help)
+ cout << options;
+ else {
+ Connection connection(options.trace);
+ connection.open(options.host, options.port, "guest", "guest", options.virtualhost);
+
+ Channel channel;
+ connection.openChannel(channel);
+
+ Listener listener(channel, options);
+ listener.registerTest("TC1_DummyRun", new DummyRun());
+ listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
+ listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
+
+ listener.bindAndConsume();
+
+ channel.run();
+ connection.close();
+ }
+ } catch(const exception& error) {
+ cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl;
+ }
+}
+
+Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
+{}
+
+void Listener::registerTest(string name, TestCase* test)
+{
+ tests.insert(name, test);
+}
+
+void Listener::bindAndConsume()
+{
+ Queue control(name, true);
+ channel.declareQueue(control);
+ qpid::framing::FieldTable bindArgs;
+ //replace these separate binds with a wildcard once that is supported on java broker
+ channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "iop.control", bindArgs);
+ channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, topic, bindArgs);
+
+ string tag;
+ channel.consume(control, tag, this);
+}
+
+void Listener::sendSimpleResponse(const string& type, Message& request)
+{
+ Message response;
+ response.getHeaders().setString("CONTROL_TYPE", type);
+ response.getHeaders().setString("CLIENT_NAME", name);
+ response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
+ response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId());
+ sendResponse(response, request);
+}
+
+void Listener::sendResponse(Message& response, Message& request)
+{
+ sendResponse(response, request.getMessageProperties().getReplyTo());
+}
+
+void Listener::sendResponse(Message& response, ReplyTo replyTo)
+{
+ string exchange = replyTo.getExchangeName();
+ string routingKey = replyTo.getRoutingKey();
+ channel.publish(response, exchange, routingKey);
+}
+
+void Listener::received(Message& message)
+{
+ string type(message.getHeaders().getString("CONTROL_TYPE"));
+
+ if (type == "INVITE") {
+ string name(message.getHeaders().getString("TEST_NAME"));
+ if (name.empty() || invite(name)) {
+ sendSimpleResponse("ENLIST", message);
+ } else {
+ cout << "Can't take part in '" << name << "'" << endl;
+ }
+ } else if (type == "ASSIGN_ROLE") {
+ test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
+ sendSimpleResponse("ACCEPT_ROLE", message);
+ } else if (type == "START") {
+ reportTo = message.getMessageProperties().getReplyTo();
+ reportCorrelator = message.getMessageProperties().getCorrelationId();
+ runner = auto_ptr<Thread>(new Thread(this));
+ } else if (type == "STATUS_REQUEST") {
+ reportTo = message.getMessageProperties().getReplyTo();
+ reportCorrelator = message.getMessageProperties().getCorrelationId();
+ test->stop();
+ sendReport();
+ } else if (type == "TERMINATE") {
+ if (test) test->stop();
+ shutdown();
+ } else {
+ cerr <<"ERROR!: Received unknown control message: " << type << endl;
+ shutdown();
+ }
+}
+
+void Listener::shutdown()
+{
+ channel.close();
+}
+
+bool Listener::invite(const string& name)
+{
+ TestMap::iterator i = tests.find(name);
+ test = (i != tests.end()) ? qpid::ptr_map::get_pointer(i) : 0;
+ return test;
+}
+
+void Listener::run()
+{
+ //NB: this method will be called in its own thread
+ //start test and when start returns...
+ test->start();
+ sendReport();
+}
+
+void Listener::sendReport()
+{
+ Message report;
+ report.getHeaders().setString("CONTROL_TYPE", "REPORT");
+ test->report(report);
+ report.getMessageProperties().setCorrelationId(reportCorrelator);
+ sendResponse(report, reportTo);
+}
+
+string parse_next_word(const string& input, const string& delims, string::size_type& position)
+{
+ string::size_type start = input.find_first_not_of(delims, position);
+ if (start == string::npos) {
+ return "";
+ } else {
+ string::size_type end = input.find_first_of(delims, start);
+ if (end == string::npos) {
+ end = input.length();
+ }
+ position = end;
+ return input.substr(start, end - start);
+ }
+}