summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/examples/messaging/client.cpp2
-rw-r--r--cpp/examples/messaging/topic_listener.cpp11
-rw-r--r--cpp/examples/messaging/topic_receiver.cpp15
-rw-r--r--cpp/include/qpid/messaging/Address.h62
-rw-r--r--cpp/include/qpid/messaging/Filter.h48
-rw-r--r--cpp/include/qpid/messaging/Message.h2
-rw-r--r--cpp/include/qpid/messaging/Session.h16
-rw-r--r--cpp/include/qpid/messaging/Variant.h4
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp726
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.h9
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h7
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp7
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp53
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h16
-rw-r--r--cpp/src/qpid/messaging/Address.cpp291
-rw-r--r--cpp/src/qpid/messaging/Filter.cpp39
-rw-r--r--cpp/src/qpid/messaging/Session.cpp25
-rw-r--r--cpp/src/qpid/messaging/SessionImpl.h6
-rw-r--r--cpp/src/qpid/messaging/Variant.cpp1
-rw-r--r--cpp/src/tests/Address.cpp91
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp303
28 files changed, 1296 insertions, 460 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp
index de6d7768df..aaac554675 100644
--- a/cpp/examples/messaging/client.cpp
+++ b/cpp/examples/messaging/client.cpp
@@ -46,7 +46,7 @@ int main(int argc, char** argv) {
Sender sender = session.createSender("service_queue");
//create temp queue & receiver...
- Address responseQueue = session.createTempQueue();
+ Address responseQueue("#response-queue {create:always, type:queue, node-properties:{x-amqp0-10-auto-delete:true}}");
Receiver receiver = session.createReceiver(responseQueue);
// Now send some messages ...
diff --git a/cpp/examples/messaging/topic_listener.cpp b/cpp/examples/messaging/topic_listener.cpp
index ba999c03a7..4c97caef7c 100644
--- a/cpp/examples/messaging/topic_listener.cpp
+++ b/cpp/examples/messaging/topic_listener.cpp
@@ -20,11 +20,11 @@
*/
#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/MessageListener.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Variant.h>
#include <cstdlib>
#include <iostream>
@@ -57,15 +57,14 @@ void Listener::received(Message& message)
}
int main(int argc, char** argv) {
- const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
- const char* pattern = argc>2 ? argv[2] : "#.#";
+ const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const std::string pattern = argc>2 ? argv[2] : "#.#";
try {
Connection connection = Connection::open(url);
Session session = connection.newSession();
- Filter filter(Filter::WILDCARD, pattern, "control");
- Receiver receiver = session.createReceiver("news_service", filter);
+ Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
Listener listener(receiver);
receiver.setListener(&listener);
receiver.setCapacity(1);
@@ -78,5 +77,3 @@ int main(int argc, char** argv) {
}
return 1;
}
-
-
diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp
index 7352a91b30..6f6c1a5677 100644
--- a/cpp/examples/messaging/topic_receiver.cpp
+++ b/cpp/examples/messaging/topic_receiver.cpp
@@ -19,32 +19,25 @@
*
*/
-#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Filter.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Variant.h>
#include <cstdlib>
#include <iostream>
-#include <sstream>
-
using namespace qpid::messaging;
-using std::stringstream;
-using std::string;
-
int main(int argc, char** argv) {
- const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
- const char* pattern = argc>2 ? argv[2] : "#.#";
+ const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ const std::string pattern = argc>2 ? argv[2] : "#.#";
try {
Connection connection = Connection::open(url);
Session session = connection.newSession();
- Filter filter(Filter::WILDCARD, pattern, "control");
- Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter);
+ Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}");
while (true) {
Message message = receiver.fetch();
std::cout << "Message: " << message.getContent() << std::endl;
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h
index e66c52f4c2..f232af5d56 100644
--- a/cpp/include/qpid/messaging/Address.h
+++ b/cpp/include/qpid/messaging/Address.h
@@ -22,33 +22,71 @@
*
*/
#include <string>
+#include "qpid/Exception.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/client/ClientImportExport.h"
#include <ostream>
namespace qpid {
-namespace client {
-}
-
namespace messaging {
+struct InvalidAddress : public qpid::Exception
+{
+ InvalidAddress(const std::string& msg);
+};
+
+struct MalformedAddress : public qpid::Exception
+{
+ MalformedAddress(const std::string& msg);
+};
+
+class AddressImpl;
+
/**
* Represents an address to which messages can be sent and from which
* messages can be received. Often a simple name is sufficient for
- * this. However this struct allows the type of address to be
- * specified allowing more sophisticated treatment if necessary.
+ * this, however this can be augmented with a subject pattern and
+ * options.
+ *
+ * All parts of an address can be specified in a string of the
+ * following form:
+ *
+ * <address> [ / <subject> ] [ { <key> : <value> , ... } ]
+ *
+ * Here the <address> is a simple name for the addressed entity and
+ * <subject> is a subject or subject pattern for messages sent to or
+ * received from this address. The options are specified as a series
+ * of key value pairs enclosed in curly brackets (denoting a map).
*/
-struct Address
+class Address
{
- std::string value;
- std::string type;
-
+ public:
QPID_CLIENT_EXTERN Address();
QPID_CLIENT_EXTERN Address(const std::string& address);
- QPID_CLIENT_EXTERN Address(const std::string& address, const std::string& type);
- QPID_CLIENT_EXTERN operator const std::string&() const;
- QPID_CLIENT_EXTERN const std::string& toStr() const;
+ QPID_CLIENT_EXTERN Address(const std::string& name, const std::string& subject,
+ const Variant::Map& options, const std::string& type = "");
+ QPID_CLIENT_EXTERN Address(const Address& address);
+ QPID_CLIENT_EXTERN ~Address();
+ Address& operator=(const Address&);
+ QPID_CLIENT_EXTERN const std::string& getName() const;
+ QPID_CLIENT_EXTERN void setName(const std::string&);
+ QPID_CLIENT_EXTERN const std::string& getSubject() const;
+ QPID_CLIENT_EXTERN void setSubject(const std::string&);
+ QPID_CLIENT_EXTERN bool hasSubject() const;
+ QPID_CLIENT_EXTERN const Variant::Map& getOptions() const;
+ QPID_CLIENT_EXTERN Variant::Map& getOptions();
+ QPID_CLIENT_EXTERN void setOptions(const Variant::Map&);
+
+ QPID_CLIENT_EXTERN std::string getType() const;
+ QPID_CLIENT_EXTERN void setType(const std::string&);
+
+ QPID_CLIENT_EXTERN const Variant& getOption(const std::string& key) const;
+
+ QPID_CLIENT_EXTERN std::string toStr() const;
QPID_CLIENT_EXTERN operator bool() const;
QPID_CLIENT_EXTERN bool operator !() const;
+ private:
+ AddressImpl* impl;
};
QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Address& address);
diff --git a/cpp/include/qpid/messaging/Filter.h b/cpp/include/qpid/messaging/Filter.h
deleted file mode 100644
index 5cd844cf73..0000000000
--- a/cpp/include/qpid/messaging/Filter.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef QPID_MESSAGING_FILTER_H
-#define QPID_MESSAGING_FILTER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <vector>
-#include "qpid/client/ClientImportExport.h"
-
-namespace qpid {
-namespace client {
-}
-
-namespace messaging {
-
-struct Filter
-{
- std::string type;
- std::vector<std::string> patterns;
-
- QPID_CLIENT_EXTERN Filter(std::string type, std::string pattern);
- QPID_CLIENT_EXTERN Filter(std::string type, std::string pattern1, std::string pattern2);
-
- static QPID_CLIENT_EXTERN const std::string WILDCARD;
- static QPID_CLIENT_EXTERN const std::string EXACT_MATCH;
-};
-
-}} // namespace qpid::messaging
-
-#endif /*!QPID_MESSAGING_FILTER_H*/
diff --git a/cpp/include/qpid/messaging/Message.h b/cpp/include/qpid/messaging/Message.h
index 4477d5a2e9..1acccecad0 100644
--- a/cpp/include/qpid/messaging/Message.h
+++ b/cpp/include/qpid/messaging/Message.h
@@ -32,7 +32,7 @@ namespace client {
namespace messaging {
-struct Address;
+class Address;
class Codec;
struct MessageImpl;
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h
index 979e27adae..4e3f950ef3 100644
--- a/cpp/include/qpid/messaging/Session.h
+++ b/cpp/include/qpid/messaging/Session.h
@@ -24,7 +24,7 @@
#include "qpid/client/ClientImportExport.h"
#include "qpid/client/Handle.h"
#include "qpid/sys/Time.h"
-#include "Variant.h"
+#include <string>
namespace qpid {
namespace client {
@@ -35,8 +35,7 @@ template <class> class PrivateImplRef;
namespace messaging {
-struct Address;
-struct Filter;
+class Address;
class Message;
class MessageListener;
class Sender;
@@ -90,13 +89,10 @@ class Session : public qpid::client::Handle<SessionImpl>
QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
- QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap());
-
- QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap());
- QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap());
+ QPID_CLIENT_EXTERN Sender createSender(const Address& address);
+ QPID_CLIENT_EXTERN Sender createSender(const std::string& address);
+ QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address);
+ QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address);
QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string());
private:
diff --git a/cpp/include/qpid/messaging/Variant.h b/cpp/include/qpid/messaging/Variant.h
index 1e51914794..c63138178b 100644
--- a/cpp/include/qpid/messaging/Variant.h
+++ b/cpp/include/qpid/messaging/Variant.h
@@ -30,9 +30,6 @@
#include "qpid/client/ClientImportExport.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
/**
@@ -93,6 +90,7 @@ class Variant
QPID_CLIENT_EXTERN ~Variant();
QPID_CLIENT_EXTERN VariantType getType() const;
+ QPID_CLIENT_EXTERN bool isVoid() const;
QPID_CLIENT_EXTERN Variant& operator=(bool);
QPID_CLIENT_EXTERN Variant& operator=(uint8_t);
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index df2a4ed6e7..3b81430852 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -594,7 +594,6 @@ set (qpidclient_SOURCES
qpid/messaging/Address.cpp
qpid/messaging/Connection.cpp
qpid/messaging/ConnectionImpl.h
- qpid/messaging/Filter.cpp
qpid/messaging/ListContent.cpp
qpid/messaging/ListView.cpp
qpid/messaging/MapContent.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 6434b96e7b..4b859cda47 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -686,7 +686,6 @@ libqpidclient_la_SOURCES = \
qpid/client/SubscriptionManagerImpl.h \
qpid/messaging/Address.cpp \
qpid/messaging/Connection.cpp \
- qpid/messaging/Filter.cpp \
qpid/messaging/ListContent.cpp \
qpid/messaging/ListView.cpp \
qpid/messaging/MapContent.cpp \
@@ -795,7 +794,6 @@ nobase_include_HEADERS += \
../include/qpid/messaging/Address.h \
../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Codec.h \
- ../include/qpid/messaging/Filter.h \
../include/qpid/messaging/ListContent.h \
../include/qpid/messaging/ListView.h \
../include/qpid/messaging/MapContent.h \
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f51a96efd9..14b5448a34 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -20,18 +20,24 @@
*/
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/Codecs.h"
+#include "qpid/client/amqp0_10/CodecsInternal.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/QueueQueryResult.h"
#include "qpid/framing/ReplyTo.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
namespace qpid {
namespace client {
@@ -40,61 +46,145 @@ namespace amqp0_10 {
using qpid::Exception;
using qpid::messaging::Address;
using qpid::messaging::Filter;
+using qpid::messaging::InvalidAddress;
using qpid::messaging::Variant;
+using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
+using qpid::framing::QueueQueryResult;
using qpid::framing::ReplyTo;
+using qpid::framing::Uuid;
using namespace qpid::framing::message;
+using namespace boost::assign;
namespace{
-const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
const std::string EMPTY_STRING;
//option names
const std::string BROWSE("browse");
const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NAME("name");
-const std::string UNACKNOWLEDGED("unacknowledged");
+const std::string NO_LOCAL("no-local");
+const std::string FILTER("filter");
+const std::string RELIABILITY("reliability");
+const std::string NAME("subscription-name");
+const std::string NODE_PROPERTIES("node-properties");
+
+//policy types
+const std::string CREATE("create");
+const std::string ASSERT("assert");
+const std::string DELETE("delete");
+//policy values
+const std::string ALWAYS("always");
+const std::string NEVER("never");
+const std::string RECEIVER("receiver");
+const std::string SENDER("sender");
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
-const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+");
-const std::string DIVIDER("/");
-const std::string SIMPLE_SUBSCRIPTION("simple");
-const std::string RELIABLE_SUBSCRIPTION("reliable");
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
const std::string DURABLE_SUBSCRIPTION("durable");
+const std::string DURABLE("durable");
+
+const std::string TOPIC_EXCHANGE("topic");
+const std::string FANOUT_EXCHANGE("fanout");
+const std::string DIRECT_EXCHANGE("direct");
+const std::string HEADERS_EXCHANGE("headers");
+const std::string XML_EXCHANGE("xml");
+const std::string WILDCARD_ANY("*");
}
-class QueueSource : public MessageSource
+//some amqp 0-10 specific options
+namespace xamqp{
+const std::string AUTO_DELETE("x-amqp0-10-auto-delete");
+const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type");
+const std::string EXCLUSIVE("x-amqp0-10-exclusive");
+const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange");
+const std::string ARGUMENTS("x-amqp0-10-arguments");
+const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments");
+const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-queue-arguments");
+}
+
+class Node
+{
+ protected:
+ enum CheckMode {FOR_RECEIVER, FOR_SENDER};
+
+ Node(const Address& address);
+
+ const std::string name;
+ Variant createPolicy;
+ Variant assertPolicy;
+ Variant deletePolicy;
+
+ static bool enabled(const Variant& policy, CheckMode mode);
+ static bool createEnabled(const Address& address, CheckMode mode);
+ static void convert(const Variant& option, FieldTable& arguments);
+ static std::vector<std::string> RECEIVER_MODES;
+ static std::vector<std::string> SENDER_MODES;
+};
+
+class Queue : protected Node
{
public:
- QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED,
- bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Queue(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ private:
+ bool durable;
+ bool autoDelete;
+ bool exclusive;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class Exchange : protected Node
+{
+ public:
+ Exchange(const Address& address);
+ protected:
+ void checkCreate(qpid::client::AsyncSession&, CheckMode);
+ void checkAssert(qpid::client::AsyncSession&, CheckMode);
+ void checkDelete(qpid::client::AsyncSession&, CheckMode);
+ const std::string& getDesiredExchangeType() { return type; }
+
+ private:
+ std::string type;
+ bool durable;
+ bool autoDelete;
+ std::string alternateExchange;
+ FieldTable arguments;
+
+ void configure(const Address&);
+};
+
+class QueueSource : public Queue, public MessageSource
+{
+ public:
+ QueueSource(const Address& address);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
- const std::string name;
const AcceptMode acceptMode;
const AcquireMode acquireMode;
const bool exclusive;
- const FieldTable options;
+ FieldTable options;
};
-class Subscription : public MessageSource
+class Subscription : public Exchange, public MessageSource
{
public:
- enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE};
-
- Subscription(const std::string& name, SubscriptionMode mode = SIMPLE,
- const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ Subscription(const Address&, const std::string& exchangeType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
-
- static SubscriptionMode getMode(const std::string& mode);
private:
struct Binding
{
@@ -107,155 +197,138 @@ class Subscription : public MessageSource
typedef std::vector<Binding> Bindings;
- const std::string name;
- const bool autoDelete;
+ const std::string queue;
+ const bool reliable;
const bool durable;
- const FieldTable queueOptions;
- const FieldTable subscriptionOptions;
+ FieldTable queueOptions;
+ FieldTable subscriptionOptions;
Bindings bindings;
- std::string queue;
+
+ void bindSpecial(const std::string& exchangeType);
+ void bind(const Variant& filter);
+ void bind(const Variant::Map& filter);
+ void bind(const Variant::List& filter);
+ void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
+ static std::string getSubscriptionName(const std::string& base, const Variant& name);
};
-class Exchange : public MessageSink
+class ExchangeSink : public Exchange, public MessageSink
{
public:
- Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING,
- bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false,
- const FieldTable& options = EMPTY_FIELD_TABLE);
+ ExchangeSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
const std::string defaultSubject;
- const bool passive;
- const std::string type;
- const bool durable;
- const FieldTable options;
};
-class QueueSink : public MessageSink
+class QueueSink : public Queue, public MessageSink
{
public:
- QueueSink(const std::string& name, bool passive=true, bool exclusive=false,
- bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE);
+ QueueSink(const Address& name);
void declare(qpid::client::AsyncSession& session, const std::string& name);
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string name;
- const bool passive;
- const bool exclusive;
- const bool autoDelete;
- const bool durable;
- const FieldTable options;
};
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address);
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject);
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address);
-const Variant& getOption(const std::string& key, const Variant::Map& options)
+bool in(const Variant& value, const std::vector<std::string>& choices)
{
- Variant::Map::const_iterator i = options.find(key);
- if (i == options.end()) return EMPTY_VARIANT;
- else return i->second;
+ if (!value.isVoid()) {
+ for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) {
+ if (value.asString() == *i) return true;
+ }
+ }
+ return false;
+}
+
+bool getReceiverPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER));
+}
+
+bool getSenderPolicy(const Address& address, const std::string& key)
+{
+ return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
+}
+
+bool is_unreliable(const Address& address)
+{
+ return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+}
+
+bool is_reliable(const Address& address)
+{
+ return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session,
- const Address& address,
- const Filter* filter,
- const Variant::Map& options)
+ const Address& address)
{
//TODO: handle case where there exists a queue and an exchange of
//the same name (hence an unqualified address is ambiguous)
//TODO: make sure specified address type gives sane error message
- //if it does npt match the configuration on server
+ //if it does not match the configuration on server
+
+ /**
+ if (Node::createEnabled(address, FOR_RECEIVER)) {
+ } else {
+ }
+ **/
if (isQueue(session, address)) {
- //TODO: support auto-created queue as source, if requested by specific option
-
- AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
- AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED;
- bool exclusive = getOption(EXCLUSIVE, options).asBool();
- FieldTable arguments;
- //TODO: extract subscribe arguments from options (e.g. either
- //filter out already processed keys and send the rest, or have
- //a nested map)
-
- std::auto_ptr<MessageSource> source =
- std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments));
+ std::auto_ptr<MessageSource> source(new QueueSource(address));
+ QPID_LOG(debug, "resolved source address as queue: " << address);
return source;
} else {
- //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important)
- std::auto_ptr<Subscription> bindings =
- std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(),
- Subscription::getMode(getOption(MODE, options).asString())));
-
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value);
- if (result.getNotFound()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
- } else if (result.getType() == "topic") {
- if (filter) {
- if (filter->type != Filter::WILDCARD) {
- throw qpid::framing::NotImplementedException(
- QPID_MSG("Filters of type " << filter->type << " not supported by address " << address));
-
- }
- for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) {
- bindings->add(address.value, *i, qpid::framing::FieldTable());
- }
- } else {
- //default is to receive all messages
- bindings->add(address.value, "*", qpid::framing::FieldTable());
- }
- } else if (result.getType() == "fanout") {
- if (filter) {
- throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address));
- }
- bindings->add(address.value, address.value, qpid::framing::FieldTable());
- } else if (result.getType() == "direct") {
- //TODO: ????
- } else {
- //TODO: xml and headers exchanges
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address));
- }
- std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release());
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName());
+ std::auto_ptr<MessageSource> source(new Subscription(address, result.getType()));
+ QPID_LOG(debug, "resolved source address as topic: " << address);
return source;
}
}
std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& /*options*/)
+ const qpid::messaging::Address& address)
{
std::auto_ptr<MessageSink> sink;
if (isQueue(session, address)) {
- //TODO: support for auto-created queues as sink
- sink = std::auto_ptr<MessageSink>(new QueueSink(address.value));
+ sink = std::auto_ptr<MessageSink>(new QueueSink(address));
} else {
- std::string subject;
- if (isTopic(session, address, subject)) {
- //TODO: support for auto-created exchanges as sink
- sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject));
+ if (isTopic(session, address)) {
+ sink = std::auto_ptr<MessageSink>(new ExchangeSink(address));
} else {
- if (address.type.empty()) {
- throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address));
+ if (address.getType().empty()) {
+ throw InvalidAddress(QPID_MSG("Address not known: " << address));
} else {
- throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type));
+ throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType()));
}
}
}
return sink;
}
-QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) :
- name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {}
+QueueSource::QueueSource(const Address& address) :
+ Queue(address),
+ acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT),
+ acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
+ exclusive(address.getOption(EXCLUSIVE).asBool())
+{
+ //extract subscription arguments from address options
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options);
+}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -267,11 +340,48 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
session.messageCancel(destination);
+ checkDelete(session, FOR_RECEIVER);
}
-Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions)
- : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE),
- queueOptions(qOptions), subscriptionOptions(sOptions) {}
+std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+{
+ if (name.isVoid()) {
+ return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
+ } else {
+ return (boost::format("%1%_%2%") % base % name.asString()).str();
+ }
+}
+
+Subscription::Subscription(const Address& address, const std::string& exchangeType)
+ : Exchange(address),
+ queue(getSubscriptionName(name, address.getOption(NAME))),
+ reliable(is_reliable(address)),
+ durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+{
+ if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
+ convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions);
+ convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions);
+
+ const Variant& filter = address.getOption(FILTER);
+ if (!filter.isVoid()) {
+ //TODO: if both subject _and_ filter are specified,
+ //combine in some way; for now we just ignore the
+ //subject in that case.
+ bind(filter);
+ } else if (address.hasSubject()) {
+ //Note: This will not work for headers- or xml- exchange;
+ //fanout exchange will do no filtering.
+ //TODO: for headers- or xml- exchange can construct a match
+ //for the subject in the application-headers
+ bind(address.getSubject());
+ } else {
+ //Neither a subject nor a filter has been defined, treat this
+ //as wanting to match all messages (Note: direct exchange is
+ //currently unable to support this case).
+ if (!exchangeType.empty()) bindSpecial(exchangeType);
+ else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ }
+}
void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
{
@@ -280,18 +390,19 @@ void Subscription::add(const std::string& exchange, const std::string& key, cons
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
- if (name.empty()) {
- //TODO: use same scheme as JMS client for subscription queue name generation?
- queue = session.getId().getName() + destination;
- } else {
- queue = name;
- }
+ //create exchange if required and specified by policy:
+ checkCreate(session, FOR_RECEIVER);
+ checkAssert(session, FOR_RECEIVER);
+
+ //create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
- arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions);
+ arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+ //bind subscription queue to exchange:
for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
}
- AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT;
+ //subscribe to subscription queue:
+ AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
@@ -300,36 +411,23 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string
{
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
+ checkDelete(session, FOR_RECEIVER);
}
Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
exchange(e), key(k), options(o) {}
-Subscription::SubscriptionMode Subscription::getMode(const std::string& s)
-{
- if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE;
- else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE;
- else if (s == DURABLE_SUBSCRIPTION) return DURABLE;
- else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s));
-}
-
void convert(qpid::messaging::Message& from, qpid::client::Message& to);
-Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject,
- bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) :
- name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {}
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {}
-void Exchange::declare(qpid::client::AsyncSession& session, const std::string&)
+void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
- //TODO: should this really by synchronous? want to get error if not valid...
- if (passive) {
- sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
- } else {
- sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options);
- }
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
-void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
+void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
@@ -337,22 +435,17 @@ void Exchange::send(qpid::client::AsyncSession& session, const std::string&, Out
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
-void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {}
+void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
-QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive,
- bool _autoDelete, bool _durable, const FieldTable& _options) :
- name(_name), passive(_passive), exclusive(_exclusive),
- autoDelete(_autoDelete), durable(_durable), options(_options) {}
+QueueSink::QueueSink(const Address& address) : Queue(address) {}
void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
- //TODO: should this really by synchronous?
- if (passive) {
- sync(session).queueDeclare(arg::queue=name, arg::passive=true);
- } else {
- sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable,
- arg::autoDelete=autoDelete, arg::arguments=options);
- }
+ checkCreate(session, FOR_SENDER);
+ checkAssert(session, FOR_SENDER);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -360,9 +453,10 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
m.status = session.messageTransfer(arg::content=m.message);
}
-void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {}
-
-void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp
+void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
+{
+ checkDelete(session, FOR_SENDER);
+}
void convert(qpid::messaging::Message& from, qpid::client::Message& to)
{
@@ -372,7 +466,7 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to)
//TODO: set other delivery properties
to.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
- if (!address.value.empty()) {
+ if (!address.getName().empty()) {
to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
@@ -381,72 +475,292 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to)
Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
{
- if (rt.getExchange().empty()) {
- if (rt.getRoutingKey().empty()) {
- return Address();//empty address
- } else {
- return Address(rt.getRoutingKey(), QUEUE_ADDRESS);
- }
+ Address address;
+ if (rt.getExchange().empty()) {//if default exchange, treat as queue
+ address.setName(rt.getRoutingKey());
+ address.setType(QUEUE_ADDRESS);
} else {
- if (rt.getRoutingKey().empty()) {
- return Address(rt.getExchange(), TOPIC_ADDRESS);
- } else {
- return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT);
- }
- }
+ address.setName(rt.getExchange());
+ address.setSubject(rt.getRoutingKey());
+ address.setType(TOPIC_ADDRESS);
+ }
+ return address;
}
qpid::framing::ReplyTo AddressResolution::convert(const Address& address)
{
- if (address.type == QUEUE_ADDRESS || address.type.empty()) {
- return ReplyTo(EMPTY_STRING, address.value);
- } else if (address.type == TOPIC_ADDRESS) {
- return ReplyTo(address.value, EMPTY_STRING);
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- //need to split the value
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- std::string routingKey;
- if (i+1 < address.value.size()) {
- routingKey = address.value.substr(i+1);
- }
- return ReplyTo(exchange, routingKey);
- } else {
- return ReplyTo(address.value, EMPTY_STRING);
- }
+ if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) {
+ return ReplyTo(EMPTY_STRING, address.getName());
+ } else if (address.getType() == TOPIC_ADDRESS) {
+ return ReplyTo(address.getName(), address.getSubject());
} else {
- QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type);
- //treat as queue
- return ReplyTo(EMPTY_STRING, address.value);
+ QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType());
+ return ReplyTo(EMPTY_STRING, address.getName());//treat as queue
}
}
bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address)
{
- return address.type == QUEUE_ADDRESS ||
- (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value);
+ return address.getType() == QUEUE_ADDRESS ||
+ (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName());
}
-bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject)
+bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address)
{
- if (address.type.empty()) {
- return !session.exchangeQuery(address.value).getNotFound();
- } else if (address.type == TOPIC_ADDRESS) {
- return true;
- } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) {
- string::size_type i = address.value.find(DIVIDER);
- if (i != string::npos) {
- std::string exchange = address.value.substr(0, i);
- if (i+1 < address.value.size()) {
- subject = address.value.substr(i+1);
- }
- }
+ if (address.getType().empty()) {
+ return !session.exchangeQuery(address.getName()).getNotFound();
+ } else if (address.getType() == TOPIC_ADDRESS) {
return true;
} else {
return false;
}
}
+void Subscription::bind(const Variant& filter)
+{
+ switch (filter.getType()) {
+ case qpid::messaging::VAR_MAP:
+ bind(filter.asMap());
+ break;
+ case qpid::messaging::VAR_LIST:
+ bind(filter.asList());
+ break;
+ default:
+ add(name, filter.asString());
+ break;
+ }
+}
+
+void Subscription::bind(const Variant::Map& filter)
+{
+ qpid::framing::FieldTable arguments;
+ translate(filter, arguments);
+ add(name, queue, arguments);
+}
+
+void Subscription::bind(const Variant::List& filter)
+{
+ for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
+ bind(*i);
+ }
+}
+
+void Subscription::bindSpecial(const std::string& exchangeType)
+{
+ if (exchangeType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (exchangeType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (exchangeType == HEADERS_EXCHANGE) {
+ //TODO: add special binding for headers exchange to match all messages
+ } else if (exchangeType == XML_EXCHANGE) {
+ //TODO: add special binding for xml exchange to match all messages
+ } else { //E.g. direct
+ throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
+ }
+}
+
+Node::Node(const Address& address) : name(address.getName()),
+ createPolicy(address.getOption(CREATE)),
+ assertPolicy(address.getOption(ASSERT)),
+ deletePolicy(address.getOption(DELETE)) {}
+
+Queue::Queue(const Address& a) : Node(a),
+ durable(false),
+ autoDelete(false),
+ exclusive(false)
+{
+ configure(a);
+}
+
+void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ QPID_LOG(debug, "Auto-creating queue '" << name << "'");
+ try {
+ sync(session).queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).queueDeclare(arg::queue=name, arg::passive=true);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str());
+ }
+ }
+}
+
+void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(deletePolicy, mode)) {
+ QPID_LOG(debug, "Auto-deleting queue '" << name << "'");
+ sync(session).queueDelete(arg::queue=name);
+ }
+}
+
+void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ QueueQueryResult result = sync(session).queueQuery(name);
+ if (result.getQueue() != name) {
+ throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+ } else {
+ if (durable && !result.getDurable()) {
+ throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+ }
+ if (autoDelete && !result.getAutoDelete()) {
+ throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ }
+ if (exclusive && !result.getExclusive()) {
+ throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+ }
+ if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
+ throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ % name % alternateExchange % result.getAlternateExchange()).str());
+ }
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (*i->second != *v) {
+ throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ }
+ }
+}
+
+void Queue::configure(const Address& address)
+{
+ const Variant& properties = address.getOption(NODE_PROPERTIES);
+ if (!properties.isVoid()) {
+ Variant::Map p = properties.asMap();
+ durable = p[DURABLE];
+ autoDelete = p[xamqp::AUTO_DELETE];
+ exclusive = p[xamqp::EXCLUSIVE];
+ alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+ if (!p[xamqp::ARGUMENTS].isVoid()) {
+ translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+ }
+ }
+}
+
+Exchange::Exchange(const Address& a) : Node(a),
+ durable(false),
+ autoDelete(false)
+{
+ configure(a);
+}
+
+void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(createPolicy, mode)) {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name,
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+ }
+ } else {
+ try {
+ sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str());
+ }
+ }
+}
+
+void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(deletePolicy, mode)) {
+ sync(session).exchangeDelete(arg::exchange=name);
+ }
+}
+
+void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
+{
+ if (enabled(assertPolicy, mode)) {
+ ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name);
+ if (result.getNotFound()) {
+ throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+ } else {
+ if (!type.empty() && result.getType() != type) {
+ throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ % name % type % result.getType()).str());
+ }
+ if (durable && !result.getDurable()) {
+ throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+ }
+ //Note: Can't check auto-delete or alternate-exchange via
+ //exchange-query-result as these are not returned
+ //TODO: could use a passive declare to check alternate-exchange
+ for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
+ FieldTable::ValuePtr v = result.getArguments().get(i->first);
+ if (!v) {
+ throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ } else if (i->second != v) {
+ throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ % i->first % name % *(i->second) % *v).str());
+ }
+ }
+ }
+ }
+}
+
+void Exchange::configure(const Address& address)
+{
+ const Variant& properties = address.getOption(NODE_PROPERTIES);
+ if (!properties.isVoid()) {
+ Variant::Map p = properties.asMap();
+ durable = p[DURABLE];
+ autoDelete = p[xamqp::AUTO_DELETE];
+ type = p[xamqp::EXCHANGE_TYPE].asString();
+ alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString();
+ if (!p[xamqp::ARGUMENTS].isVoid()) {
+ translate(p[xamqp::ARGUMENTS].asMap(), arguments);
+ }
+ }
+}
+
+
+bool Node::enabled(const Variant& policy, CheckMode mode)
+{
+ bool result;
+ switch (mode) {
+ case FOR_RECEIVER:
+ result = in(policy, RECEIVER_MODES);
+ break;
+ case FOR_SENDER:
+ result = in(policy, SENDER_MODES);
+ break;
+ }
+ return result;
+}
+
+bool Node::createEnabled(const Address& address, CheckMode mode)
+{
+ const Variant& policy = address.getOption(CREATE);
+ return enabled(policy, mode);
+}
+
+void Node::convert(const Variant& options, FieldTable& arguments)
+{
+ if (!options.isVoid()) {
+ translate(options.asMap(), arguments);
+ }
+}
+std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER);
+std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER);
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
index 9d5657450d..46d3f96243 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.h
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h
@@ -50,13 +50,10 @@ class AddressResolution
{
public:
std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::Variant::Map& options);
-
+ const qpid::messaging::Address& address);
+
std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
static qpid::messaging::Address convert(const qpid::framing::ReplyTo&);
static qpid::framing::ReplyTo convert(const qpid::messaging::Address&);
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index f4bc09594d..a8754778f0 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -50,11 +50,11 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
qpid::sys::Mutex lock;//used to protect data structures
qpid::sys::Semaphore semaphore;//used to coordinate reconnection
+ Sessions sessions;
qpid::client::Connection connection;
std::auto_ptr<FailoverListener> failoverListener;
qpid::Url url;
qpid::client::ConnectionSettings settings;
- Sessions sessions;
bool reconnectionEnabled;
int timeout;
int minRetryInterval;
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index cbc95b44fb..d3410ad76e 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -39,7 +39,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
message.setData(from.getContent());
message.getMessageProperties().setContentType(from.getContentType());
const Address& address = from.getReplyTo();
- if (!address.value.empty()) {
+ if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders());
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index da91c4a160..f294d7e273 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -103,7 +103,7 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
session = s;
if (state == UNRESOLVED) {
- source = resolver.resolveSource(session, address, filter, options);
+ source = resolver.resolveSource(session, address);
state = STOPPED;//TODO: if session is started, go straight to started
}
if (state == CANCELLED) {
@@ -136,11 +136,9 @@ uint32_t ReceiverImpl::pendingAck()
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a,
- const qpid::messaging::Filter* f,
- const qpid::messaging::Variant::Map& o) :
+ const qpid::messaging::Address& a) :
- parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF),
+ parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), listener(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index b941348fc8..d05fd3d045 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -22,7 +22,6 @@
*
*/
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Variant.h"
@@ -48,9 +47,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
ReceiverImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -72,8 +69,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
SessionImpl& parent;
const std::string destination;
const qpid::messaging::Address address;
- const qpid::messaging::Filter* filter;
- const qpid::messaging::Variant::Map options;
const uint32_t byteCredit;
State state;
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 4cd2dc0521..9d168725e6 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -29,9 +29,8 @@ namespace client {
namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
- const qpid::messaging::Address& _address,
- const qpid::messaging::Variant::Map& _options) :
- parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+ const qpid::messaging::Address& _address) :
+ parent(_parent), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false) {}
void SenderImpl::send(const qpid::messaging::Message& message)
@@ -63,7 +62,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
if (state == UNRESOLVED) {
- sink = resolver.resolveSink(session, address, options);
+ sink = resolver.resolveSink(session, address);
state = ACTIVE;
}
if (state == CANCELLED) {
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 4faa3fc292..60b196b21b 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -47,8 +47,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
enum State {UNRESOLVED, ACTIVE, CANCELLED};
SenderImpl(SessionImpl& parent, const std::string& name,
- const qpid::messaging::Address& address,
- const qpid::messaging::Variant::Map& options);
+ const qpid::messaging::Address& address);
void send(const qpid::messaging::Message&);
void cancel();
void setCapacity(uint32_t);
@@ -60,7 +59,6 @@ class SenderImpl : public qpid::messaging::SenderImpl
SessionImpl& parent;
const std::string name;
const qpid::messaging::Address address;
- const qpid::messaging::Variant::Map options;
State state;
std::auto_ptr<MessageSink> sink;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index bc6289d84b..101bc5ce0a 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -28,7 +28,6 @@
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/messaging/MessageListener.h"
@@ -132,36 +131,22 @@ struct SessionImpl::CreateReceiver : Command
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
- const Filter* filter;
- const qpid::messaging::Variant::Map& options;
- CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f,
- const qpid::messaging::Variant::Map& o) :
- Command(i), address(a), filter(f), options(o) {}
- void operator()() { result = impl.createReceiverImpl(address, filter, options); }
+ CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createReceiverImpl(address); }
};
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
-{
- CreateReceiver f(*this, address, 0, options);
- while (!execute(f)) {}
- return f.result;
-}
-
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address,
- const Filter& filter, const VariantMap& options)
-{
- CreateReceiver f(*this, address, &filter, options);
- while (!execute(f)) {}
- return f.result;
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address)
+{
+ return get1<CreateReceiver, Receiver>(address);
}
-Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address,
- const Filter* filter, const VariantMap& options)
+Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
{
- std::string name = address;
+ std::string name = address.getName();
getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, address, filter, options));
+ Receiver receiver(new ReceiverImpl(*this, name, address));
getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
receivers[name] = receiver;
return receiver;
@@ -171,26 +156,22 @@ struct SessionImpl::CreateSender : Command
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
- const qpid::messaging::Variant::Map& options;
- CreateSender(SessionImpl& i, const qpid::messaging::Address& a,
- const qpid::messaging::Variant::Map& o) :
- Command(i), address(a), options(o) {}
- void operator()() { result = impl.createSenderImpl(address, options); }
+ CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
+ Command(i), address(a) {}
+ void operator()() { result = impl.createSenderImpl(address); }
};
-Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSender(const qpid::messaging::Address& address)
{
- CreateSender f(*this, address, options);
- while (!execute(f)) {}
- return f.result;
+ return get1<CreateSender, Sender>(address);
}
-Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options)
+Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
{
- std::string name = address;
+ std::string name = address.getName();
getFreeKey(name, senders);
- Sender sender(new SenderImpl(*this, name, address, options));
+ Sender sender(new SenderImpl(*this, name, address));
getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
senders[name] = sender;
return sender;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index b453f3f08f..9a7918d473 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -63,13 +63,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
void sync();
void flush();
qpid::messaging::Address createTempQueue(const std::string& baseName);
- qpid::messaging::Sender createSender(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address,
- const qpid::messaging::Filter& filter,
- const qpid::messaging::VariantMap& options);
+ qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address);
void* getLastConfirmedSent();
void* getLastConfirmedAcknowledged();
@@ -129,11 +124,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
void closeImpl();
void syncImpl();
void flushImpl();
- qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address,
- const qpid::messaging::VariantMap& options);
- qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::VariantMap& options);
+ qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address);
+ qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address);
uint32_t availableImpl(const std::string* destination);
uint32_t pendingAckImpl(const std::string* destination);
diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp
index 813a8e1377..edb1ddc79d 100644
--- a/cpp/src/qpid/messaging/Address.cpp
+++ b/cpp/src/qpid/messaging/Address.cpp
@@ -19,28 +19,293 @@
*
*/
#include "qpid/messaging/Address.h"
+#include "qpid/framing/Uuid.h"
+#include <sstream>
+#include <boost/format.hpp>
namespace qpid {
namespace messaging {
-Address::Address() {}
-Address::Address(const std::string& address) : value(address) {}
-Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {}
-Address::operator const std::string&() const { return value; }
-const std::string& Address::toStr() const { return value; }
-Address::operator bool() const { return !value.empty(); }
-bool Address::operator !() const { return value.empty(); }
+namespace {
+const std::string SUBJECT_DIVIDER = "/";
+const std::string SPACE = " ";
+const std::string TYPE = "type";
+}
+class AddressImpl
+{
+ public:
+ std::string name;
+ std::string subject;
+ Variant::Map options;
+
+ AddressImpl() {}
+ AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) :
+ name(n), subject(s), options(o) {}
+};
+
+class AddressParser
+{
+ public:
+ AddressParser(const std::string&);
+ bool parse(Address& address);
+ private:
+ const std::string& input;
+ std::string::size_type current;
+ static const std::string RESERVED;
+
+ bool readChar(char c);
+ bool readQuotedString(Variant& value);
+ bool readString(Variant& value, char delimiter);
+ bool readWord(std::string& word);
+ bool readSimpleValue(Variant& word);
+ bool readKey(std::string& key);
+ bool readValue(Variant& value);
+ bool readKeyValuePair(Variant::Map& map);
+ bool readMap(Variant& value);
+ bool readList(Variant& value);
+ bool error(const std::string& message);
+ bool eos();
+ bool iswhitespace();
+ bool isreserved();
+};
+
+Address::Address() : impl(new AddressImpl()) {}
+Address::Address(const std::string& address) : impl(new AddressImpl())
+{
+ AddressParser parser(address);
+ parser.parse(*this);
+}
+Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options,
+ const std::string& type)
+ : impl(new AddressImpl(name, subject, options)) { setType(type); }
+Address::Address(const Address& a) :
+ impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {}
+Address::~Address() { delete impl; }
+
+Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; }
+
+
+std::string Address::toStr() const
+{
+ std::stringstream out;
+ out << impl->name;
+ if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject;
+ if (!impl->options.empty()) out << " {" << impl->options << "}";
+ return out.str();
+}
+Address::operator bool() const { return !impl->name.empty(); }
+bool Address::operator !() const { return impl->name.empty(); }
+
+const std::string& Address::getName() const { return impl->name; }
+void Address::setName(const std::string& name) { impl->name = name; }
+const std::string& Address::getSubject() const { return impl->subject; }
+bool Address::hasSubject() const { return !(impl->subject.empty()); }
+void Address::setSubject(const std::string& subject) { impl->subject = subject; }
+const Variant::Map& Address::getOptions() const { return impl->options; }
+Variant::Map& Address::getOptions() { return impl->options; }
+void Address::setOptions(const Variant::Map& options) { impl->options = options; }
+
+
+namespace{
+const Variant EMPTY_VARIANT;
+const std::string EMPTY_STRING;
+}
-const std::string TYPE_SEPARATOR(":");
+std::string Address::getType() const
+{
+ const Variant& type = getOption(TYPE);
+ return type.isVoid() ? EMPTY_STRING : type.asString();
+}
+void Address::setType(const std::string& type) { impl->options[TYPE] = type; }
+
+const Variant& Address::getOption(const std::string& key) const
+{
+ Variant::Map::const_iterator i = impl->options.find(key);
+ if (i == impl->options.end()) return EMPTY_VARIANT;
+ else return i->second;
+}
std::ostream& operator<<(std::ostream& out, const Address& address)
{
- if (!address.type.empty()) {
- out << address.type;
- out << TYPE_SEPARATOR;
- }
- out << address.value;
+ out << address.toStr();
return out;
}
+InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {}
+
+MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {}
+
+AddressParser::AddressParser(const std::string& s) : input(s), current(0) {}
+
+bool AddressParser::error(const std::string& message)
+{
+ throw MalformedAddress(message);//TODO: add more debug detail to error message (position etc)
+}
+
+bool AddressParser::parse(Address& address)
+{
+ std::string name;
+ if (readWord(name)) {
+ if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name;
+ address.setName(name);
+ if (readChar('/')) {
+ std::string subject;
+ if (readWord(subject)) {
+ address.setSubject(subject);
+ } else {
+ return error("Expected subject after /");
+ }
+ }
+ Variant options = Variant::Map();
+ if (readMap(options)) {
+ address.setOptions(options.asMap());
+ }
+ return true;
+ } else {
+ return input.empty() || error("Expected name");
+ }
+}
+
+bool AddressParser::readList(Variant& value)
+{
+ if (readChar('[')) {
+ value = Variant::List();
+ Variant item;
+ while (readValue(item)) {
+ value.asList().push_back(item);
+ if (!readChar(',')) break;
+ }
+ return readChar(']') || error("Unmatched '['!");
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readMap(Variant& value)
+{
+ if (readChar('{')) {
+ value = Variant::Map();
+ while (readKeyValuePair(value.asMap()) && readChar(',')) {}
+ return readChar('}') || error("Unmatched '{'!");
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readKeyValuePair(Variant::Map& map)
+{
+ std::string key;
+ Variant value;
+ if (readKey(key)) {
+ if (readChar(':') && readValue(value)) {
+ map[key] = value;
+ return true;
+ } else {
+ return error("Bad key-value pair!");
+ }
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readKey(std::string& key)
+{
+ return readWord(key);
+}
+
+bool AddressParser::readValue(Variant& value)
+{
+ return readSimpleValue(value) || readQuotedString(value) ||
+ readMap(value) || readList(value) || error("Expected value");
+}
+
+bool AddressParser::readString(Variant& value, char delimiter)
+{
+ if (readChar(delimiter)) {
+ std::string::size_type start = current++;
+ while (!eos()) {
+ if (input.at(current) == delimiter) {
+ if (current > start) {
+ value = input.substr(start, current - start);
+ } else {
+ value = "";
+ }
+ ++current;
+ return true;
+ } else {
+ ++current;
+ }
+ }
+ return error("Unmatched delimiter");
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readQuotedString(Variant& value)
+{
+ return readString(value, '"') || readString(value, '\'');
+}
+
+bool AddressParser::readSimpleValue(Variant& value)
+{
+ std::string s;
+ if (readWord(s)) {
+ value = s;
+ try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {}
+ try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {}
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readWord(std::string& value)
+{
+ //skip leading whitespace
+ while (!eos() && iswhitespace()) ++current;
+
+ //read any number of non-whitespace, non-reserved chars into value
+ std::string::size_type start = current;
+ while (!eos() && !iswhitespace() && !isreserved()) ++current;
+
+ if (current > start) {
+ value = input.substr(start, current - start);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool AddressParser::readChar(char c)
+{
+ while (!eos()) {
+ if (iswhitespace()) {
+ ++current;
+ } else if (input.at(current) == c) {
+ ++current;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ return false;
+}
+
+bool AddressParser::iswhitespace()
+{
+ return ::isspace(input.at(current));
+}
+
+bool AddressParser::isreserved()
+{
+ return RESERVED.find(input.at(current)) != std::string::npos;
+}
+
+bool AddressParser::eos()
+{
+ return current >= input.size();
+}
+
+const std::string AddressParser::RESERVED = "\'\"{}[],:/";
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/Filter.cpp b/cpp/src/qpid/messaging/Filter.cpp
deleted file mode 100644
index b06cbdb373..0000000000
--- a/cpp/src/qpid/messaging/Filter.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/messaging/Filter.h"
-
-namespace qpid {
-namespace client {
-}
-
-namespace messaging {
-
-Filter::Filter(std::string t, std::string pattern) : type(t) { patterns.push_back(pattern); }
-Filter::Filter(std::string t, std::string pattern1, std::string pattern2) : type(t)
-{
- patterns.push_back(pattern1);
- patterns.push_back(pattern2);
-}
-
-const std::string Filter::WILDCARD("WILDCARD");
-const std::string Filter::EXACT_MATCH("EXACT_MATCH");
-
-}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp
index 62b1ca0dcf..b69b575b26 100644
--- a/cpp/src/qpid/messaging/Session.cpp
+++ b/cpp/src/qpid/messaging/Session.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
@@ -48,30 +47,22 @@ void Session::acknowledge() { impl->acknowledge(); }
void Session::reject(Message& m) { impl->reject(m); }
void Session::close() { impl->close(); }
-Sender Session::createSender(const Address& address, const VariantMap& options)
+Sender Session::createSender(const Address& address)
{
- return impl->createSender(address, options);
+ return impl->createSender(address);
}
-Receiver Session::createReceiver(const Address& address, const VariantMap& options)
+Receiver Session::createReceiver(const Address& address)
{
- return impl->createReceiver(address, options);
-}
-Receiver Session::createReceiver(const Address& address, const Filter& filter, const VariantMap& options)
-{
- return impl->createReceiver(address, filter, options);
+ return impl->createReceiver(address);
}
-Sender Session::createSender(const std::string& address, const VariantMap& options)
-{
- return impl->createSender(Address(address), options);
-}
-Receiver Session::createReceiver(const std::string& address, const VariantMap& options)
+Sender Session::createSender(const std::string& address)
{
- return impl->createReceiver(Address(address), options);
+ return impl->createSender(Address(address));
}
-Receiver Session::createReceiver(const std::string& address, const Filter& filter, const VariantMap& options)
+Receiver Session::createReceiver(const std::string& address)
{
- return impl->createReceiver(Address(address), filter, options);
+ return impl->createReceiver(Address(address));
}
Address Session::createTempQueue(const std::string& baseName)
diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h
index 0933cea9c8..e48e7a4d02 100644
--- a/cpp/src/qpid/messaging/SessionImpl.h
+++ b/cpp/src/qpid/messaging/SessionImpl.h
@@ -23,7 +23,6 @@
*/
#include "qpid/RefCounted.h"
#include <string>
-#include "qpid/messaging/Variant.h"
#include "qpid/sys/Time.h"
namespace qpid {
@@ -53,9 +52,8 @@ class SessionImpl : public virtual qpid::RefCounted
virtual Message fetch(qpid::sys::Duration timeout) = 0;
virtual bool dispatch(qpid::sys::Duration timeout) = 0;
virtual Address createTempQueue(const std::string& baseName) = 0;
- virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
- virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
- virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
+ virtual Sender createSender(const Address& address) = 0;
+ virtual Receiver createReceiver(const Address& address) = 0;
virtual uint32_t available() = 0;
virtual uint32_t pendingAck() = 0;
private:
diff --git a/cpp/src/qpid/messaging/Variant.cpp b/cpp/src/qpid/messaging/Variant.cpp
index 4e37134b39..3b0c3312ca 100644
--- a/cpp/src/qpid/messaging/Variant.cpp
+++ b/cpp/src/qpid/messaging/Variant.cpp
@@ -529,6 +529,7 @@ Variant& Variant::operator=(const Variant& v)
}
VariantType Variant::getType() const { return impl->getType(); }
+bool Variant::isVoid() const { return impl->getType() == VAR_VOID; }
bool Variant::asBool() const { return impl->asBool(); }
uint8_t Variant::asUint8() const { return impl->asUint8(); }
uint16_t Variant::asUint16() const { return impl->asUint16(); }
diff --git a/cpp/src/tests/Address.cpp b/cpp/src/tests/Address.cpp
new file mode 100644
index 0000000000..ab4017a788
--- /dev/null
+++ b/cpp/src/tests/Address.cpp
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Variant.h"
+
+#include "unit_test.h"
+
+using namespace qpid::messaging;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(AddressSuite)
+
+QPID_AUTO_TEST_CASE(testParseNameOnly)
+{
+ Address address("my-topic");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+}
+
+QPID_AUTO_TEST_CASE(testParseSubject)
+{
+ Address address("my-topic/my-subject");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject());
+}
+
+QPID_AUTO_TEST_CASE(testParseOptions)
+{
+ Address address("my-topic {a:bc, x:101, y:'a string'}");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+ BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+ BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseSubjectAndOptions)
+{
+ Address address("my-topic/my-subject {a:bc, x:101, y:'a string'}");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject());
+ BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString());
+ BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+ BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseNestedOptions)
+{
+ Address address("my-topic {a:{p:202, q:'another string'}, x:101, y:'a string'}");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ BOOST_CHECK_EQUAL((uint16_t) 202, address.getOptions()["a"].asMap()["p"].asInt64());
+ BOOST_CHECK_EQUAL(std::string("another string"), address.getOptions()["a"].asMap()["q"].asString());
+ BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+ BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString());
+}
+
+QPID_AUTO_TEST_CASE(testParseOptionsWithList)
+{
+ Address address("my-topic {a:[202, 'another string'], x:101}");
+ BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName());
+ Variant::List& list = address.getOptions()["a"].asList();
+ Variant::List::const_iterator i = list.begin();
+ BOOST_CHECK(i != list.end());
+ BOOST_CHECK_EQUAL((uint16_t) 202, i->asInt64());
+ BOOST_CHECK(++i != list.end());
+ BOOST_CHECK_EQUAL(std::string("another string"), i->asString());
+ BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index e1935dfcc6..b84251831d 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -114,7 +114,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ReplicationTest.cpp \
ClientMessageTest.cpp \
PollableCondition.cpp \
- Variant.cpp
+ Variant.cpp \
+ Address.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 206f5ba691..fc39557a0e 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -21,6 +21,7 @@
#include "unit_test.h"
#include "test_tools.h"
#include "BrokerFixture.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/ListContent.h"
#include "qpid/messaging/ListView.h"
@@ -33,7 +34,9 @@
#include "qpid/messaging/Session.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
+#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/Time.h"
#include <boost/assign.hpp>
#include <boost/format.hpp>
@@ -48,6 +51,7 @@ QPID_AUTO_TEST_SUITE(MessagingSessionTests)
using namespace qpid::messaging;
using namespace qpid;
using qpid::broker::Broker;
+using qpid::framing::Uuid;
struct BrokerAdmin
{
@@ -80,6 +84,18 @@ struct BrokerAdmin
session.exchangeDelete(qpid::client::arg::exchange=name);
}
+ bool checkQueueExists(const std::string& name)
+ {
+ return session.queueQuery(name).getQueue() == name;
+ }
+
+ bool checkExchangeExists(const std::string& name, std::string& type)
+ {
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
+ type = result.getType();
+ return !result.getNotFound();
+ }
+
~BrokerAdmin()
{
session.close();
@@ -99,6 +115,19 @@ struct MessagingFixture : public BrokerFixture
session(connection.newSession()),
admin(broker->getPort(Broker::TCP_TRANSPORT)) {}
+ void ping(const qpid::messaging::Address& address)
+ {
+ Receiver r = session.createReceiver(address);
+ Sender s = session.createSender(address);
+ Message out(Uuid(true).str());
+ s.send(out);
+ Message in;
+ BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ r.cancel();
+ s.cancel();
+ }
+
~MessagingFixture()
{
session.close();
@@ -178,6 +207,22 @@ std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duratio
return data;
}
+
+void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message")
+{
+ for (uint i = start; i < start + count; ++i) {
+ sender.send(Message((boost::format("%1%_%2%") % base % i).str()));
+ }
+}
+
+void receive(Receiver& receiver, uint count = 1, uint start = 1,
+ const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+ for (uint i = start; i < start + count; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
+ }
+}
+
QPID_AUTO_TEST_CASE(testSimpleSendReceive)
{
QueueFixture fix;
@@ -212,15 +257,19 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
QPID_AUTO_TEST_CASE(testSenderError)
{
MessagingFixture fix;
- //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
- BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::framing::NotFoundException);
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress {create:receiver, type:queue}"),
+ qpid::messaging::InvalidAddress);
}
QPID_AUTO_TEST_CASE(testReceiverError)
{
MessagingFixture fix;
- //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
- BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::framing::NotFoundException);
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress {create:sender, type:queue}"),
+ qpid::messaging::InvalidAddress);
}
QPID_AUTO_TEST_CASE(testSimpleTopic)
@@ -433,9 +482,7 @@ QPID_AUTO_TEST_CASE(testPendingSend)
{
QueueFixture fix;
Sender sender = fix.session.createSender(fix.queue);
- for (uint i = 0; i < 10; ++i) {
- sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
- }
+ send(sender, 10);
//Note: this test relies on 'inside knowledge' of the sender
//implementation and the fact that the simple test case makes it
//possible to predict when completion information will be sent to
@@ -445,12 +492,248 @@ QPID_AUTO_TEST_CASE(testPendingSend)
BOOST_CHECK_EQUAL(sender.pending(), 0u);
Receiver receiver = fix.session.createReceiver(fix.queue);
- for (uint i = 0; i < 10; ++i) {
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str());
- }
+ receive(receiver, 10);
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testBrowse)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ send(sender, 10);
+ Receiver browser1 = fix.session.createReceiver(fix.queue + " {browse:true}");
+ receive(browser1, 10);
+ Receiver browser2 = fix.session.createReceiver(fix.queue + " {browse:true}");
+ receive(browser2, 10);
+ Receiver consumer = fix.session.createReceiver(fix.queue);
+ receive(consumer, 10);
fix.session.acknowledge();
}
+struct QueueCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+
+ QueueCreatePolicyFixture(const std::string& a) : address(a) {}
+
+ void test()
+ {
+ ping(address);
+ BOOST_CHECK(admin.checkQueueExists(address.getName()));
+ }
+
+ ~QueueCreatePolicyFixture()
+ {
+ admin.deleteQueue(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
+{
+ QueueCreatePolicyFixture fix("# {create:always, type:queue}");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
+{
+ QueueCreatePolicyFixture fix("# {create:receiver, type:queue}");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
+{
+ QueueCreatePolicyFixture fix("# {create:sender, type:queue}");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct ExchangeCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+ const std::string exchangeType;
+
+ ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) :
+ address(a), exchangeType(t) {}
+
+ void test()
+ {
+ ping(address);
+ std::string actualType;
+ BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType));
+ BOOST_CHECK_EQUAL(exchangeType, actualType);
+ }
+
+ ~ExchangeCreatePolicyFixture()
+ {
+ admin.deleteExchange(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
+{
+ ExchangeCreatePolicyFixture fix("# {create:always, type:topic, node-properties:{x-amqp0-10-exchange-type:topic}}",
+ "topic");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject {create:receiver, type:topic, node-properties:{x-amqp0-10-exchange-type:fanout}}", "fanout");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject {create:sender, type:topic, node-properties:{x-amqp0-10-exchange-type:direct}}", "direct");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct DeletePolicyFixture : public MessagingFixture
+{
+ enum Mode {RECEIVER, SENDER, ALWAYS, NEVER};
+
+ std::string getPolicy(Mode mode)
+ {
+ switch (mode) {
+ case SENDER:
+ return "{delete:sender}";
+ case RECEIVER:
+ return "{delete:receiver}";
+ case ALWAYS:
+ return "{delete:always}";
+ case NEVER:
+ return "{delete:never}";
+ }
+ }
+
+ void testAll()
+ {
+ test(RECEIVER);
+ test(SENDER);
+ test(ALWAYS);
+ test(NEVER);
+ }
+
+ virtual ~DeletePolicyFixture() {}
+ virtual void create(const qpid::messaging::Address&) = 0;
+ virtual void destroy(const qpid::messaging::Address&) = 0;
+ virtual bool exists(const qpid::messaging::Address&) = 0;
+
+ void test(Mode mode)
+ {
+ qpid::messaging::Address address("# " + getPolicy(mode));
+ create(address);
+
+ Sender s = session.createSender(address);
+ Receiver r = session.createReceiver(address);
+ switch (mode) {
+ case RECEIVER:
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ r.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case SENDER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case ALWAYS:
+ //Problematic case at present; multiple attempts to delete
+ //will result in all but one attempt failing and killing
+ //the session which is not desirable. TODO: better
+ //implementation of delete policy.
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case NEVER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ destroy(address);
+ }
+ }
+};
+
+struct QueueDeletePolicyFixture : DeletePolicyFixture
+{
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createQueue(address.getName());
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteQueue(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ return admin.checkQueueExists(address.getName());
+ }
+};
+
+struct ExchangeDeletePolicyFixture : DeletePolicyFixture
+{
+ const std::string exchangeType;
+ ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {}
+
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createExchange(address.getName(), exchangeType);
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteExchange(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ std::string actualType;
+ return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType;
+ }
+};
+
+QPID_AUTO_TEST_CASE(testDeletePolicyQueue)
+{
+ QueueDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
+{
+ ExchangeDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
+{
+ MessagingFixture fix;
+ std::string a1 = "q {create:always, assert:always, type:queue, node-properties:{durable:false, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+ Sender s1 = fix.session.createSender(a1);
+ s1.cancel();
+ Receiver r1 = fix.session.createReceiver(a1);
+ r1.cancel();
+
+ std::string a2 = "q {assert:receiver, node-properties:{durable:true, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+ Sender s2 = fix.session.createSender(a2);
+ s2.cancel();
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
+
+ std::string a3 = "q {assert:sender, node-properties:{x-amqp0-10-arguments:{qpid.max-count:99}}}";
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
+ Receiver r3 = fix.session.createReceiver(a3);
+ r3.cancel();
+
+ fix.admin.deleteQueue("q");
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests