diff options
author | Gordon Sim <gsim@apache.org> | 2013-03-23 17:59:50 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-03-23 17:59:50 +0000 |
commit | 990613d5320add3f29119e5039f9d757638be1fb (patch) | |
tree | 2316e9bc514c4637875edfae334e4113849a248d | |
parent | 4b532586d1a44628ffa037fa131f4f314592fda5 (diff) | |
download | qpid-python-990613d5320add3f29119e5039f9d757638be1fb.tar.gz |
QPID-4586: fixes for dynamic sources/targets and on demand creation of nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1460198 13f79535-47bb-0310-9956-ffa450edef68
36 files changed, 253 insertions, 95 deletions
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp index f0ecd96206..983f0a8878 100644 --- a/qpid/cpp/examples/messaging/client.cpp +++ b/qpid/cpp/examples/messaging/client.cpp @@ -48,8 +48,8 @@ int main(int argc, char** argv) { Sender sender = session.createSender("service_queue"); //create temp queue & receiver... - Address responseQueue("#response-queue; {create:always, delete:always}"); - Receiver receiver = session.createReceiver(responseQueue); + Receiver receiver = session.createReceiver("#"); + Address responseQueue = receiver.getAddress(); // Now send some messages ... string s[] = { diff --git a/qpid/cpp/include/qpid/messaging/Address.h b/qpid/cpp/include/qpid/messaging/Address.h index 63dce0c49d..224a70b193 100644 --- a/qpid/cpp/include/qpid/messaging/Address.h +++ b/qpid/cpp/include/qpid/messaging/Address.h @@ -153,6 +153,7 @@ class QPID_MESSAGING_CLASS_EXTERN Address QPID_MESSAGING_EXTERN bool operator !() const; private: AddressImpl* impl; + friend class AddressImpl; }; #ifndef SWIG diff --git a/qpid/cpp/include/qpid/messaging/Receiver.h b/qpid/cpp/include/qpid/messaging/Receiver.h index 13317dfcbd..6c6fdaa7cb 100644 --- a/qpid/cpp/include/qpid/messaging/Receiver.h +++ b/qpid/cpp/include/qpid/messaging/Receiver.h @@ -34,6 +34,7 @@ namespace messaging { template <class> class PrivateImplRef; #endif +class Address; class Message; class ReceiverImpl; class Session; @@ -134,6 +135,11 @@ class QPID_MESSAGING_CLASS_EXTERN Receiver : public qpid::messaging::Handle<Rece */ QPID_MESSAGING_EXTERN Session getSession() const; + /** + * Returns an address for this receiver. + */ + QPID_MESSAGING_EXTERN Address getAddress() const; + #ifndef SWIG private: friend class qpid::messaging::PrivateImplRef<Receiver>; diff --git a/qpid/cpp/include/qpid/messaging/Sender.h b/qpid/cpp/include/qpid/messaging/Sender.h index 8e1c5846e9..9c7bca1905 100644 --- a/qpid/cpp/include/qpid/messaging/Sender.h +++ b/qpid/cpp/include/qpid/messaging/Sender.h @@ -34,6 +34,7 @@ namespace messaging { #ifndef SWIG template <class> class PrivateImplRef; #endif +class Address; class Message; class SenderImpl; class Session; @@ -89,6 +90,11 @@ class QPID_MESSAGING_CLASS_EXTERN Sender : public qpid::messaging::Handle<Sender * Returns a handle to the session associated with this sender. */ QPID_MESSAGING_EXTERN Session getSession() const; + + /** + * Returns an address for this sender. + */ + QPID_MESSAGING_EXTERN Address getAddress() const; #ifndef SWIG private: friend class qpid::messaging::PrivateImplRef<Sender>; diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index ef68d79fb2..bb00046857 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1063,6 +1063,7 @@ install_pdb (qpidclient ${QPID_COMPONENT_CLIENT}) set (qpidmessaging_SOURCES_hidden qpid/messaging/AddressParser.h + qpid/messaging/AddressImpl.h qpid/messaging/ConnectionImpl.h qpid/messaging/ReceiverImpl.h qpid/messaging/SessionImpl.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 42d5c60475..cad41bca86 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -935,6 +935,7 @@ libqpidtypes_la_LDFLAGS = -version-info $(QPIDTYPES_VERSION_INFO) libqpidmessaging_la_LIBADD = libqpidclient.la libqpidtypes.la libqpidmessaging_la_SOURCES = \ qpid/messaging/Address.cpp \ + qpid/messaging/AddressImpl.h \ qpid/messaging/AddressParser.h \ qpid/messaging/AddressParser.cpp \ qpid/messaging/Connection.cpp \ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index a83034eb6e..be98c048b6 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -37,11 +37,11 @@ namespace qpid { namespace broker { namespace amqp { -Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse) +Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d) : ManagedConnection(b, i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_) + out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_), domain(d) { if (pn_transport_bind(transport, connection)) { //error @@ -265,4 +265,8 @@ std::string Connection::getError() return text.str(); } +std::string Connection::getDomain() const +{ + return domain; +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 28cf86f123..d61db82e60 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -45,7 +45,7 @@ class Session; class Connection : public sys::ConnectionCodec, public ManagedConnection { public: - Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse); + Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse, const std::string& domain); virtual ~Connection(); size_t decode(const char* buffer, size_t size); virtual size_t encode(char* buffer, size_t size); @@ -57,6 +57,7 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection framing::ProtocolVersion getVersion() const; pn_transport_t* getTransport(); Interconnects& getInterconnects(); + std::string getDomain() const; protected: typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; @@ -67,6 +68,7 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection bool haveOutput; Sessions sessions; Interconnects& interconnects; + std::string domain; virtual void process(); std::string getError(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp index 082715b1b2..92a6f75f1e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp @@ -41,7 +41,7 @@ namespace amqp { Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse, bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r) - : Connection(out, id, broker, r, saslInUse), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false), + : Connection(out, id, broker, r, saslInUse, std::string()), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false), closeRequested(false), isTransportDeleted(false) {} diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 0e622f8d20..16f0cdc39f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -41,10 +41,19 @@ namespace qpid { namespace broker { namespace amqp { +struct Options : public qpid::Options { + std::string domain; + + Options() : qpid::Options("AMQP 1.0 Options") { + addOptions() + ("domain", optValue(domain, "DOMAIN"), "Domain of this broker"); + } +}; + class ProtocolImpl : public Protocol { public: - ProtocolImpl(Interconnects* i, Broker& b) : interconnects(i), broker(b) + ProtocolImpl(Interconnects* i, Broker& b, const std::string& d) : interconnects(i), broker(b), domain(d) { broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown } @@ -54,16 +63,20 @@ class ProtocolImpl : public Protocol private: Interconnects* interconnects; Broker& broker; + std::string domain; }; struct ProtocolPlugin : public Plugin { + Options options; + Options* getOptions() { return &options; } + void earlyInitialize(Plugin::Target& target) { //need to register protocol before recovery from store broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); if (broker) { - ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker); + ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker, options.domain); broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown } } @@ -79,18 +92,20 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) { if (broker.getOptions().auth) { QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external)); + return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, + qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm,broker.getOptions().requireEncrypted, external), + domain); } else { std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm)); QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator); + return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator, domain); } } else { if (broker.getOptions().auth) { throw qpid::Exception("SASL layer required!"); } else { QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)"); - return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false); + return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false, domain); } } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp index d8e12fcfdd..820aaf87d4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -31,8 +31,8 @@ namespace qpid { namespace broker { namespace amqp { -Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth) - : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true), +Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth, const std::string& domain) + : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true, domain), authenticator(auth), state(INCOMPLETE), writeHeader(true), haveOutput(true) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.h b/qpid/cpp/src/qpid/broker/amqp/Sasl.h index 7718b4c43a..194ab0a0d5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.h +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.h @@ -39,7 +39,7 @@ namespace amqp { class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer { public: - Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator); + Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator, const std::string& domain); ~Sasl(); size_t decode(const char* buffer, size_t size); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 544616b897..62011f6372 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -53,6 +53,20 @@ namespace qpid { namespace broker { namespace amqp { +namespace { +bool is_capability_requested(const std::string& name, pn_data_t* capabilities) +{ + while (pn_data_next(capabilities)) { + pn_bytes_t c = pn_data_get_symbol(capabilities); + std::string s(c.start, c.size); + if (s == name) return true; + } + return false; +} + +const std::string CREATE_ON_DEMAND("create-on-demand"); +} + class IncomingToQueue : public DecodingIncoming { public: @@ -81,7 +95,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.exchange = broker.getExchanges().find(name); node.queue = broker.getQueues().find(name); if (!node.queue && !node.exchange) { - if (pn_terminus_is_dynamic(terminus)) { + if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { //is it a queue or an exchange? NodeProperties properties; properties.read(pn_terminus_properties(terminus)); @@ -117,25 +131,43 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te return node; } +std::string Session::generateName(pn_link_t* link) +{ + std::stringstream s; + s << qpid::types::Uuid(true) << "::" << pn_link_name(link); + if (!connection.getDomain().empty()) { + s << "@" << connection.getDomain(); + } + return s.str(); +} + void Session::attach(pn_link_t* link) { if (pn_link_is_sender(link)) { pn_terminus_t* source = pn_link_remote_source(link); //i.e a subscription + std::string name; if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { throw qpid::Exception("No source specified!");/*invalid-field?*/ + } else if (pn_terminus_is_dynamic(source)) { + name = generateName(link); + } else { + name = pn_terminus_get_address(source); } - std::string name = pn_terminus_get_address(source); QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), name.c_str()); setupOutgoing(link, source, name); } else { pn_terminus_t* target = pn_link_remote_target(link); + std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { throw qpid::Exception("No target specified!");/*invalid field?*/ + } else if (pn_terminus_is_dynamic(target)) { + name = generateName(link); + } else { + name = pn_terminus_get_address(target); } - std::string name = pn_terminus_get_address(target); QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), name.c_str()); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 74f50a9eda..b142224cfd 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -95,6 +95,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); void setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name); void setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name); + std::string generateName(pn_link_t*); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 40b09b0fc0..11f9475cad 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -143,6 +143,11 @@ uint32_t ReceiverImpl::getUnsettled() return parent->getUnsettledAcks(destination); } +qpid::messaging::Address ReceiverImpl::getAddress() const +{ + return address; +} + ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a) : diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 76da4f31a9..4dba76c8d9 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -66,6 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; bool isClosed() const; + qpid::messaging::Address getAddress() const; private: mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index b275db38d7..7001acaf99 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -34,6 +34,11 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, parent(&_parent), name(_name), address(_address), state(UNRESOLVED), capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {} +qpid::messaging::Address SenderImpl::getAddress() const +{ + return address; +} + void SenderImpl::send(const qpid::messaging::Message& message, bool sync) { if (unreliable) { // immutable, don't need lock diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index d75863c743..ee250af2d4 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -56,6 +56,7 @@ class SenderImpl : public qpid::messaging::SenderImpl void init(qpid::client::AsyncSession, AddressResolution&); const std::string& getName() const; qpid::messaging::Session getSession() const; + qpid::messaging::Address getAddress() const; private: mutable sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp index a516959edb..6fbaeef661 100644 --- a/qpid/cpp/src/qpid/messaging/Address.cpp +++ b/qpid/cpp/src/qpid/messaging/Address.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/messaging/Address.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/AddressParser.h" #include "qpid/framing/Uuid.h" #include <sstream> #include <boost/format.hpp> @@ -34,51 +36,9 @@ const std::string OPTIONS_DIVIDER = ";"; const std::string SPACE = " "; const std::string TYPE = "type"; } -class AddressImpl -{ - public: - std::string name; - std::string subject; - Variant::Map options; - - AddressImpl() {} - AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) : - name(n), subject(s), options(o) {} -}; - -class AddressParser -{ - public: - AddressParser(const std::string&); - bool parse(Address& address); - private: - const std::string& input; - std::string::size_type current; - static const std::string RESERVED; - - bool readChar(char c); - bool readQuotedString(std::string& s); - bool readQuotedValue(Variant& value); - bool readString(std::string& value, char delimiter); - bool readWord(std::string& word, const std::string& delims = RESERVED); - bool readSimpleValue(Variant& word); - bool readKey(std::string& key); - bool readValue(Variant& value); - bool readKeyValuePair(Variant::Map& map); - bool readMap(Variant& value); - bool readList(Variant& value); - bool readName(std::string& name); - bool readSubject(std::string& subject); - bool error(const std::string& message); - bool eos(); - bool iswhitespace(); - bool in(const std::string& delims); - bool isreserved(); -}; - Address::Address() : impl(new AddressImpl()) {} Address::Address(const std::string& address) : impl(new AddressImpl()) -{ +{ AddressParser parser(address); parser.parse(*this); } @@ -86,7 +46,7 @@ Address::Address(const std::string& name, const std::string& subject, const Vari const std::string& type) : impl(new AddressImpl(name, subject, options)) { setType(type); } Address::Address(const Address& a) : - impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {} + impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) { impl->temporary = a.impl->temporary; } Address::~Address() { delete impl; } Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; } diff --git a/qpid/cpp/src/qpid/messaging/AddressImpl.h b/qpid/cpp/src/qpid/messaging/AddressImpl.h new file mode 100644 index 0000000000..e35aa7fef2 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/AddressImpl.h @@ -0,0 +1,44 @@ +#ifndef QPID_MESSAGING_ADDRESSIMPL_H +#define QPID_MESSAGING_ADDRESSIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" +namespace qpid { +namespace messaging { + +class AddressImpl +{ + public: + std::string name; + std::string subject; + qpid::types::Variant::Map options; + bool temporary; + + AddressImpl() : temporary(false) {} + AddressImpl(const std::string& n, const std::string& s, const qpid::types::Variant::Map& o) : + name(n), subject(s), options(o), temporary(false) {} + static void setTemporary(Address& a, bool value) { a.impl->temporary = value; } + static bool isTemporary(const Address& a) { return a.impl->temporary; } +}; +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_ADDRESSIMPL_H*/ diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.cpp b/qpid/cpp/src/qpid/messaging/AddressParser.cpp index 67249e188e..882deba463 100644 --- a/qpid/cpp/src/qpid/messaging/AddressParser.cpp +++ b/qpid/cpp/src/qpid/messaging/AddressParser.cpp @@ -19,6 +19,7 @@ * */ #include "AddressParser.h" +#include "AddressImpl.h" #include "qpid/framing/Uuid.h" #include <boost/format.hpp> @@ -38,7 +39,10 @@ bool AddressParser::parse(Address& address) { std::string name; if (readName(name)) { - if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name; + if (name.find('#') == 0) { + name = qpid::framing::Uuid(true).str() + name; + AddressImpl::setTemporary(address, true); + } address.setName(name); if (readChar('/')) { std::string subject; diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp index 78e0c5daa3..c45ebd6760 100644 --- a/qpid/cpp/src/qpid/messaging/Receiver.cpp +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/ReceiverImpl.h" #include "qpid/messaging/Session.h" @@ -45,4 +46,5 @@ void Receiver::close() { impl->close(); } const std::string& Receiver::getName() const { return impl->getName(); } Session Receiver::getSession() const { return impl->getSession(); } bool Receiver::isClosed() const { return impl->isClosed(); } +Address Receiver::getAddress() const { return impl->getAddress(); } }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h index e450693d2c..59ccc3214e 100644 --- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -27,6 +27,7 @@ namespace qpid { namespace messaging { +class Address; class Duration; class Message; class MessageListener; @@ -48,6 +49,7 @@ class ReceiverImpl : public virtual qpid::RefCounted virtual const std::string& getName() const = 0; virtual Session getSession() const = 0; virtual bool isClosed() const = 0; + virtual Address getAddress() const = 0; }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp index 53dbb69777..a60de3d606 100644 --- a/qpid/cpp/src/qpid/messaging/Sender.cpp +++ b/qpid/cpp/src/qpid/messaging/Sender.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/messaging/Sender.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/SenderImpl.h" #include "qpid/messaging/Session.h" @@ -40,5 +41,5 @@ uint32_t Sender::getUnsettled() { return impl->getUnsettled(); } uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); } const std::string& Sender::getName() const { return impl->getName(); } Session Sender::getSession() const { return impl->getSession(); } - +Address Sender::getAddress() const { return impl->getAddress(); } }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h index d978463fdb..91fd9b1536 100644 --- a/qpid/cpp/src/qpid/messaging/SenderImpl.h +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -27,6 +27,7 @@ namespace qpid { namespace messaging { +class Address; class Message; class Session; @@ -41,6 +42,7 @@ class SenderImpl : public virtual qpid::RefCounted virtual uint32_t getUnsettled() = 0; virtual const std::string& getName() const = 0; virtual Session getSession() const = 0; + virtual Address getAddress() const = 0; private: }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 359660dce5..a46606a526 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -20,6 +20,7 @@ */ #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Address.h" +#include "qpid/log/Statement.h" #include <vector> #include <boost/assign.hpp> extern "C" { @@ -56,7 +57,9 @@ const std::string MOVE("move"); const std::string COPY("copy"); const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string CREATE_ON_DEMAND("create-on-demand"); +const std::string DUMMY("."); const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); @@ -155,26 +158,33 @@ const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const return link; } -void AddressHelper::setNodeProperties(pn_terminus_t* terminus) +void AddressHelper::setNodeProperties(pn_terminus_t* terminus, bool dynamic) { - pn_terminus_set_dynamic(terminus, true); + if (dynamic) { + pn_terminus_set_address(terminus, DUMMY.c_str());//Workaround for proton bug + pn_terminus_set_dynamic(terminus, true); + } else { + pn_data_t* capabilities = pn_terminus_capabilities(terminus); + if (!capabilities) { + QPID_LOG(error, "!!!No capabilities!!!"); + } + pn_data_put_symbol(capabilities, convert(CREATE_ON_DEMAND)); + } //properties for dynamically created node: - pn_data_t* data = pn_terminus_properties(terminus); if (node.size()) { + pn_data_t* data = pn_terminus_properties(terminus); pn_data_put_map(data); pn_data_enter(data); - } - for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) { - if (i->first == TYPE) { - pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); - pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE)); - } else { - pn_data_put_symbol(data, convert(i->first)); - pn_data_put_string(data, convert(i->second.asString())); + for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) { + if (i->first == TYPE) { + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE)); + } else { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_string(data, convert(i->second.asString())); + } } - } - if (node.size()) { pn_data_exit(data); } } diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index cd0aa1be9e..2442619ed3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -40,7 +40,7 @@ class AddressHelper bool deleteEnabled(CheckMode mode) const; bool assertEnabled(CheckMode mode) const; - void setNodeProperties(pn_terminus_t*); + void setNodeProperties(pn_terminus_t*, bool dynamic); const qpid::types::Variant::Map& getNodeProperties() const; const qpid::types::Variant::Map& getLinkProperties() const; private: @@ -49,6 +49,7 @@ class AddressHelper std::string deletePolicy; qpid::types::Variant::Map node; qpid::types::Variant::Map link; + std::string name; bool enabled(const std::string& policy, CheckMode mode) const; }; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index c74ee01898..9febe66f7e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -26,6 +26,7 @@ #include "SessionContext.h" #include "Transport.h" #include "qpid/messaging/exceptions.h" +#include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" @@ -285,34 +286,45 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha { lnk->configure(); attach(ssn->session, (pn_link_t*) lnk->sender); - if (!pn_link_remote_target((pn_link_t*) lnk->sender)) { + pn_terminus_t* t = pn_link_remote_target(lnk->sender); + if (!pn_terminus_get_address(t)) { std::string msg("No such target : "); msg += lnk->getTarget(); + QPID_LOG(debug, msg); throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(lnk->address)) { + lnk->address.setName(pn_terminus_get_address(t)); + QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName()); } + QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); } void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { lnk->configure(); attach(ssn->session, lnk->receiver, lnk->capacity); - if (!pn_link_remote_source(lnk->receiver)) { + pn_terminus_t* s = pn_link_remote_source(lnk->receiver); + if (!pn_terminus_get_address(s)) { std::string msg("No such source : "); msg += lnk->getSource(); + QPID_LOG(debug, msg); throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(lnk->address)) { + lnk->address.setName(pn_terminus_get_address(s)); + QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName()); } + QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); } void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link)); pn_link_open(link); - QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link)); + QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link)); if (credit) pn_link_flow(link, credit); wakeupDriver(); while (pn_link_state(link) & PN_REMOTE_UNINIT) { - QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link)); + QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "..."); wait(); } } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index e5f6738b61..f7b06ddc05 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -20,9 +20,11 @@ */ #include "qpid/messaging/amqp/ReceiverContext.h" #include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" #include "qpid/amqp/descriptors.h" +#include "qpid/log/Statement.h" extern "C" { #include <proton/engine.h> } @@ -38,7 +40,7 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co capacity(0) {} ReceiverContext::~ReceiverContext() { - pn_link_free(receiver); + //pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) @@ -76,7 +78,7 @@ uint32_t ReceiverContext::getUnsettled() void ReceiverContext::close() { - + pn_link_close(receiver); } const std::string& ReceiverContext::getName() const @@ -113,11 +115,18 @@ void ReceiverContext::configure() const } void ReceiverContext::configure(pn_terminus_t* source) const { - pn_terminus_set_address(source, address.getName().c_str()); //dynamic create: AddressHelper helper(address); - if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) { - helper.setNodeProperties(source); + if (AddressImpl::isTemporary(address)) { + //application expects a name to be generated + QPID_LOG(debug, "source is dynamic"); + helper.setNodeProperties(source, true); + } else { + pn_terminus_set_address(source, address.getName().c_str()); + if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) { + //application expects name of node to be as specified + helper.setNodeProperties(source, false); + } } // Look specifically for qpid.selector link property and add a filter for it @@ -149,6 +158,11 @@ void ReceiverContext::configure(pn_terminus_t* source) const } } +Address ReceiverContext::getAddress() const +{ + return address; +} + bool ReceiverContext::isClosed() const { return false;//TODO diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 34ecdda6be..9c5386157b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -55,10 +55,11 @@ class ReceiverContext const std::string& getSource() const; bool isClosed() const; void configure() const; + Address getAddress() const; private: friend class ConnectionContext; const std::string name; - const Address address; + Address address; pn_link_t* receiver; uint32_t capacity; void configure(pn_terminus_t*) const; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp index 9bf64ebb8d..c601d05ed0 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp @@ -103,4 +103,9 @@ bool ReceiverHandle::isClosed() const return receiver->isClosed(); } +Address ReceiverHandle::getAddress() const +{ + return receiver->getAddress(); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h index a1a6f26025..08a95fb585 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h @@ -53,6 +53,7 @@ class ReceiverHandle : public qpid::messaging::ReceiverImpl const std::string& getName() const; qpid::messaging::Session getSession() const; bool isClosed() const; + Address getAddress() const; private: boost::shared_ptr<ConnectionContext> connection; boost::shared_ptr<SessionContext> session; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 96c4437b89..fe74a4bca8 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -21,6 +21,7 @@ #include "qpid/messaging/amqp/SenderContext.h" #include "qpid/messaging/amqp/EncodedMessage.h" #include "qpid/messaging/amqp/AddressHelper.h" +#include "qpid/messaging/AddressImpl.h" #include "qpid/amqp/descriptors.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/messaging/exceptions.h" @@ -44,12 +45,12 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const SenderContext::~SenderContext() { - pn_link_free(sender); + //pn_link_free(sender); } void SenderContext::close() { - + pn_link_close(sender); } void SenderContext::setCapacity(uint32_t c) @@ -347,11 +348,16 @@ void SenderContext::configure() const } void SenderContext::configure(pn_terminus_t* target) const { - pn_terminus_set_address(target, address.getName().c_str()); - //dynamic create: AddressHelper helper(address); - if (helper.createEnabled(AddressHelper::FOR_SENDER)) { - helper.setNodeProperties(target); + if (AddressImpl::isTemporary(address)) { + //application expects a name to be generated + helper.setNodeProperties(target, true); + } else { + pn_terminus_set_address(target, address.getName().c_str()); + if (helper.createEnabled(AddressHelper::FOR_SENDER)) { + //application expects name of node to be as specified + helper.setNodeProperties(target, false); + } } } @@ -360,4 +366,9 @@ bool SenderContext::settled() return processUnsettled() == 0; } +Address SenderContext::getAddress() const +{ + return address; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 3595379e70..2969e75a16 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -71,12 +71,13 @@ class SenderContext Delivery* send(const qpid::messaging::Message& message); void configure() const; bool settled(); + Address getAddress() const; private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; const std::string name; - const qpid::messaging::Address address; + qpid::messaging::Address address; pn_link_t* sender; int32_t nextId; Deliveries deliveries; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp index b7168e5b31..4e258e7b38 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -72,4 +72,9 @@ qpid::messaging::Session SenderHandle::getSession() const return qpid::messaging::Session(new SessionHandle(connection, session)); } +Address SenderHandle::getAddress() const +{ + return sender->getAddress(); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h index 3c6b666582..fab158c1ef 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h @@ -48,6 +48,7 @@ class SenderHandle : public qpid::messaging::SenderImpl uint32_t getUnsettled(); const std::string& getName() const; Session getSession() const; + Address getAddress() const; private: boost::shared_ptr<ConnectionContext> connection; boost::shared_ptr<SessionContext> session; |