diff options
author | Gordon Sim <gsim@apache.org> | 2013-06-12 17:56:20 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-06-12 17:56:20 +0000 |
commit | f612854cf71573bf934248e4b862620be6e32715 (patch) | |
tree | 51842fcd6836ec5f07e0d25de4d9e5c408374337 | |
parent | 4d4276904ea9fa45d36ea8b44b08b60c1816f3d7 (diff) | |
download | qpid-python-f612854cf71573bf934248e4b862620be6e32715.tar.gz |
QPID-4917: allow shared topic subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1492311 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/amqp/Outgoing.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Outgoing.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 23 |
3 files changed, 22 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp index 3b0836a453..7dbafb2fd1 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -41,10 +41,10 @@ void Outgoing::wakeup() session.wakeup(); } -OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic) +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e) : Outgoing(broker, session, source, target, pn_link_name(l)), Consumer(pn_link_name(l), /*FIXME*/CONSUMER), - exclusive(topic), + exclusive(e), queue(q), deliveries(5000), link(l), out(o), current(0), outstanding(0), buffer(1024)/*used only for header at present*/ diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h index b70faa1385..a63a8cc0a6 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/cpp/src/qpid/broker/amqp/Outgoing.h @@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> { public: - OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic); + OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index 8d07a23499..d1bab7f775 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -55,6 +55,7 @@ namespace amqp { namespace { bool is_capability_requested(const std::string& name, pn_data_t* capabilities) { + pn_data_rewind(capabilities); while (pn_data_next(capabilities)) { pn_bytes_t c = pn_data_get_symbol(capabilities); std::string s(c.start, c.size); @@ -69,9 +70,11 @@ const std::string QUEUE("queue"); const std::string TOPIC("topic"); const std::string DIRECT_FILTER("legacy-amqp-direct-binding"); const std::string TOPIC_FILTER("legacy-amqp-topic-binding"); +const std::string SHARED("shared"); void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node) { + pn_data_rewind(in); while (pn_data_next(in)) { pn_bytes_t c = pn_data_get_symbol(in); std::string s(c.start, c.size); @@ -85,11 +88,14 @@ void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> nod void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node) { + pn_data_rewind(in); while (pn_data_next(in)) { pn_bytes_t c = pn_data_get_symbol(in); std::string s(c.start, c.size); if (s == DURABLE) { if (node->isDurable()) pn_data_put_symbol(out, c); + } else if (s == SHARED) { + pn_data_put_symbol(out, c); } else if (s == CREATE_ON_DEMAND || s == TOPIC) { pn_data_put_symbol(out, c); } else if (s == DIRECT_FILTER) { @@ -281,18 +287,27 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s filter.apply(q); outgoing[link] = q; } else if (node.exchange) { + bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); QueueSettings settings(durable, !durable); filter.configure(settings); //TODO: populate settings from source details when available from engine - std::stringstream queueName;//combination of container id and link name is unique - queueName << connection.getContainerId() << "_" << pn_link_name(link); + std::stringstream queueName; + if (shared) { + //just use link name (TODO: could allow this to be + //overridden when acces to link properties is provided + //(PROTON-335)) + queueName << pn_link_name(link); + } else { + //combination of container id and link name is unique + queueName << connection.getContainerId() << "_" << pn_link_name(link); + } boost::shared_ptr<qpid::broker::Queue> queue = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first; - queue->setExclusiveOwner(this); + if (!shared) queue->setExclusiveOwner(this); filter.bind(node.exchange, queue); - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared)); outgoing[link] = q; q->init(); } else if (node.relay) { |