summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/MessagingFixture.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/MessagingFixture.h')
-rw-r--r--qpid/cpp/src/tests/MessagingFixture.h345
1 files changed, 345 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/MessagingFixture.h b/qpid/cpp/src/tests/MessagingFixture.h
new file mode 100644
index 0000000000..2312a87e9d
--- /dev/null
+++ b/qpid/cpp/src/tests/MessagingFixture.h
@@ -0,0 +1,345 @@
+#ifndef TESTS_MESSAGINGFIXTURE_H
+#define TESTS_MESSAGINGFIXTURE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerFixture.h"
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace tests {
+
+using qpid::types::Variant;
+
+struct BrokerAdmin
+{
+ qpid::client::Connection connection;
+ qpid::client::Session session;
+
+ BrokerAdmin(uint16_t port)
+ {
+ connection.open("localhost", port);
+ session = connection.newSession();
+ }
+
+ void createQueue(const std::string& name)
+ {
+ session.queueDeclare(qpid::client::arg::queue=name);
+ }
+
+ void deleteQueue(const std::string& name)
+ {
+ session.queueDelete(qpid::client::arg::queue=name);
+ }
+
+ void createExchange(const std::string& name, const std::string& type)
+ {
+ session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type);
+ }
+
+ void deleteExchange(const std::string& name)
+ {
+ session.exchangeDelete(qpid::client::arg::exchange=name);
+ }
+
+ bool checkQueueExists(const std::string& name)
+ {
+ return session.queueQuery(name).getQueue() == name;
+ }
+
+ bool checkExchangeExists(const std::string& name, std::string& type)
+ {
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
+ type = result.getType();
+ return !result.getNotFound();
+ }
+
+ void send(qpid::client::Message& message, const std::string& exchange=std::string())
+ {
+ session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message);
+ }
+
+ ~BrokerAdmin()
+ {
+ session.close();
+ connection.close();
+ }
+};
+
+struct MessagingFixture : public BrokerFixture
+{
+ messaging::Connection connection;
+ messaging::Session session;
+ BrokerAdmin admin;
+
+ MessagingFixture(Broker::Options opts = Broker::Options(), bool mgmtEnabled=false) :
+ BrokerFixture(opts, mgmtEnabled),
+ connection(open(broker->getPort(Broker::TCP_TRANSPORT))),
+ session(connection.createSession()),
+ admin(broker->getPort(Broker::TCP_TRANSPORT))
+ {
+ }
+
+ static messaging::Connection open(uint16_t port)
+ {
+ messaging::Connection connection(
+ (boost::format("amqp:tcp:localhost:%1%") % (port)).str());
+ connection.open();
+ return connection;
+ }
+
+ /** Open a connection to the broker. */
+ qpid::messaging::Connection newConnection()
+ {
+ qpid::messaging::Connection connection(
+ (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))).str());
+ return connection;
+ }
+
+ void ping(const qpid::messaging::Address& address)
+ {
+ messaging::Receiver r = session.createReceiver(address);
+ messaging::Sender s = session.createSender(address);
+ messaging::Message out(framing::Uuid(true).str());
+ s.send(out);
+ messaging::Message in;
+ BOOST_CHECK(r.fetch(in, 5*messaging::Duration::SECOND));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ r.close();
+ s.close();
+ }
+
+ ~MessagingFixture()
+ {
+ session.close();
+ connection.close();
+ }
+};
+
+struct QueueFixture : MessagingFixture
+{
+ std::string queue;
+
+ QueueFixture(const std::string& name = "test-queue") : queue(name)
+ {
+ admin.createQueue(queue);
+ }
+
+ ~QueueFixture()
+ {
+ admin.deleteQueue(queue);
+ }
+
+};
+
+struct TopicFixture : MessagingFixture
+{
+ std::string topic;
+
+ TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name)
+ {
+ admin.createExchange(topic, type);
+ }
+
+ ~TopicFixture()
+ {
+ admin.deleteExchange(topic);
+ }
+
+};
+
+struct MultiQueueFixture : MessagingFixture
+{
+ typedef std::vector<std::string>::const_iterator const_iterator;
+ std::vector<std::string> queues;
+
+ MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names)
+ {
+ for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
+ admin.createQueue(*i);
+ }
+ }
+
+ ~MultiQueueFixture()
+ {
+ connection.close();
+ for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
+ admin.deleteQueue(*i);
+ }
+ }
+
+};
+
+inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
+{
+ std::vector<std::string> data;
+ messaging::Message message;
+ for (int i = 0; i < count && receiver.fetch(message, timeout); i++) {
+ data.push_back(message.getContent());
+ }
+ return data;
+}
+
+
+inline void send(messaging::Sender& sender, uint count = 1, uint start = 1,
+ const std::string& base = "Message")
+{
+ for (uint i = start; i < start + count; ++i) {
+ sender.send(messaging::Message((boost::format("%1%_%2%") % base % i).str()));
+ }
+}
+
+inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
+ const std::string& base = "Message",
+ messaging::Duration timeout=messaging::Duration::SECOND*5)
+{
+ for (uint i = start; i < start + count; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
+ }
+}
+
+
+class MethodInvoker
+{
+ public:
+ MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+ sender(session.createSender("qmf.default.direct/broker")),
+ receiver(session.createReceiver(replyTo)) {}
+
+ void createExchange(const std::string& name, const std::string& type, bool durable=false)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ params["properties"] = Variant::Map();
+ params["properties"].asMap()["exchange-type"] = type;
+ params["properties"].asMap()["durable"] = durable;
+ methodRequest("create", params);
+ }
+
+ void deleteExchange(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="exchange";
+ methodRequest("delete", params);
+ }
+
+ void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ params["properties"] = options;
+ params["properties"].asMap()["durable"] = durable;
+ params["properties"].asMap()["auto-delete"] = autodelete;
+ methodRequest("create", params);
+ }
+
+ void deleteQueue(const std::string& name)
+ {
+ Variant::Map params;
+ params["name"]=name;
+ params["type"]="queue";
+ methodRequest("delete", params);
+ }
+
+ void bind(const std::string& exchange, const std::string& queue, const std::string& key,
+ const Variant::Map& options=Variant::Map())
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ params["properties"] = options;
+ methodRequest("create", params);
+ }
+
+ void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
+ {
+ Variant::Map params;
+ params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
+ params["type"]="binding";
+ methodRequest("delete", params);
+ }
+
+ void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+ {
+ Variant::Map content;
+ Variant::Map objectId;
+ objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+ content["_object_id"] = objectId;
+ content["_method_name"] = method;
+ content["_arguments"] = inParams;
+
+ messaging::Message request;
+ request.setReplyTo(replyTo);
+ request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
+ request.getProperties()["qmf.opcode"] = "_method_request";
+ encode(content, request);
+
+ sender.send(request);
+
+ messaging::Message response;
+ if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
+ if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
+ std::string opcode = response.getProperties()["qmf.opcode"];
+ if (opcode == "_method_response") {
+ if (outParams) {
+ Variant::Map m;
+ decode(response, m);
+ *outParams = m["_arguments"].asMap();
+ }
+ } else if (opcode == "_exception") {
+ Variant::Map m;
+ decode(response, m);
+ throw Exception(QPID_MSG("Error: " << m["_values"]));
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
+ }
+ } else {
+ throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
+ << response.getProperties()["x-amqp-0-10.app-id"]));
+ }
+ } else {
+ throw Exception(QPID_MSG("No response received"));
+ }
+ }
+ private:
+ messaging::Address replyTo;
+ messaging::Sender sender;
+ messaging::Receiver receiver;
+};
+
+}} // namespace qpid::tests
+
+#endif /*!TESTS_MESSAGINGFIXTURE_H*/