summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-06-12 17:56:20 +0000
committerGordon Sim <gsim@apache.org>2013-06-12 17:56:20 +0000
commitf612854cf71573bf934248e4b862620be6e32715 (patch)
tree51842fcd6836ec5f07e0d25de4d9e5c408374337
parent4d4276904ea9fa45d36ea8b44b08b60c1816f3d7 (diff)
downloadqpid-python-f612854cf71573bf934248e4b862620be6e32715.tar.gz
QPID-4917: allow shared topic subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1492311 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.cpp4
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.h2
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp23
3 files changed, 22 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 3b0836a453..7dbafb2fd1 100644
--- a/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -41,10 +41,10 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e)
: Outgoing(broker, session, source, target, pn_link_name(l)),
Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
- exclusive(topic),
+ exclusive(e),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h
index b70faa1385..a63a8cc0a6 100644
--- a/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index 8d07a23499..d1bab7f775 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -55,6 +55,7 @@ namespace amqp {
namespace {
bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
{
+ pn_data_rewind(capabilities);
while (pn_data_next(capabilities)) {
pn_bytes_t c = pn_data_get_symbol(capabilities);
std::string s(c.start, c.size);
@@ -69,9 +70,11 @@ const std::string QUEUE("queue");
const std::string TOPIC("topic");
const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
+const std::string SHARED("shared");
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
{
+ pn_data_rewind(in);
while (pn_data_next(in)) {
pn_bytes_t c = pn_data_get_symbol(in);
std::string s(c.start, c.size);
@@ -85,11 +88,14 @@ void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> nod
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
{
+ pn_data_rewind(in);
while (pn_data_next(in)) {
pn_bytes_t c = pn_data_get_symbol(in);
std::string s(c.start, c.size);
if (s == DURABLE) {
if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == SHARED) {
+ pn_data_put_symbol(out, c);
} else if (s == CREATE_ON_DEMAND || s == TOPIC) {
pn_data_put_symbol(out, c);
} else if (s == DIRECT_FILTER) {
@@ -281,18 +287,27 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
+ bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
filter.configure(settings);
//TODO: populate settings from source details when available from engine
- std::stringstream queueName;//combination of container id and link name is unique
- queueName << connection.getContainerId() << "_" << pn_link_name(link);
+ std::stringstream queueName;
+ if (shared) {
+ //just use link name (TODO: could allow this to be
+ //overridden when acces to link properties is provided
+ //(PROTON-335))
+ queueName << pn_link_name(link);
+ } else {
+ //combination of container id and link name is unique
+ queueName << connection.getContainerId() << "_" << pn_link_name(link);
+ }
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- queue->setExclusiveOwner(this);
+ if (!shared) queue->setExclusiveOwner(this);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
outgoing[link] = q;
q->init();
} else if (node.relay) {