diff options
28 files changed, 1296 insertions, 460 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp index de6d7768df..aaac554675 100644 --- a/cpp/examples/messaging/client.cpp +++ b/cpp/examples/messaging/client.cpp @@ -46,7 +46,7 @@ int main(int argc, char** argv) { Sender sender = session.createSender("service_queue"); //create temp queue & receiver... - Address responseQueue = session.createTempQueue(); + Address responseQueue("#response-queue {create:always, type:queue, node-properties:{x-amqp0-10-auto-delete:true}}"); Receiver receiver = session.createReceiver(responseQueue); // Now send some messages ... diff --git a/cpp/examples/messaging/topic_listener.cpp b/cpp/examples/messaging/topic_listener.cpp index ba999c03a7..4c97caef7c 100644 --- a/cpp/examples/messaging/topic_listener.cpp +++ b/cpp/examples/messaging/topic_listener.cpp @@ -20,11 +20,11 @@ */ #include <qpid/messaging/Connection.h> -#include <qpid/messaging/Filter.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/MessageListener.h> #include <qpid/messaging/Session.h> #include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Variant.h> #include <cstdlib> #include <iostream> @@ -57,15 +57,14 @@ void Listener::received(Message& message) } int main(int argc, char** argv) { - const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; - const char* pattern = argc>2 ? argv[2] : "#.#"; + const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + const std::string pattern = argc>2 ? argv[2] : "#.#"; try { Connection connection = Connection::open(url); Session session = connection.newSession(); - Filter filter(Filter::WILDCARD, pattern, "control"); - Receiver receiver = session.createReceiver("news_service", filter); + Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}"); Listener listener(receiver); receiver.setListener(&listener); receiver.setCapacity(1); @@ -78,5 +77,3 @@ int main(int argc, char** argv) { } return 1; } - - diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp index 7352a91b30..6f6c1a5677 100644 --- a/cpp/examples/messaging/topic_receiver.cpp +++ b/cpp/examples/messaging/topic_receiver.cpp @@ -19,32 +19,25 @@ * */ -#include <qpid/messaging/Address.h> #include <qpid/messaging/Connection.h> -#include <qpid/messaging/Filter.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Session.h> +#include <qpid/messaging/Variant.h> #include <cstdlib> #include <iostream> -#include <sstream> - using namespace qpid::messaging; -using std::stringstream; -using std::string; - int main(int argc, char** argv) { - const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; - const char* pattern = argc>2 ? argv[2] : "#.#"; + const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + const std::string pattern = argc>2 ? argv[2] : "#.#"; try { Connection connection = Connection::open(url); Session session = connection.newSession(); - Filter filter(Filter::WILDCARD, pattern, "control"); - Receiver receiver = session.createReceiver(Address("news_service", "topic"), filter); + Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}"); while (true) { Message message = receiver.fetch(); std::cout << "Message: " << message.getContent() << std::endl; diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h index e66c52f4c2..f232af5d56 100644 --- a/cpp/include/qpid/messaging/Address.h +++ b/cpp/include/qpid/messaging/Address.h @@ -22,33 +22,71 @@ * */ #include <string> +#include "qpid/Exception.h" +#include "qpid/messaging/Variant.h" #include "qpid/client/ClientImportExport.h" #include <ostream> namespace qpid { -namespace client { -} - namespace messaging { +struct InvalidAddress : public qpid::Exception +{ + InvalidAddress(const std::string& msg); +}; + +struct MalformedAddress : public qpid::Exception +{ + MalformedAddress(const std::string& msg); +}; + +class AddressImpl; + /** * Represents an address to which messages can be sent and from which * messages can be received. Often a simple name is sufficient for - * this. However this struct allows the type of address to be - * specified allowing more sophisticated treatment if necessary. + * this, however this can be augmented with a subject pattern and + * options. + * + * All parts of an address can be specified in a string of the + * following form: + * + * <address> [ / <subject> ] [ { <key> : <value> , ... } ] + * + * Here the <address> is a simple name for the addressed entity and + * <subject> is a subject or subject pattern for messages sent to or + * received from this address. The options are specified as a series + * of key value pairs enclosed in curly brackets (denoting a map). */ -struct Address +class Address { - std::string value; - std::string type; - + public: QPID_CLIENT_EXTERN Address(); QPID_CLIENT_EXTERN Address(const std::string& address); - QPID_CLIENT_EXTERN Address(const std::string& address, const std::string& type); - QPID_CLIENT_EXTERN operator const std::string&() const; - QPID_CLIENT_EXTERN const std::string& toStr() const; + QPID_CLIENT_EXTERN Address(const std::string& name, const std::string& subject, + const Variant::Map& options, const std::string& type = ""); + QPID_CLIENT_EXTERN Address(const Address& address); + QPID_CLIENT_EXTERN ~Address(); + Address& operator=(const Address&); + QPID_CLIENT_EXTERN const std::string& getName() const; + QPID_CLIENT_EXTERN void setName(const std::string&); + QPID_CLIENT_EXTERN const std::string& getSubject() const; + QPID_CLIENT_EXTERN void setSubject(const std::string&); + QPID_CLIENT_EXTERN bool hasSubject() const; + QPID_CLIENT_EXTERN const Variant::Map& getOptions() const; + QPID_CLIENT_EXTERN Variant::Map& getOptions(); + QPID_CLIENT_EXTERN void setOptions(const Variant::Map&); + + QPID_CLIENT_EXTERN std::string getType() const; + QPID_CLIENT_EXTERN void setType(const std::string&); + + QPID_CLIENT_EXTERN const Variant& getOption(const std::string& key) const; + + QPID_CLIENT_EXTERN std::string toStr() const; QPID_CLIENT_EXTERN operator bool() const; QPID_CLIENT_EXTERN bool operator !() const; + private: + AddressImpl* impl; }; QPID_CLIENT_EXTERN std::ostream& operator<<(std::ostream& out, const Address& address); diff --git a/cpp/include/qpid/messaging/Filter.h b/cpp/include/qpid/messaging/Filter.h deleted file mode 100644 index 5cd844cf73..0000000000 --- a/cpp/include/qpid/messaging/Filter.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef QPID_MESSAGING_FILTER_H -#define QPID_MESSAGING_FILTER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <vector> -#include "qpid/client/ClientImportExport.h" - -namespace qpid { -namespace client { -} - -namespace messaging { - -struct Filter -{ - std::string type; - std::vector<std::string> patterns; - - QPID_CLIENT_EXTERN Filter(std::string type, std::string pattern); - QPID_CLIENT_EXTERN Filter(std::string type, std::string pattern1, std::string pattern2); - - static QPID_CLIENT_EXTERN const std::string WILDCARD; - static QPID_CLIENT_EXTERN const std::string EXACT_MATCH; -}; - -}} // namespace qpid::messaging - -#endif /*!QPID_MESSAGING_FILTER_H*/ diff --git a/cpp/include/qpid/messaging/Message.h b/cpp/include/qpid/messaging/Message.h index 4477d5a2e9..1acccecad0 100644 --- a/cpp/include/qpid/messaging/Message.h +++ b/cpp/include/qpid/messaging/Message.h @@ -32,7 +32,7 @@ namespace client { namespace messaging { -struct Address; +class Address; class Codec; struct MessageImpl; diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 979e27adae..4e3f950ef3 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -24,7 +24,7 @@ #include "qpid/client/ClientImportExport.h" #include "qpid/client/Handle.h" #include "qpid/sys/Time.h" -#include "Variant.h" +#include <string> namespace qpid { namespace client { @@ -35,8 +35,7 @@ template <class> class PrivateImplRef; namespace messaging { -struct Address; -struct Filter; +class Address; class Message; class MessageListener; class Sender; @@ -90,13 +89,10 @@ class Session : public qpid::client::Handle<SessionImpl> QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); - QPID_CLIENT_EXTERN Sender createSender(const Address& address, const VariantMap& options = VariantMap()); - QPID_CLIENT_EXTERN Sender createSender(const std::string& address, const VariantMap& options = VariantMap()); - - QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const VariantMap& options = VariantMap()); - QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options = VariantMap()); - QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const VariantMap& options = VariantMap()); - QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap()); + QPID_CLIENT_EXTERN Sender createSender(const Address& address); + QPID_CLIENT_EXTERN Sender createSender(const std::string& address); + QPID_CLIENT_EXTERN Receiver createReceiver(const Address& address); + QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address); QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string()); private: diff --git a/cpp/include/qpid/messaging/Variant.h b/cpp/include/qpid/messaging/Variant.h index 1e51914794..c63138178b 100644 --- a/cpp/include/qpid/messaging/Variant.h +++ b/cpp/include/qpid/messaging/Variant.h @@ -30,9 +30,6 @@ #include "qpid/client/ClientImportExport.h" namespace qpid { -namespace client { -} - namespace messaging { /** @@ -93,6 +90,7 @@ class Variant QPID_CLIENT_EXTERN ~Variant(); QPID_CLIENT_EXTERN VariantType getType() const; + QPID_CLIENT_EXTERN bool isVoid() const; QPID_CLIENT_EXTERN Variant& operator=(bool); QPID_CLIENT_EXTERN Variant& operator=(uint8_t); diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index df2a4ed6e7..3b81430852 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -594,7 +594,6 @@ set (qpidclient_SOURCES qpid/messaging/Address.cpp qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h - qpid/messaging/Filter.cpp qpid/messaging/ListContent.cpp qpid/messaging/ListView.cpp qpid/messaging/MapContent.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 6434b96e7b..4b859cda47 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -686,7 +686,6 @@ libqpidclient_la_SOURCES = \ qpid/client/SubscriptionManagerImpl.h \ qpid/messaging/Address.cpp \ qpid/messaging/Connection.cpp \ - qpid/messaging/Filter.cpp \ qpid/messaging/ListContent.cpp \ qpid/messaging/ListView.cpp \ qpid/messaging/MapContent.cpp \ @@ -795,7 +794,6 @@ nobase_include_HEADERS += \ ../include/qpid/messaging/Address.h \ ../include/qpid/messaging/Connection.h \ ../include/qpid/messaging/Codec.h \ - ../include/qpid/messaging/Filter.h \ ../include/qpid/messaging/ListContent.h \ ../include/qpid/messaging/ListView.h \ ../include/qpid/messaging/MapContent.h \ diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f51a96efd9..14b5448a34 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -20,18 +20,24 @@ */ #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" #include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/messaging/Address.h" -#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/Variant.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" +#include "qpid/framing/ExchangeQueryResult.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/QueueQueryResult.h" #include "qpid/framing/ReplyTo.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> namespace qpid { namespace client { @@ -40,61 +46,145 @@ namespace amqp0_10 { using qpid::Exception; using qpid::messaging::Address; using qpid::messaging::Filter; +using qpid::messaging::InvalidAddress; using qpid::messaging::Variant; +using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; +using qpid::framing::QueueQueryResult; using qpid::framing::ReplyTo; +using qpid::framing::Uuid; using namespace qpid::framing::message; +using namespace boost::assign; namespace{ -const Variant EMPTY_VARIANT; const FieldTable EMPTY_FIELD_TABLE; const std::string EMPTY_STRING; //option names const std::string BROWSE("browse"); const std::string EXCLUSIVE("exclusive"); -const std::string MODE("mode"); -const std::string NAME("name"); -const std::string UNACKNOWLEDGED("unacknowledged"); +const std::string NO_LOCAL("no-local"); +const std::string FILTER("filter"); +const std::string RELIABILITY("reliability"); +const std::string NAME("subscription-name"); +const std::string NODE_PROPERTIES("node-properties"); + +//policy types +const std::string CREATE("create"); +const std::string ASSERT("assert"); +const std::string DELETE("delete"); +//policy values +const std::string ALWAYS("always"); +const std::string NEVER("never"); +const std::string RECEIVER("receiver"); +const std::string SENDER("sender"); const std::string QUEUE_ADDRESS("queue"); const std::string TOPIC_ADDRESS("topic"); -const std::string TOPIC_ADDRESS_AND_SUBJECT("topic+"); -const std::string DIVIDER("/"); -const std::string SIMPLE_SUBSCRIPTION("simple"); -const std::string RELIABLE_SUBSCRIPTION("reliable"); +const std::string UNRELIABLE("unreliable"); +const std::string AT_MOST_ONCE("at-most-once"); +const std::string AT_LEAST_ONCE("at-least-once"); +const std::string EXACTLY_ONCE("exactly-once"); const std::string DURABLE_SUBSCRIPTION("durable"); +const std::string DURABLE("durable"); + +const std::string TOPIC_EXCHANGE("topic"); +const std::string FANOUT_EXCHANGE("fanout"); +const std::string DIRECT_EXCHANGE("direct"); +const std::string HEADERS_EXCHANGE("headers"); +const std::string XML_EXCHANGE("xml"); +const std::string WILDCARD_ANY("*"); } -class QueueSource : public MessageSource +//some amqp 0-10 specific options +namespace xamqp{ +const std::string AUTO_DELETE("x-amqp0-10-auto-delete"); +const std::string EXCHANGE_TYPE("x-amqp0-10-exchange-type"); +const std::string EXCLUSIVE("x-amqp0-10-exclusive"); +const std::string ALTERNATE_EXCHANGE("x-amqp0-10-alternate-exchange"); +const std::string ARGUMENTS("x-amqp0-10-arguments"); +const std::string QUEUE_ARGUMENTS("x-amqp0-10-queue-arguments"); +const std::string SUBSCRIBE_ARGUMENTS("x-amqp0-10-queue-arguments"); +} + +class Node +{ + protected: + enum CheckMode {FOR_RECEIVER, FOR_SENDER}; + + Node(const Address& address); + + const std::string name; + Variant createPolicy; + Variant assertPolicy; + Variant deletePolicy; + + static bool enabled(const Variant& policy, CheckMode mode); + static bool createEnabled(const Address& address, CheckMode mode); + static void convert(const Variant& option, FieldTable& arguments); + static std::vector<std::string> RECEIVER_MODES; + static std::vector<std::string> SENDER_MODES; +}; + +class Queue : protected Node { public: - QueueSource(const std::string& name, AcceptMode=ACCEPT_MODE_EXPLICIT, AcquireMode=ACQUIRE_MODE_PRE_ACQUIRED, - bool exclusive = false, const FieldTable& options = EMPTY_FIELD_TABLE); + Queue(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + private: + bool durable; + bool autoDelete; + bool exclusive; + std::string alternateExchange; + FieldTable arguments; + + void configure(const Address&); +}; + +class Exchange : protected Node +{ + public: + Exchange(const Address& address); + protected: + void checkCreate(qpid::client::AsyncSession&, CheckMode); + void checkAssert(qpid::client::AsyncSession&, CheckMode); + void checkDelete(qpid::client::AsyncSession&, CheckMode); + const std::string& getDesiredExchangeType() { return type; } + + private: + std::string type; + bool durable; + bool autoDelete; + std::string alternateExchange; + FieldTable arguments; + + void configure(const Address&); +}; + +class QueueSource : public Queue, public MessageSource +{ + public: + QueueSource(const Address& address); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); private: - const std::string name; const AcceptMode acceptMode; const AcquireMode acquireMode; const bool exclusive; - const FieldTable options; + FieldTable options; }; -class Subscription : public MessageSource +class Subscription : public Exchange, public MessageSource { public: - enum SubscriptionMode {SIMPLE, RELIABLE, DURABLE}; - - Subscription(const std::string& name, SubscriptionMode mode = SIMPLE, - const FieldTable& queueOptions = EMPTY_FIELD_TABLE, const FieldTable& subscriptionOptions = EMPTY_FIELD_TABLE); - void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + Subscription(const Address&, const std::string& exchangeType); void subscribe(qpid::client::AsyncSession& session, const std::string& destination); void cancel(qpid::client::AsyncSession& session, const std::string& destination); - - static SubscriptionMode getMode(const std::string& mode); private: struct Binding { @@ -107,155 +197,138 @@ class Subscription : public MessageSource typedef std::vector<Binding> Bindings; - const std::string name; - const bool autoDelete; + const std::string queue; + const bool reliable; const bool durable; - const FieldTable queueOptions; - const FieldTable subscriptionOptions; + FieldTable queueOptions; + FieldTable subscriptionOptions; Bindings bindings; - std::string queue; + + void bindSpecial(const std::string& exchangeType); + void bind(const Variant& filter); + void bind(const Variant::Map& filter); + void bind(const Variant::List& filter); + void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE); + static std::string getSubscriptionName(const std::string& base, const Variant& name); }; -class Exchange : public MessageSink +class ExchangeSink : public Exchange, public MessageSink { public: - Exchange(const std::string& name, const std::string& defaultSubject = EMPTY_STRING, - bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, - const FieldTable& options = EMPTY_FIELD_TABLE); + ExchangeSink(const Address& name); void declare(qpid::client::AsyncSession& session, const std::string& name); void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: - const std::string name; const std::string defaultSubject; - const bool passive; - const std::string type; - const bool durable; - const FieldTable options; }; -class QueueSink : public MessageSink +class QueueSink : public Queue, public MessageSink { public: - QueueSink(const std::string& name, bool passive=true, bool exclusive=false, - bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); + QueueSink(const Address& name); void declare(qpid::client::AsyncSession& session, const std::string& name); void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: - const std::string name; - const bool passive; - const bool exclusive; - const bool autoDelete; - const bool durable; - const FieldTable options; }; bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address); -bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject); +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address); -const Variant& getOption(const std::string& key, const Variant::Map& options) +bool in(const Variant& value, const std::vector<std::string>& choices) { - Variant::Map::const_iterator i = options.find(key); - if (i == options.end()) return EMPTY_VARIANT; - else return i->second; + if (!value.isVoid()) { + for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { + if (value.asString() == *i) return true; + } + } + return false; +} + +bool getReceiverPolicy(const Address& address, const std::string& key) +{ + return in(address.getOption(key), list_of<std::string>(ALWAYS)(RECEIVER)); +} + +bool getSenderPolicy(const Address& address, const std::string& key) +{ + return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER)); +} + +bool is_unreliable(const Address& address) +{ + return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE)); +} + +bool is_reliable(const Address& address) +{ + return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE)); } std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Session session, - const Address& address, - const Filter* filter, - const Variant::Map& options) + const Address& address) { //TODO: handle case where there exists a queue and an exchange of //the same name (hence an unqualified address is ambiguous) //TODO: make sure specified address type gives sane error message - //if it does npt match the configuration on server + //if it does not match the configuration on server + + /** + if (Node::createEnabled(address, FOR_RECEIVER)) { + } else { + } + **/ if (isQueue(session, address)) { - //TODO: support auto-created queue as source, if requested by specific option - - AcceptMode accept = getOption(UNACKNOWLEDGED, options).asBool() ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; - AcquireMode acquire = getOption(BROWSE, options).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED; - bool exclusive = getOption(EXCLUSIVE, options).asBool(); - FieldTable arguments; - //TODO: extract subscribe arguments from options (e.g. either - //filter out already processed keys and send the rest, or have - //a nested map) - - std::auto_ptr<MessageSource> source = - std::auto_ptr<MessageSource>(new QueueSource(address.value, accept, acquire, exclusive, arguments)); + std::auto_ptr<MessageSource> source(new QueueSource(address)); + QPID_LOG(debug, "resolved source address as queue: " << address); return source; } else { - //TODO: extract queue options (e.g. no-local) and subscription options (e.g. less important) - std::auto_ptr<Subscription> bindings = - std::auto_ptr<Subscription>(new Subscription(getOption(NAME, options).asString(), - Subscription::getMode(getOption(MODE, options).asString()))); - - qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.value); - if (result.getNotFound()) { - throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); - } else if (result.getType() == "topic") { - if (filter) { - if (filter->type != Filter::WILDCARD) { - throw qpid::framing::NotImplementedException( - QPID_MSG("Filters of type " << filter->type << " not supported by address " << address)); - - } - for (std::vector<std::string>::const_iterator i = filter->patterns.begin(); i != filter->patterns.end(); i++) { - bindings->add(address.value, *i, qpid::framing::FieldTable()); - } - } else { - //default is to receive all messages - bindings->add(address.value, "*", qpid::framing::FieldTable()); - } - } else if (result.getType() == "fanout") { - if (filter) { - throw qpid::framing::NotImplementedException(QPID_MSG("Filters are not supported by address " << address)); - } - bindings->add(address.value, address.value, qpid::framing::FieldTable()); - } else if (result.getType() == "direct") { - //TODO: ???? - } else { - //TODO: xml and headers exchanges - throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised for " << address)); - } - std::auto_ptr<MessageSource> source = std::auto_ptr<MessageSource>(bindings.release()); + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(address.getName()); + std::auto_ptr<MessageSource> source(new Subscription(address, result.getType())); + QPID_LOG(debug, "resolved source address as topic: " << address); return source; } } std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session session, - const qpid::messaging::Address& address, - const qpid::messaging::Variant::Map& /*options*/) + const qpid::messaging::Address& address) { std::auto_ptr<MessageSink> sink; if (isQueue(session, address)) { - //TODO: support for auto-created queues as sink - sink = std::auto_ptr<MessageSink>(new QueueSink(address.value)); + sink = std::auto_ptr<MessageSink>(new QueueSink(address)); } else { - std::string subject; - if (isTopic(session, address, subject)) { - //TODO: support for auto-created exchanges as sink - sink = std::auto_ptr<MessageSink>(new Exchange(address.value, subject)); + if (isTopic(session, address)) { + sink = std::auto_ptr<MessageSink>(new ExchangeSink(address)); } else { - if (address.type.empty()) { - throw qpid::framing::NotFoundException(QPID_MSG("Address not known: " << address)); + if (address.getType().empty()) { + throw InvalidAddress(QPID_MSG("Address not known: " << address)); } else { - throw qpid::framing::NotImplementedException(QPID_MSG("Address type not recognised: " << address.type)); + throw InvalidAddress(QPID_MSG("Address type not recognised: " << address.getType())); } } } return sink; } -QueueSource::QueueSource(const std::string& _name, AcceptMode _acceptMode, AcquireMode _acquireMode, bool _exclusive, const FieldTable& _options) : - name(_name), acceptMode(_acceptMode), acquireMode(_acquireMode), exclusive(_exclusive), options(_options) {} +QueueSource::QueueSource(const Address& address) : + Queue(address), + acceptMode(is_unreliable(address) ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT), + acquireMode(address.getOption(BROWSE).asBool() ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED), + exclusive(address.getOption(EXCLUSIVE).asBool()) +{ + //extract subscription arguments from address options + convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), options); +} void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) { + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); session.messageSubscribe(arg::queue=name, arg::destination=destination, arg::acceptMode=acceptMode, @@ -267,11 +340,48 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination) { session.messageCancel(destination); + checkDelete(session, FOR_RECEIVER); } -Subscription::Subscription(const std::string& _name, SubscriptionMode mode, const FieldTable& qOptions, const FieldTable& sOptions) - : name(_name), autoDelete(mode == SIMPLE), durable(mode == DURABLE), - queueOptions(qOptions), subscriptionOptions(sOptions) {} +std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name) +{ + if (name.isVoid()) { + return (boost::format("%1%_%2%") % base % Uuid(true).str()).str(); + } else { + return (boost::format("%1%_%2%") % base % name.asString()).str(); + } +} + +Subscription::Subscription(const Address& address, const std::string& exchangeType) + : Exchange(address), + queue(getSubscriptionName(name, address.getOption(NAME))), + reliable(is_reliable(address)), + durable(address.getOption(DURABLE_SUBSCRIPTION).asBool()) +{ + if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1); + convert(address.getOption(xamqp::QUEUE_ARGUMENTS), queueOptions); + convert(address.getOption(xamqp::SUBSCRIBE_ARGUMENTS), subscriptionOptions); + + const Variant& filter = address.getOption(FILTER); + if (!filter.isVoid()) { + //TODO: if both subject _and_ filter are specified, + //combine in some way; for now we just ignore the + //subject in that case. + bind(filter); + } else if (address.hasSubject()) { + //Note: This will not work for headers- or xml- exchange; + //fanout exchange will do no filtering. + //TODO: for headers- or xml- exchange can construct a match + //for the subject in the application-headers + bind(address.getSubject()); + } else { + //Neither a subject nor a filter has been defined, treat this + //as wanting to match all messages (Note: direct exchange is + //currently unable to support this case). + if (!exchangeType.empty()) bindSpecial(exchangeType); + else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType()); + } +} void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options) { @@ -280,18 +390,19 @@ void Subscription::add(const std::string& exchange, const std::string& key, cons void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination) { - if (name.empty()) { - //TODO: use same scheme as JMS client for subscription queue name generation? - queue = session.getId().getName() + destination; - } else { - queue = name; - } + //create exchange if required and specified by policy: + checkCreate(session, FOR_RECEIVER); + checkAssert(session, FOR_RECEIVER); + + //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=true, - arg::autoDelete=autoDelete, arg::durable=durable, arg::arguments=queueOptions); + arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); + //bind subscription queue to exchange: for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) { session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options); } - AcceptMode accept = autoDelete ? ACCEPT_MODE_NONE : ACCEPT_MODE_EXPLICIT; + //subscribe to subscription queue: + AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE; session.messageSubscribe(arg::queue=queue, arg::destination=destination, arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions); } @@ -300,36 +411,23 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string { session.messageCancel(destination); session.queueDelete(arg::queue=queue); + checkDelete(session, FOR_RECEIVER); } Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): exchange(e), key(k), options(o) {} -Subscription::SubscriptionMode Subscription::getMode(const std::string& s) -{ - if (s.empty() || s == SIMPLE_SUBSCRIPTION) return SIMPLE; - else if (s == RELIABLE_SUBSCRIPTION) return RELIABLE; - else if (s == DURABLE_SUBSCRIPTION) return DURABLE; - else throw Exception(QPID_MSG("Unrecognised subscription mode: " << s)); -} - void convert(qpid::messaging::Message& from, qpid::client::Message& to); -Exchange::Exchange(const std::string& _name, const std::string& _defaultSubject, - bool _passive, const std::string& _type, bool _durable, const FieldTable& _options) : - name(_name), defaultSubject(_defaultSubject), passive(_passive), type(_type), durable(_durable), options(_options) {} +ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {} -void Exchange::declare(qpid::client::AsyncSession& session, const std::string&) +void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&) { - //TODO: should this really by synchronous? want to get error if not valid... - if (passive) { - sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); - } else { - sync(session).exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::arguments=options); - } + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); } -void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) +void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { m.message.getDeliveryProperties().setRoutingKey(defaultSubject); @@ -337,22 +435,17 @@ void Exchange::send(qpid::client::AsyncSession& session, const std::string&, Out m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); } -void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} +void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + checkDelete(session, FOR_SENDER); +} -QueueSink::QueueSink(const std::string& _name, bool _passive, bool _exclusive, - bool _autoDelete, bool _durable, const FieldTable& _options) : - name(_name), passive(_passive), exclusive(_exclusive), - autoDelete(_autoDelete), durable(_durable), options(_options) {} +QueueSink::QueueSink(const Address& address) : Queue(address) {} void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) { - //TODO: should this really by synchronous? - if (passive) { - sync(session).queueDeclare(arg::queue=name, arg::passive=true); - } else { - sync(session).queueDeclare(arg::queue=name, arg::exclusive=exclusive, arg::durable=durable, - arg::autoDelete=autoDelete, arg::arguments=options); - } + checkCreate(session, FOR_SENDER); + checkAssert(session, FOR_SENDER); } void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { @@ -360,9 +453,10 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou m.status = session.messageTransfer(arg::content=m.message); } -void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} - -void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp +void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) +{ + checkDelete(session, FOR_SENDER); +} void convert(qpid::messaging::Message& from, qpid::client::Message& to) { @@ -372,7 +466,7 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to) //TODO: set other delivery properties to.getMessageProperties().setContentType(from.getContentType()); const Address& address = from.getReplyTo(); - if (!address.value.empty()) { + if (!address.getName().empty()) { to.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders()); @@ -381,72 +475,292 @@ void convert(qpid::messaging::Message& from, qpid::client::Message& to) Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) { - if (rt.getExchange().empty()) { - if (rt.getRoutingKey().empty()) { - return Address();//empty address - } else { - return Address(rt.getRoutingKey(), QUEUE_ADDRESS); - } + Address address; + if (rt.getExchange().empty()) {//if default exchange, treat as queue + address.setName(rt.getRoutingKey()); + address.setType(QUEUE_ADDRESS); } else { - if (rt.getRoutingKey().empty()) { - return Address(rt.getExchange(), TOPIC_ADDRESS); - } else { - return Address(rt.getExchange() + DIVIDER + rt.getRoutingKey(), TOPIC_ADDRESS_AND_SUBJECT); - } - } + address.setName(rt.getExchange()); + address.setSubject(rt.getRoutingKey()); + address.setType(TOPIC_ADDRESS); + } + return address; } qpid::framing::ReplyTo AddressResolution::convert(const Address& address) { - if (address.type == QUEUE_ADDRESS || address.type.empty()) { - return ReplyTo(EMPTY_STRING, address.value); - } else if (address.type == TOPIC_ADDRESS) { - return ReplyTo(address.value, EMPTY_STRING); - } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { - //need to split the value - string::size_type i = address.value.find(DIVIDER); - if (i != string::npos) { - std::string exchange = address.value.substr(0, i); - std::string routingKey; - if (i+1 < address.value.size()) { - routingKey = address.value.substr(i+1); - } - return ReplyTo(exchange, routingKey); - } else { - return ReplyTo(address.value, EMPTY_STRING); - } + if (address.getType() == QUEUE_ADDRESS || address.getType().empty()) { + return ReplyTo(EMPTY_STRING, address.getName()); + } else if (address.getType() == TOPIC_ADDRESS) { + return ReplyTo(address.getName(), address.getSubject()); } else { - QPID_LOG(notice, "Unrecognised type for reply-to: " << address.type); - //treat as queue - return ReplyTo(EMPTY_STRING, address.value); + QPID_LOG(notice, "Unrecognised type for reply-to: " << address.getType()); + return ReplyTo(EMPTY_STRING, address.getName());//treat as queue } } bool isQueue(qpid::client::Session session, const qpid::messaging::Address& address) { - return address.type == QUEUE_ADDRESS || - (address.type.empty() && session.queueQuery(address.value).getQueue() == address.value); + return address.getType() == QUEUE_ADDRESS || + (address.getType().empty() && session.queueQuery(address.getName()).getQueue() == address.getName()); } -bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address, std::string& subject) +bool isTopic(qpid::client::Session session, const qpid::messaging::Address& address) { - if (address.type.empty()) { - return !session.exchangeQuery(address.value).getNotFound(); - } else if (address.type == TOPIC_ADDRESS) { - return true; - } else if (address.type == TOPIC_ADDRESS_AND_SUBJECT) { - string::size_type i = address.value.find(DIVIDER); - if (i != string::npos) { - std::string exchange = address.value.substr(0, i); - if (i+1 < address.value.size()) { - subject = address.value.substr(i+1); - } - } + if (address.getType().empty()) { + return !session.exchangeQuery(address.getName()).getNotFound(); + } else if (address.getType() == TOPIC_ADDRESS) { return true; } else { return false; } } +void Subscription::bind(const Variant& filter) +{ + switch (filter.getType()) { + case qpid::messaging::VAR_MAP: + bind(filter.asMap()); + break; + case qpid::messaging::VAR_LIST: + bind(filter.asList()); + break; + default: + add(name, filter.asString()); + break; + } +} + +void Subscription::bind(const Variant::Map& filter) +{ + qpid::framing::FieldTable arguments; + translate(filter, arguments); + add(name, queue, arguments); +} + +void Subscription::bind(const Variant::List& filter) +{ + for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) { + bind(*i); + } +} + +void Subscription::bindSpecial(const std::string& exchangeType) +{ + if (exchangeType == TOPIC_EXCHANGE) { + add(name, WILDCARD_ANY); + } else if (exchangeType == FANOUT_EXCHANGE) { + add(name, queue); + } else if (exchangeType == HEADERS_EXCHANGE) { + //TODO: add special binding for headers exchange to match all messages + } else if (exchangeType == XML_EXCHANGE) { + //TODO: add special binding for xml exchange to match all messages + } else { //E.g. direct + throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType)); + } +} + +Node::Node(const Address& address) : name(address.getName()), + createPolicy(address.getOption(CREATE)), + assertPolicy(address.getOption(ASSERT)), + deletePolicy(address.getOption(DELETE)) {} + +Queue::Queue(const Address& a) : Node(a), + durable(false), + autoDelete(false), + exclusive(false) +{ + configure(a); +} + +void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + QPID_LOG(debug, "Auto-creating queue '" << name << "'"); + try { + sync(session).queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).queueDeclare(arg::queue=name, arg::passive=true); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Queue %1% does not exist; %2%") % name % e.what()).str()); + } + } +} + +void Queue::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(deletePolicy, mode)) { + QPID_LOG(debug, "Auto-deleting queue '" << name << "'"); + sync(session).queueDelete(arg::queue=name); + } +} + +void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + QueueQueryResult result = sync(session).queueQuery(name); + if (result.getQueue() != name) { + throw InvalidAddress((boost::format("Queue not found: %1%") % name).str()); + } else { + if (durable && !result.getDurable()) { + throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str()); + } + if (autoDelete && !result.getAutoDelete()) { + throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str()); + } + if (exclusive && !result.getExclusive()) { + throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str()); + } + if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { + throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + % name % alternateExchange % result.getAlternateExchange()).str()); + } + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (*i->second != *v) { + throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + } + } +} + +void Queue::configure(const Address& address) +{ + const Variant& properties = address.getOption(NODE_PROPERTIES); + if (!properties.isVoid()) { + Variant::Map p = properties.asMap(); + durable = p[DURABLE]; + autoDelete = p[xamqp::AUTO_DELETE]; + exclusive = p[xamqp::EXCLUSIVE]; + alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString(); + if (!p[xamqp::ARGUMENTS].isVoid()) { + translate(p[xamqp::ARGUMENTS].asMap(), arguments); + } + } +} + +Exchange::Exchange(const Address& a) : Node(a), + durable(false), + autoDelete(false) +{ + configure(a); +} + +void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(createPolicy, mode)) { + try { + sync(session).exchangeDeclare(arg::exchange=name, + arg::type=type, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); + } + } else { + try { + sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); + } catch (const qpid::Exception& e) { + throw InvalidAddress((boost::format("Exchange %1% does not exist; %2%") % name % e.what()).str()); + } + } +} + +void Exchange::checkDelete(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(deletePolicy, mode)) { + sync(session).exchangeDelete(arg::exchange=name); + } +} + +void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) +{ + if (enabled(assertPolicy, mode)) { + ExchangeQueryResult result = sync(session).exchangeQuery(arg::exchange=name); + if (result.getNotFound()) { + throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); + } else { + if (!type.empty() && result.getType() != type) { + throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + % name % type % result.getType()).str()); + } + if (durable && !result.getDurable()) { + throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); + } + //Note: Can't check auto-delete or alternate-exchange via + //exchange-query-result as these are not returned + //TODO: could use a passive declare to check alternate-exchange + for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { + FieldTable::ValuePtr v = result.getArguments().get(i->first); + if (!v) { + throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + } else if (i->second != v) { + throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + % i->first % name % *(i->second) % *v).str()); + } + } + } + } +} + +void Exchange::configure(const Address& address) +{ + const Variant& properties = address.getOption(NODE_PROPERTIES); + if (!properties.isVoid()) { + Variant::Map p = properties.asMap(); + durable = p[DURABLE]; + autoDelete = p[xamqp::AUTO_DELETE]; + type = p[xamqp::EXCHANGE_TYPE].asString(); + alternateExchange = p[xamqp::ALTERNATE_EXCHANGE].asString(); + if (!p[xamqp::ARGUMENTS].isVoid()) { + translate(p[xamqp::ARGUMENTS].asMap(), arguments); + } + } +} + + +bool Node::enabled(const Variant& policy, CheckMode mode) +{ + bool result; + switch (mode) { + case FOR_RECEIVER: + result = in(policy, RECEIVER_MODES); + break; + case FOR_SENDER: + result = in(policy, SENDER_MODES); + break; + } + return result; +} + +bool Node::createEnabled(const Address& address, CheckMode mode) +{ + const Variant& policy = address.getOption(CREATE); + return enabled(policy, mode); +} + +void Node::convert(const Variant& options, FieldTable& arguments) +{ + if (!options.isVoid()) { + translate(options.asMap(), arguments); + } +} +std::vector<std::string> Node::RECEIVER_MODES = list_of<std::string>(ALWAYS) (RECEIVER); +std::vector<std::string> Node::SENDER_MODES = list_of<std::string>(ALWAYS) (SENDER); }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.h b/cpp/src/qpid/client/amqp0_10/AddressResolution.h index 9d5657450d..46d3f96243 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.h +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.h @@ -50,13 +50,10 @@ class AddressResolution { public: std::auto_ptr<MessageSource> resolveSource(qpid::client::Session session, - const qpid::messaging::Address& address, - const qpid::messaging::Filter* filter, - const qpid::messaging::Variant::Map& options); - + const qpid::messaging::Address& address); + std::auto_ptr<MessageSink> resolveSink(qpid::client::Session session, - const qpid::messaging::Address& address, - const qpid::messaging::Variant::Map& options); + const qpid::messaging::Address& address); static qpid::messaging::Address convert(const qpid::framing::ReplyTo&); static qpid::framing::ReplyTo convert(const qpid::messaging::Address&); diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index f4bc09594d..a8754778f0 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -50,11 +50,11 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl qpid::sys::Mutex lock;//used to protect data structures qpid::sys::Semaphore semaphore;//used to coordinate reconnection + Sessions sessions; qpid::client::Connection connection; std::auto_ptr<FailoverListener> failoverListener; qpid::Url url; qpid::client::ConnectionSettings settings; - Sessions sessions; bool reconnectionEnabled; int timeout; int minRetryInterval; diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index cbc95b44fb..d3410ad76e 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -39,7 +39,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.setData(from.getContent()); message.getMessageProperties().setContentType(from.getContentType()); const Address& address = from.getReplyTo(); - if (!address.value.empty()) { + if (address) { message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders()); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index da91c4a160..f294d7e273 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -103,7 +103,7 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve session = s; if (state == UNRESOLVED) { - source = resolver.resolveSource(session, address, filter, options); + source = resolver.resolveSource(session, address); state = STOPPED;//TODO: if session is started, go straight to started } if (state == CANCELLED) { @@ -136,11 +136,9 @@ uint32_t ReceiverImpl::pendingAck() } ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a, - const qpid::messaging::Filter* f, - const qpid::messaging::Variant::Map& o) : + const qpid::messaging::Address& a) : - parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), + parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), listener(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index b941348fc8..d05fd3d045 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -22,7 +22,6 @@ * */ #include "qpid/messaging/Address.h" -#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/ReceiverImpl.h" #include "qpid/messaging/Variant.h" @@ -48,9 +47,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED}; ReceiverImpl(SessionImpl& parent, const std::string& name, - const qpid::messaging::Address& address, - const qpid::messaging::Filter* filter, - const qpid::messaging::Variant::Map& options); + const qpid::messaging::Address& address); void init(qpid::client::AsyncSession session, AddressResolution& resolver); bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); @@ -72,8 +69,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl SessionImpl& parent; const std::string destination; const qpid::messaging::Address address; - const qpid::messaging::Filter* filter; - const qpid::messaging::Variant::Map options; const uint32_t byteCredit; State state; diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 4cd2dc0521..9d168725e6 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -29,9 +29,8 @@ namespace client { namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, - const qpid::messaging::Address& _address, - const qpid::messaging::Variant::Map& _options) : - parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + const qpid::messaging::Address& _address) : + parent(_parent), name(_name), address(_address), state(UNRESOLVED), capacity(50), window(0), flushed(false) {} void SenderImpl::send(const qpid::messaging::Message& message) @@ -63,7 +62,7 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; if (state == UNRESOLVED) { - sink = resolver.resolveSink(session, address, options); + sink = resolver.resolveSink(session, address); state = ACTIVE; } if (state == CANCELLED) { diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 4faa3fc292..60b196b21b 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -47,8 +47,7 @@ class SenderImpl : public qpid::messaging::SenderImpl enum State {UNRESOLVED, ACTIVE, CANCELLED}; SenderImpl(SessionImpl& parent, const std::string& name, - const qpid::messaging::Address& address, - const qpid::messaging::Variant::Map& options); + const qpid::messaging::Address& address); void send(const qpid::messaging::Message&); void cancel(); void setCapacity(uint32_t); @@ -60,7 +59,6 @@ class SenderImpl : public qpid::messaging::SenderImpl SessionImpl& parent; const std::string name; const qpid::messaging::Address address; - const qpid::messaging::Variant::Map options; State state; std::auto_ptr<MessageSink> sink; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index bc6289d84b..101bc5ce0a 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -28,7 +28,6 @@ #include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" -#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/messaging/MessageListener.h" @@ -132,36 +131,22 @@ struct SessionImpl::CreateReceiver : Command { qpid::messaging::Receiver result; const qpid::messaging::Address& address; - const Filter* filter; - const qpid::messaging::Variant::Map& options; - CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f, - const qpid::messaging::Variant::Map& o) : - Command(i), address(a), filter(f), options(o) {} - void operator()() { result = impl.createReceiverImpl(address, filter, options); } + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : + Command(i), address(a) {} + void operator()() { result = impl.createReceiverImpl(address); } }; -Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options) -{ - CreateReceiver f(*this, address, 0, options); - while (!execute(f)) {} - return f.result; -} - -Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, - const Filter& filter, const VariantMap& options) -{ - CreateReceiver f(*this, address, &filter, options); - while (!execute(f)) {} - return f.result; +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address) +{ + return get1<CreateReceiver, Receiver>(address); } -Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address, - const Filter* filter, const VariantMap& options) +Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address) { - std::string name = address; + std::string name = address.getName(); getFreeKey(name, receivers); - Receiver receiver(new ReceiverImpl(*this, name, address, filter, options)); + Receiver receiver(new ReceiverImpl(*this, name, address)); getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver); receivers[name] = receiver; return receiver; @@ -171,26 +156,22 @@ struct SessionImpl::CreateSender : Command { qpid::messaging::Sender result; const qpid::messaging::Address& address; - const qpid::messaging::Variant::Map& options; - CreateSender(SessionImpl& i, const qpid::messaging::Address& a, - const qpid::messaging::Variant::Map& o) : - Command(i), address(a), options(o) {} - void operator()() { result = impl.createSenderImpl(address, options); } + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : + Command(i), address(a) {} + void operator()() { result = impl.createSenderImpl(address); } }; -Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options) +Sender SessionImpl::createSender(const qpid::messaging::Address& address) { - CreateSender f(*this, address, options); - while (!execute(f)) {} - return f.result; + return get1<CreateSender, Sender>(address); } -Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options) +Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) { - std::string name = address; + std::string name = address.getName(); getFreeKey(name, senders); - Sender sender(new SenderImpl(*this, name, address, options)); + Sender sender(new SenderImpl(*this, name, address)); getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver); senders[name] = sender; return sender; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index b453f3f08f..9a7918d473 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,13 +63,8 @@ class SessionImpl : public qpid::messaging::SessionImpl void sync(); void flush(); qpid::messaging::Address createTempQueue(const std::string& baseName); - qpid::messaging::Sender createSender(const qpid::messaging::Address& address, - const qpid::messaging::VariantMap& options); - qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, - const qpid::messaging::VariantMap& options); - qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address, - const qpid::messaging::Filter& filter, - const qpid::messaging::VariantMap& options); + qpid::messaging::Sender createSender(const qpid::messaging::Address& address); + qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); void* getLastConfirmedSent(); void* getLastConfirmedAcknowledged(); @@ -129,11 +124,8 @@ class SessionImpl : public qpid::messaging::SessionImpl void closeImpl(); void syncImpl(); void flushImpl(); - qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address, - const qpid::messaging::VariantMap& options); - qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, - const qpid::messaging::Filter* filter, - const qpid::messaging::VariantMap& options); + qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); + qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); uint32_t availableImpl(const std::string* destination); uint32_t pendingAckImpl(const std::string* destination); diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp index 813a8e1377..edb1ddc79d 100644 --- a/cpp/src/qpid/messaging/Address.cpp +++ b/cpp/src/qpid/messaging/Address.cpp @@ -19,28 +19,293 @@ * */ #include "qpid/messaging/Address.h" +#include "qpid/framing/Uuid.h" +#include <sstream> +#include <boost/format.hpp> namespace qpid { namespace messaging { -Address::Address() {} -Address::Address(const std::string& address) : value(address) {} -Address::Address(const std::string& address, const std::string& t) : value(address), type(t) {} -Address::operator const std::string&() const { return value; } -const std::string& Address::toStr() const { return value; } -Address::operator bool() const { return !value.empty(); } -bool Address::operator !() const { return value.empty(); } +namespace { +const std::string SUBJECT_DIVIDER = "/"; +const std::string SPACE = " "; +const std::string TYPE = "type"; +} +class AddressImpl +{ + public: + std::string name; + std::string subject; + Variant::Map options; + + AddressImpl() {} + AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) : + name(n), subject(s), options(o) {} +}; + +class AddressParser +{ + public: + AddressParser(const std::string&); + bool parse(Address& address); + private: + const std::string& input; + std::string::size_type current; + static const std::string RESERVED; + + bool readChar(char c); + bool readQuotedString(Variant& value); + bool readString(Variant& value, char delimiter); + bool readWord(std::string& word); + bool readSimpleValue(Variant& word); + bool readKey(std::string& key); + bool readValue(Variant& value); + bool readKeyValuePair(Variant::Map& map); + bool readMap(Variant& value); + bool readList(Variant& value); + bool error(const std::string& message); + bool eos(); + bool iswhitespace(); + bool isreserved(); +}; + +Address::Address() : impl(new AddressImpl()) {} +Address::Address(const std::string& address) : impl(new AddressImpl()) +{ + AddressParser parser(address); + parser.parse(*this); +} +Address::Address(const std::string& name, const std::string& subject, const Variant::Map& options, + const std::string& type) + : impl(new AddressImpl(name, subject, options)) { setType(type); } +Address::Address(const Address& a) : + impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {} +Address::~Address() { delete impl; } + +Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } + + +std::string Address::toStr() const +{ + std::stringstream out; + out << impl->name; + if (!impl->subject.empty()) out << SUBJECT_DIVIDER << impl->subject; + if (!impl->options.empty()) out << " {" << impl->options << "}"; + return out.str(); +} +Address::operator bool() const { return !impl->name.empty(); } +bool Address::operator !() const { return impl->name.empty(); } + +const std::string& Address::getName() const { return impl->name; } +void Address::setName(const std::string& name) { impl->name = name; } +const std::string& Address::getSubject() const { return impl->subject; } +bool Address::hasSubject() const { return !(impl->subject.empty()); } +void Address::setSubject(const std::string& subject) { impl->subject = subject; } +const Variant::Map& Address::getOptions() const { return impl->options; } +Variant::Map& Address::getOptions() { return impl->options; } +void Address::setOptions(const Variant::Map& options) { impl->options = options; } + + +namespace{ +const Variant EMPTY_VARIANT; +const std::string EMPTY_STRING; +} -const std::string TYPE_SEPARATOR(":"); +std::string Address::getType() const +{ + const Variant& type = getOption(TYPE); + return type.isVoid() ? EMPTY_STRING : type.asString(); +} +void Address::setType(const std::string& type) { impl->options[TYPE] = type; } + +const Variant& Address::getOption(const std::string& key) const +{ + Variant::Map::const_iterator i = impl->options.find(key); + if (i == impl->options.end()) return EMPTY_VARIANT; + else return i->second; +} std::ostream& operator<<(std::ostream& out, const Address& address) { - if (!address.type.empty()) { - out << address.type; - out << TYPE_SEPARATOR; - } - out << address.value; + out << address.toStr(); return out; } +InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {} + +MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {} + +AddressParser::AddressParser(const std::string& s) : input(s), current(0) {} + +bool AddressParser::error(const std::string& message) +{ + throw MalformedAddress(message);//TODO: add more debug detail to error message (position etc) +} + +bool AddressParser::parse(Address& address) +{ + std::string name; + if (readWord(name)) { + if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name; + address.setName(name); + if (readChar('/')) { + std::string subject; + if (readWord(subject)) { + address.setSubject(subject); + } else { + return error("Expected subject after /"); + } + } + Variant options = Variant::Map(); + if (readMap(options)) { + address.setOptions(options.asMap()); + } + return true; + } else { + return input.empty() || error("Expected name"); + } +} + +bool AddressParser::readList(Variant& value) +{ + if (readChar('[')) { + value = Variant::List(); + Variant item; + while (readValue(item)) { + value.asList().push_back(item); + if (!readChar(',')) break; + } + return readChar(']') || error("Unmatched '['!"); + } else { + return false; + } +} + +bool AddressParser::readMap(Variant& value) +{ + if (readChar('{')) { + value = Variant::Map(); + while (readKeyValuePair(value.asMap()) && readChar(',')) {} + return readChar('}') || error("Unmatched '{'!"); + } else { + return false; + } +} + +bool AddressParser::readKeyValuePair(Variant::Map& map) +{ + std::string key; + Variant value; + if (readKey(key)) { + if (readChar(':') && readValue(value)) { + map[key] = value; + return true; + } else { + return error("Bad key-value pair!"); + } + } else { + return false; + } +} + +bool AddressParser::readKey(std::string& key) +{ + return readWord(key); +} + +bool AddressParser::readValue(Variant& value) +{ + return readSimpleValue(value) || readQuotedString(value) || + readMap(value) || readList(value) || error("Expected value"); +} + +bool AddressParser::readString(Variant& value, char delimiter) +{ + if (readChar(delimiter)) { + std::string::size_type start = current++; + while (!eos()) { + if (input.at(current) == delimiter) { + if (current > start) { + value = input.substr(start, current - start); + } else { + value = ""; + } + ++current; + return true; + } else { + ++current; + } + } + return error("Unmatched delimiter"); + } else { + return false; + } +} + +bool AddressParser::readQuotedString(Variant& value) +{ + return readString(value, '"') || readString(value, '\''); +} + +bool AddressParser::readSimpleValue(Variant& value) +{ + std::string s; + if (readWord(s)) { + value = s; + try { value = value.asInt64(); return true; } catch (const InvalidConversion&) {} + try { value = value.asDouble(); return true; } catch (const InvalidConversion&) {} + return true; + } else { + return false; + } +} + +bool AddressParser::readWord(std::string& value) +{ + //skip leading whitespace + while (!eos() && iswhitespace()) ++current; + + //read any number of non-whitespace, non-reserved chars into value + std::string::size_type start = current; + while (!eos() && !iswhitespace() && !isreserved()) ++current; + + if (current > start) { + value = input.substr(start, current - start); + return true; + } else { + return false; + } +} + +bool AddressParser::readChar(char c) +{ + while (!eos()) { + if (iswhitespace()) { + ++current; + } else if (input.at(current) == c) { + ++current; + return true; + } else { + return false; + } + } + return false; +} + +bool AddressParser::iswhitespace() +{ + return ::isspace(input.at(current)); +} + +bool AddressParser::isreserved() +{ + return RESERVED.find(input.at(current)) != std::string::npos; +} + +bool AddressParser::eos() +{ + return current >= input.size(); +} + +const std::string AddressParser::RESERVED = "\'\"{}[],:/"; }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Filter.cpp b/cpp/src/qpid/messaging/Filter.cpp deleted file mode 100644 index b06cbdb373..0000000000 --- a/cpp/src/qpid/messaging/Filter.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/messaging/Filter.h" - -namespace qpid { -namespace client { -} - -namespace messaging { - -Filter::Filter(std::string t, std::string pattern) : type(t) { patterns.push_back(pattern); } -Filter::Filter(std::string t, std::string pattern1, std::string pattern2) : type(t) -{ - patterns.push_back(pattern1); - patterns.push_back(pattern2); -} - -const std::string Filter::WILDCARD("WILDCARD"); -const std::string Filter::EXACT_MATCH("EXACT_MATCH"); - -}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index 62b1ca0dcf..b69b575b26 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -20,7 +20,6 @@ */ #include "qpid/messaging/Session.h" #include "qpid/messaging/Address.h" -#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" @@ -48,30 +47,22 @@ void Session::acknowledge() { impl->acknowledge(); } void Session::reject(Message& m) { impl->reject(m); } void Session::close() { impl->close(); } -Sender Session::createSender(const Address& address, const VariantMap& options) +Sender Session::createSender(const Address& address) { - return impl->createSender(address, options); + return impl->createSender(address); } -Receiver Session::createReceiver(const Address& address, const VariantMap& options) +Receiver Session::createReceiver(const Address& address) { - return impl->createReceiver(address, options); -} -Receiver Session::createReceiver(const Address& address, const Filter& filter, const VariantMap& options) -{ - return impl->createReceiver(address, filter, options); + return impl->createReceiver(address); } -Sender Session::createSender(const std::string& address, const VariantMap& options) -{ - return impl->createSender(Address(address), options); -} -Receiver Session::createReceiver(const std::string& address, const VariantMap& options) +Sender Session::createSender(const std::string& address) { - return impl->createReceiver(Address(address), options); + return impl->createSender(Address(address)); } -Receiver Session::createReceiver(const std::string& address, const Filter& filter, const VariantMap& options) +Receiver Session::createReceiver(const std::string& address) { - return impl->createReceiver(Address(address), filter, options); + return impl->createReceiver(Address(address)); } Address Session::createTempQueue(const std::string& baseName) diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index 0933cea9c8..e48e7a4d02 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -23,7 +23,6 @@ */ #include "qpid/RefCounted.h" #include <string> -#include "qpid/messaging/Variant.h" #include "qpid/sys/Time.h" namespace qpid { @@ -53,9 +52,8 @@ class SessionImpl : public virtual qpid::RefCounted virtual Message fetch(qpid::sys::Duration timeout) = 0; virtual bool dispatch(qpid::sys::Duration timeout) = 0; virtual Address createTempQueue(const std::string& baseName) = 0; - virtual Sender createSender(const Address& address, const VariantMap& options) = 0; - virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0; - virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0; + virtual Sender createSender(const Address& address) = 0; + virtual Receiver createReceiver(const Address& address) = 0; virtual uint32_t available() = 0; virtual uint32_t pendingAck() = 0; private: diff --git a/cpp/src/qpid/messaging/Variant.cpp b/cpp/src/qpid/messaging/Variant.cpp index 4e37134b39..3b0c3312ca 100644 --- a/cpp/src/qpid/messaging/Variant.cpp +++ b/cpp/src/qpid/messaging/Variant.cpp @@ -529,6 +529,7 @@ Variant& Variant::operator=(const Variant& v) } VariantType Variant::getType() const { return impl->getType(); } +bool Variant::isVoid() const { return impl->getType() == VAR_VOID; } bool Variant::asBool() const { return impl->asBool(); } uint8_t Variant::asUint8() const { return impl->asUint8(); } uint16_t Variant::asUint16() const { return impl->asUint16(); } diff --git a/cpp/src/tests/Address.cpp b/cpp/src/tests/Address.cpp new file mode 100644 index 0000000000..ab4017a788 --- /dev/null +++ b/cpp/src/tests/Address.cpp @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Variant.h" + +#include "unit_test.h" + +using namespace qpid::messaging; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(AddressSuite) + +QPID_AUTO_TEST_CASE(testParseNameOnly) +{ + Address address("my-topic"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); +} + +QPID_AUTO_TEST_CASE(testParseSubject) +{ + Address address("my-topic/my-subject"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); + BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject()); +} + +QPID_AUTO_TEST_CASE(testParseOptions) +{ + Address address("my-topic {a:bc, x:101, y:'a string'}"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); + BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); +} + +QPID_AUTO_TEST_CASE(testParseSubjectAndOptions) +{ + Address address("my-topic/my-subject {a:bc, x:101, y:'a string'}"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); + BOOST_CHECK_EQUAL(std::string("my-subject"), address.getSubject()); + BOOST_CHECK_EQUAL(std::string("bc"), address.getOption("a").asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); +} + +QPID_AUTO_TEST_CASE(testParseNestedOptions) +{ + Address address("my-topic {a:{p:202, q:'another string'}, x:101, y:'a string'}"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); + BOOST_CHECK_EQUAL((uint16_t) 202, address.getOptions()["a"].asMap()["p"].asInt64()); + BOOST_CHECK_EQUAL(std::string("another string"), address.getOptions()["a"].asMap()["q"].asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); + BOOST_CHECK_EQUAL(std::string("a string"), address.getOption("y").asString()); +} + +QPID_AUTO_TEST_CASE(testParseOptionsWithList) +{ + Address address("my-topic {a:[202, 'another string'], x:101}"); + BOOST_CHECK_EQUAL(std::string("my-topic"), address.getName()); + Variant::List& list = address.getOptions()["a"].asList(); + Variant::List::const_iterator i = list.begin(); + BOOST_CHECK(i != list.end()); + BOOST_CHECK_EQUAL((uint16_t) 202, i->asInt64()); + BOOST_CHECK(++i != list.end()); + BOOST_CHECK_EQUAL(std::string("another string"), i->asString()); + BOOST_CHECK_EQUAL((uint16_t) 101, address.getOption("x").asInt64()); +} + +QPID_AUTO_TEST_SUITE_END() + +}} diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index e1935dfcc6..b84251831d 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -114,7 +114,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ReplicationTest.cpp \ ClientMessageTest.cpp \ PollableCondition.cpp \ - Variant.cpp + Variant.cpp \ + Address.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 206f5ba691..fc39557a0e 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -21,6 +21,7 @@ #include "unit_test.h" #include "test_tools.h" #include "BrokerFixture.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/ListContent.h" #include "qpid/messaging/ListView.h" @@ -33,7 +34,9 @@ #include "qpid/messaging/Session.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" +#include "qpid/framing/ExchangeQueryResult.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/Time.h" #include <boost/assign.hpp> #include <boost/format.hpp> @@ -48,6 +51,7 @@ QPID_AUTO_TEST_SUITE(MessagingSessionTests) using namespace qpid::messaging; using namespace qpid; using qpid::broker::Broker; +using qpid::framing::Uuid; struct BrokerAdmin { @@ -80,6 +84,18 @@ struct BrokerAdmin session.exchangeDelete(qpid::client::arg::exchange=name); } + bool checkQueueExists(const std::string& name) + { + return session.queueQuery(name).getQueue() == name; + } + + bool checkExchangeExists(const std::string& name, std::string& type) + { + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name); + type = result.getType(); + return !result.getNotFound(); + } + ~BrokerAdmin() { session.close(); @@ -99,6 +115,19 @@ struct MessagingFixture : public BrokerFixture session(connection.newSession()), admin(broker->getPort(Broker::TCP_TRANSPORT)) {} + void ping(const qpid::messaging::Address& address) + { + Receiver r = session.createReceiver(address); + Sender s = session.createSender(address); + Message out(Uuid(true).str()); + s.send(out); + Message in; + BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + r.cancel(); + s.cancel(); + } + ~MessagingFixture() { session.close(); @@ -178,6 +207,22 @@ std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duratio return data; } + +void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message") +{ + for (uint i = start; i < start + count; ++i) { + sender.send(Message((boost::format("%1%_%2%") % base % i).str())); + } +} + +void receive(Receiver& receiver, uint count = 1, uint start = 1, + const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5) +{ + for (uint i = start; i < start + count; ++i) { + BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); + } +} + QPID_AUTO_TEST_CASE(testSimpleSendReceive) { QueueFixture fix; @@ -212,15 +257,19 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders) QPID_AUTO_TEST_CASE(testSenderError) { MessagingFixture fix; - //TODO: this is the wrong type for the exception; define explicit set in messaging namespace - BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::framing::NotFoundException); + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress {create:receiver, type:queue}"), + qpid::messaging::InvalidAddress); } QPID_AUTO_TEST_CASE(testReceiverError) { MessagingFixture fix; - //TODO: this is the wrong type for the exception; define explicit set in messaging namespace - BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::framing::NotFoundException); + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress {create:sender, type:queue}"), + qpid::messaging::InvalidAddress); } QPID_AUTO_TEST_CASE(testSimpleTopic) @@ -433,9 +482,7 @@ QPID_AUTO_TEST_CASE(testPendingSend) { QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 10; ++i) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } + send(sender, 10); //Note: this test relies on 'inside knowledge' of the sender //implementation and the fact that the simple test case makes it //possible to predict when completion information will be sent to @@ -445,12 +492,248 @@ QPID_AUTO_TEST_CASE(testPendingSend) BOOST_CHECK_EQUAL(sender.pending(), 0u); Receiver receiver = fix.session.createReceiver(fix.queue); - for (uint i = 0; i < 10; ++i) { - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } + receive(receiver, 10); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testBrowse) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + send(sender, 10); + Receiver browser1 = fix.session.createReceiver(fix.queue + " {browse:true}"); + receive(browser1, 10); + Receiver browser2 = fix.session.createReceiver(fix.queue + " {browse:true}"); + receive(browser2, 10); + Receiver consumer = fix.session.createReceiver(fix.queue); + receive(consumer, 10); fix.session.acknowledge(); } +struct QueueCreatePolicyFixture : public MessagingFixture +{ + qpid::messaging::Address address; + + QueueCreatePolicyFixture(const std::string& a) : address(a) {} + + void test() + { + ping(address); + BOOST_CHECK(admin.checkQueueExists(address.getName())); + } + + ~QueueCreatePolicyFixture() + { + admin.deleteQueue(address.getName()); + } +}; + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways) +{ + QueueCreatePolicyFixture fix("# {create:always, type:queue}"); + fix.test(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver) +{ + QueueCreatePolicyFixture fix("# {create:receiver, type:queue}"); + Receiver r = fix.session.createReceiver(fix.address); + fix.test(); + r.cancel(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender) +{ + QueueCreatePolicyFixture fix("# {create:sender, type:queue}"); + Sender s = fix.session.createSender(fix.address); + fix.test(); + s.cancel(); +} + +struct ExchangeCreatePolicyFixture : public MessagingFixture +{ + qpid::messaging::Address address; + const std::string exchangeType; + + ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) : + address(a), exchangeType(t) {} + + void test() + { + ping(address); + std::string actualType; + BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType)); + BOOST_CHECK_EQUAL(exchangeType, actualType); + } + + ~ExchangeCreatePolicyFixture() + { + admin.deleteExchange(address.getName()); + } +}; + +QPID_AUTO_TEST_CASE(testCreatePolicyTopic) +{ + ExchangeCreatePolicyFixture fix("# {create:always, type:topic, node-properties:{x-amqp0-10-exchange-type:topic}}", + "topic"); + fix.test(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout) +{ + ExchangeCreatePolicyFixture fix("#/my-subject {create:receiver, type:topic, node-properties:{x-amqp0-10-exchange-type:fanout}}", "fanout"); + Receiver r = fix.session.createReceiver(fix.address); + fix.test(); + r.cancel(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect) +{ + ExchangeCreatePolicyFixture fix("#/my-subject {create:sender, type:topic, node-properties:{x-amqp0-10-exchange-type:direct}}", "direct"); + Sender s = fix.session.createSender(fix.address); + fix.test(); + s.cancel(); +} + +struct DeletePolicyFixture : public MessagingFixture +{ + enum Mode {RECEIVER, SENDER, ALWAYS, NEVER}; + + std::string getPolicy(Mode mode) + { + switch (mode) { + case SENDER: + return "{delete:sender}"; + case RECEIVER: + return "{delete:receiver}"; + case ALWAYS: + return "{delete:always}"; + case NEVER: + return "{delete:never}"; + } + } + + void testAll() + { + test(RECEIVER); + test(SENDER); + test(ALWAYS); + test(NEVER); + } + + virtual ~DeletePolicyFixture() {} + virtual void create(const qpid::messaging::Address&) = 0; + virtual void destroy(const qpid::messaging::Address&) = 0; + virtual bool exists(const qpid::messaging::Address&) = 0; + + void test(Mode mode) + { + qpid::messaging::Address address("# " + getPolicy(mode)); + create(address); + + Sender s = session.createSender(address); + Receiver r = session.createReceiver(address); + switch (mode) { + case RECEIVER: + s.cancel(); + BOOST_CHECK(exists(address)); + r.cancel(); + BOOST_CHECK(!exists(address)); + break; + case SENDER: + r.cancel(); + BOOST_CHECK(exists(address)); + s.cancel(); + BOOST_CHECK(!exists(address)); + break; + case ALWAYS: + //Problematic case at present; multiple attempts to delete + //will result in all but one attempt failing and killing + //the session which is not desirable. TODO: better + //implementation of delete policy. + s.cancel(); + BOOST_CHECK(!exists(address)); + break; + case NEVER: + r.cancel(); + BOOST_CHECK(exists(address)); + s.cancel(); + BOOST_CHECK(exists(address)); + destroy(address); + } + } +}; + +struct QueueDeletePolicyFixture : DeletePolicyFixture +{ + void create(const qpid::messaging::Address& address) + { + admin.createQueue(address.getName()); + } + void destroy(const qpid::messaging::Address& address) + { + admin.deleteQueue(address.getName()); + } + bool exists(const qpid::messaging::Address& address) + { + return admin.checkQueueExists(address.getName()); + } +}; + +struct ExchangeDeletePolicyFixture : DeletePolicyFixture +{ + const std::string exchangeType; + ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {} + + void create(const qpid::messaging::Address& address) + { + admin.createExchange(address.getName(), exchangeType); + } + void destroy(const qpid::messaging::Address& address) + { + admin.deleteExchange(address.getName()); + } + bool exists(const qpid::messaging::Address& address) + { + std::string actualType; + return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType; + } +}; + +QPID_AUTO_TEST_CASE(testDeletePolicyQueue) +{ + QueueDeletePolicyFixture fix; + fix.testAll(); +} + +QPID_AUTO_TEST_CASE(testDeletePolicyExchange) +{ + ExchangeDeletePolicyFixture fix; + fix.testAll(); +} + +QPID_AUTO_TEST_CASE(testAssertPolicyQueue) +{ + MessagingFixture fix; + std::string a1 = "q {create:always, assert:always, type:queue, node-properties:{durable:false, x-amqp0-10-arguments:{qpid.max-count:100}}}"; + Sender s1 = fix.session.createSender(a1); + s1.cancel(); + Receiver r1 = fix.session.createReceiver(a1); + r1.cancel(); + + std::string a2 = "q {assert:receiver, node-properties:{durable:true, x-amqp0-10-arguments:{qpid.max-count:100}}}"; + Sender s2 = fix.session.createSender(a2); + s2.cancel(); + BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress); + + std::string a3 = "q {assert:sender, node-properties:{x-amqp0-10-arguments:{qpid.max-count:99}}}"; + BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress); + Receiver r3 = fix.session.createReceiver(a3); + r3.cancel(); + + fix.admin.deleteQueue("q"); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |