summaryrefslogtreecommitdiff
path: root/cpp/src/tests/MessagingSessionTests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp778
1 files changed, 778 insertions, 0 deletions
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
new file mode 100644
index 0000000000..4125c51698
--- /dev/null
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -0,0 +1,778 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "unit_test.h"
+#include "test_tools.h"
+#include "BrokerFixture.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/ListContent.h"
+#include "qpid/messaging/ListView.h"
+#include "qpid/messaging/MapContent.h"
+#include "qpid/messaging/MapView.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/ExchangeQueryResult.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Time.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+#include <string>
+#include <vector>
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(MessagingSessionTests)
+
+using namespace qpid::messaging;
+using namespace qpid;
+using qpid::broker::Broker;
+using qpid::framing::Uuid;
+
+struct BrokerAdmin
+{
+ qpid::client::Connection connection;
+ qpid::client::Session session;
+
+ BrokerAdmin(uint16_t port)
+ {
+ connection.open("localhost", port);
+ session = connection.newSession();
+ }
+
+ void createQueue(const std::string& name)
+ {
+ session.queueDeclare(qpid::client::arg::queue=name);
+ }
+
+ void deleteQueue(const std::string& name)
+ {
+ session.queueDelete(qpid::client::arg::queue=name);
+ }
+
+ void createExchange(const std::string& name, const std::string& type)
+ {
+ session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type);
+ }
+
+ void deleteExchange(const std::string& name)
+ {
+ session.exchangeDelete(qpid::client::arg::exchange=name);
+ }
+
+ bool checkQueueExists(const std::string& name)
+ {
+ return session.queueQuery(name).getQueue() == name;
+ }
+
+ bool checkExchangeExists(const std::string& name, std::string& type)
+ {
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
+ type = result.getType();
+ return !result.getNotFound();
+ }
+
+ ~BrokerAdmin()
+ {
+ session.close();
+ connection.close();
+ }
+};
+
+struct MessagingFixture : public BrokerFixture
+{
+ Connection connection;
+ Session session;
+ BrokerAdmin admin;
+
+ MessagingFixture(Broker::Options opts = Broker::Options()) :
+ BrokerFixture(opts),
+ connection(Connection::open((boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(Broker::TCP_TRANSPORT))).str())),
+ session(connection.newSession()),
+ admin(broker->getPort(Broker::TCP_TRANSPORT)) {}
+
+ void ping(const qpid::messaging::Address& address)
+ {
+ Receiver r = session.createReceiver(address);
+ Sender s = session.createSender(address);
+ Message out(Uuid(true).str());
+ s.send(out);
+ Message in;
+ BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ r.cancel();
+ s.cancel();
+ }
+
+ ~MessagingFixture()
+ {
+ session.close();
+ connection.close();
+ }
+};
+
+struct QueueFixture : MessagingFixture
+{
+ std::string queue;
+
+ QueueFixture(const std::string& name = "test-queue") : queue(name)
+ {
+ admin.createQueue(queue);
+ }
+
+ ~QueueFixture()
+ {
+ admin.deleteQueue(queue);
+ }
+
+};
+
+struct TopicFixture : MessagingFixture
+{
+ std::string topic;
+
+ TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name)
+ {
+ admin.createExchange(topic, type);
+ }
+
+ ~TopicFixture()
+ {
+ admin.deleteExchange(topic);
+ }
+
+};
+
+struct MultiQueueFixture : MessagingFixture
+{
+ typedef std::vector<std::string>::const_iterator const_iterator;
+ std::vector<std::string> queues;
+
+ MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names)
+ {
+ for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
+ admin.createQueue(*i);
+ }
+ }
+
+ ~MultiQueueFixture()
+ {
+ for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
+ admin.deleteQueue(*i);
+ }
+ }
+
+};
+std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+ std::vector<std::string> data;
+ Message message;
+ for (int i = 0; i < count && receiver.fetch(message, timeout); i++) {
+ data.push_back(message.getContent());
+ }
+ return data;
+}
+
+
+void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message")
+{
+ for (uint i = start; i < start + count; ++i) {
+ sender.send(Message((boost::format("%1%_%2%") % base % i).str()));
+ }
+}
+
+void receive(Receiver& receiver, uint count = 1, uint start = 1,
+ const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+ for (uint i = start; i < start + count; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testSimpleSendReceive)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out("test-message");
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+}
+
+QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out("test-message");
+ for (uint i = 0; i < 10; ++i) {
+ out.getHeaders()["a"] = i;
+ sender.send(out);
+ }
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in;
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
+ fix.session.acknowledge();
+ }
+}
+
+QPID_AUTO_TEST_CASE(testSenderError)
+{
+ MessagingFixture fix;
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ fix.session = fix.connection.newSession();
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"),
+ qpid::messaging::InvalidAddress);
+}
+
+QPID_AUTO_TEST_CASE(testReceiverError)
+{
+ MessagingFixture fix;
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ fix.session = fix.connection.newSession();
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"),
+ qpid::messaging::InvalidAddress);
+}
+
+QPID_AUTO_TEST_CASE(testSimpleTopic)
+{
+ TopicFixture fix;
+
+ Sender sender = fix.session.createSender(fix.topic);
+ Message msg("one");
+ sender.send(msg);
+ Receiver sub1 = fix.session.createReceiver(fix.topic);
+ sub1.setCapacity(10u);
+ msg.setContent("two");
+ sender.send(msg);
+ Receiver sub2 = fix.session.createReceiver(fix.topic);
+ sub2.setCapacity(10u);
+ msg.setContent("three");
+ sender.send(msg);
+ Receiver sub3 = fix.session.createReceiver(fix.topic);
+ sub3.setCapacity(10u);
+ msg.setContent("four");
+ sender.send(msg);
+ BOOST_CHECK_EQUAL(fetch(sub2, 2), boost::assign::list_of<std::string>("three")("four"));
+ sub2.cancel();
+
+ msg.setContent("five");
+ sender.send(msg);
+ BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five"));
+ BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five"));
+ Message in;
+ BOOST_CHECK(!sub2.fetch(in, 0));//TODO: or should this raise an error?
+
+
+ //TODO: check pending messages...
+}
+
+QPID_AUTO_TEST_CASE(testNextReceiver)
+{
+ MultiQueueFixture fix;
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Receiver r = fix.session.createReceiver(fix.queues[i]);
+ r.setCapacity(10u);
+ }
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Sender s = fix.session.createSender(fix.queues[i]);
+ Message msg((boost::format("Message_%1%") % (i+1)).str());
+ s.send(msg);
+ }
+
+ for (uint i = 0; i < fix.queues.size(); i++) {
+ Message msg;
+ BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMapMessage)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ MapContent content(out);
+ content["abc"] = "def";
+ content["pi"] = 3.14f;
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ MapView view(in);
+ BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
+ BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testListMessage)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ ListContent content(out);
+ content.push_back(Variant("abc"));
+ content.push_back(Variant(1234));
+ content.push_back(Variant("def"));
+ content.push_back(Variant(56.789));
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ ListView view(in);
+ BOOST_CHECK_EQUAL(view.size(), content.size());
+ BOOST_CHECK_EQUAL(view.front().asString(), "abc");
+ BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789);
+
+ ListView::const_iterator i = view.begin();
+ BOOST_CHECK(i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "abc");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asInt64(), 1234);
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "def");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asDouble(), 56.789);
+ BOOST_CHECK(++i == view.end());
+
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testReject)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message m1("reject-me");
+ sender.send(m1);
+ Message m2("accept-me");
+ sender.send(m2);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ BOOST_CHECK_EQUAL(in.getContent(), m1.getContent());
+ fix.session.reject(in);
+ in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ BOOST_CHECK_EQUAL(in.getContent(), m2.getContent());
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAvailable)
+{
+ MultiQueueFixture fix;
+
+ Receiver r1 = fix.session.createReceiver(fix.queues[0]);
+ r1.setCapacity(100);
+
+ Receiver r2 = fix.session.createReceiver(fix.queues[1]);
+ r2.setCapacity(100);
+
+ Sender s1 = fix.session.createSender(fix.queues[0]);
+ Sender s2 = fix.session.createSender(fix.queues[1]);
+
+ for (uint i = 0; i < 10; ++i) {
+ s1.send(Message((boost::format("A_%1%") % (i+1)).str()));
+ }
+ for (uint i = 0; i < 5; ++i) {
+ s2.send(Message((boost::format("B_%1%") % (i+1)).str()));
+ }
+ qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched?
+ for (uint i = 0; i < 5; ++i) {
+ BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i);
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
+ BOOST_CHECK_EQUAL(r2.available(), 5u - i);
+ BOOST_CHECK_EQUAL(r2.fetch().getContent(), (boost::format("B_%1%") % (i+1)).str());
+ fix.session.acknowledge();
+ }
+ for (uint i = 5; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(fix.session.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPendingAck)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u);
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testPendingSend)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ send(sender, 10);
+ //Note: this test relies on 'inside knowledge' of the sender
+ //implementation and the fact that the simple test case makes it
+ //possible to predict when completion information will be sent to
+ //the client. TODO: is there a better way of testing this?
+ BOOST_CHECK_EQUAL(sender.pending(), 10u);
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(sender.pending(), 0u);
+
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ receive(receiver, 10);
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testBrowse)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ send(sender, 10);
+ Receiver browser1 = fix.session.createReceiver(fix.queue + "; {browse:true}");
+ receive(browser1, 10);
+ Receiver browser2 = fix.session.createReceiver(fix.queue + "; {browse:true}");
+ receive(browser2, 10);
+ Receiver consumer = fix.session.createReceiver(fix.queue);
+ receive(consumer, 10);
+ fix.session.acknowledge();
+}
+
+struct QueueCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+
+ QueueCreatePolicyFixture(const std::string& a) : address(a) {}
+
+ void test()
+ {
+ ping(address);
+ BOOST_CHECK(admin.checkQueueExists(address.getName()));
+ }
+
+ ~QueueCreatePolicyFixture()
+ {
+ admin.deleteQueue(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
+{
+ QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
+{
+ QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
+{
+ QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct ExchangeCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+ const std::string exchangeType;
+
+ ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) :
+ address(a), exchangeType(t) {}
+
+ void test()
+ {
+ ping(address);
+ std::string actualType;
+ BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType));
+ BOOST_CHECK_EQUAL(exchangeType, actualType);
+ }
+
+ ~ExchangeCreatePolicyFixture()
+ {
+ admin.deleteExchange(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
+{
+ ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}",
+ "topic");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct DeletePolicyFixture : public MessagingFixture
+{
+ enum Mode {RECEIVER, SENDER, ALWAYS, NEVER};
+
+ std::string getPolicy(Mode mode)
+ {
+ switch (mode) {
+ case SENDER:
+ return "{delete:sender}";
+ case RECEIVER:
+ return "{delete:receiver}";
+ case ALWAYS:
+ return "{delete:always}";
+ case NEVER:
+ return "{delete:never}";
+ }
+ return "";
+ }
+
+ void testAll()
+ {
+ test(RECEIVER);
+ test(SENDER);
+ test(ALWAYS);
+ test(NEVER);
+ }
+
+ virtual ~DeletePolicyFixture() {}
+ virtual void create(const qpid::messaging::Address&) = 0;
+ virtual void destroy(const qpid::messaging::Address&) = 0;
+ virtual bool exists(const qpid::messaging::Address&) = 0;
+
+ void test(Mode mode)
+ {
+ qpid::messaging::Address address("#; " + getPolicy(mode));
+ create(address);
+
+ Sender s = session.createSender(address);
+ Receiver r = session.createReceiver(address);
+ switch (mode) {
+ case RECEIVER:
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ r.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case SENDER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case ALWAYS:
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case NEVER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ destroy(address);
+ }
+ }
+};
+
+struct QueueDeletePolicyFixture : DeletePolicyFixture
+{
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createQueue(address.getName());
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteQueue(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ return admin.checkQueueExists(address.getName());
+ }
+};
+
+struct ExchangeDeletePolicyFixture : DeletePolicyFixture
+{
+ const std::string exchangeType;
+ ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {}
+
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createExchange(address.getName(), exchangeType);
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteExchange(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ std::string actualType;
+ return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType;
+ }
+};
+
+QPID_AUTO_TEST_CASE(testDeletePolicyQueue)
+{
+ QueueDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
+{
+ ExchangeDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
+{
+ MessagingFixture fix;
+ std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
+ Sender s1 = fix.session.createSender(a1);
+ s1.cancel();
+ Receiver r1 = fix.session.createReceiver(a1);
+ r1.cancel();
+
+ std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
+ Sender s2 = fix.session.createSender(a2);
+ s2.cancel();
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
+
+ std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
+ Receiver r3 = fix.session.createReceiver(a3);
+ r3.cancel();
+
+ fix.admin.deleteQueue("q");
+}
+
+QPID_AUTO_TEST_CASE(testGetSender)
+{
+ QueueFixture fix;
+ std::string name = fix.session.createSender(fix.queue).getName();
+ Sender sender = fix.session.getSender(name);
+ BOOST_CHECK_EQUAL(name, sender.getName());
+ Message out(Uuid(true).str());
+ sender.send(out);
+ Message in;
+ BOOST_CHECK(fix.session.createReceiver(fix.queue).fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"), qpid::messaging::KeyError);
+}
+
+QPID_AUTO_TEST_CASE(testGetReceiver)
+{
+ QueueFixture fix;
+ std::string name = fix.session.createReceiver(fix.queue).getName();
+ Receiver receiver = fix.session.getReceiver(name);
+ BOOST_CHECK_EQUAL(name, receiver.getName());
+ Message out(Uuid(true).str());
+ fix.session.createSender(fix.queue).send(out);
+ Message in;
+ BOOST_CHECK(receiver.fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"), qpid::messaging::KeyError);
+}
+
+QPID_AUTO_TEST_CASE(testGetSessionFromConnection)
+{
+ QueueFixture fix;
+ fix.connection.newSession("my-session");
+ Session session = fix.connection.getSession("my-session");
+ Message out(Uuid(true).str());
+ session.createSender(fix.queue).send(out);
+ Message in;
+ BOOST_CHECK(session.createReceiver(fix.queue).fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"), qpid::messaging::KeyError);
+}
+
+QPID_AUTO_TEST_CASE(testGetConnectionFromSession)
+{
+ QueueFixture fix;
+ Message out(Uuid(true).str());
+ Sender sender = fix.session.createSender(fix.queue);
+ sender.send(out);
+ Message in;
+ sender.getSession().getConnection().newSession("incoming");
+ BOOST_CHECK(fix.connection.getSession("incoming").createReceiver(fix.queue).fetch(in));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+}
+
+QPID_AUTO_TEST_CASE(testTx)
+{
+ QueueFixture fix;
+ Session ssn1 = fix.connection.newSession(true);
+ Session ssn2 = fix.connection.newSession(true);
+ Sender sender1 = ssn1.createSender(fix.queue);
+ Sender sender2 = ssn2.createSender(fix.queue);
+ Receiver receiver1 = ssn1.createReceiver(fix.queue);
+ Receiver receiver2 = ssn2.createReceiver(fix.queue);
+ Message in;
+
+ send(sender1, 5, 1, "A");
+ send(sender2, 5, 1, "B");
+ ssn2.commit();
+ receive(receiver1, 5, 1, "B");//(only those from sender2 should be received)
+ BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages
+ ssn1.rollback();
+ receive(receiver2, 5, 1, "B");
+ BOOST_CHECK(!receiver2.fetch(in, 0));//check there are no more messages
+ ssn2.rollback();
+ receive(receiver1, 5, 1, "B");
+ BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages
+ ssn1.commit();
+ //check neither receiver gets any more messages:
+ BOOST_CHECK(!receiver1.fetch(in, 0));
+ BOOST_CHECK(!receiver2.fetch(in, 0));
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests