diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Domain.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Domain.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Relay.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 38 |
15 files changed, 63 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 1f70cac2cc..098ffe34c4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -895,7 +895,9 @@ void Broker::deleteObject(const std::string& type, const std::string& name, connectionId = context->getUrl(); } QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); - if (type == TYPE_QUEUE) { + if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) { + QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ") handled by registered factory"); + } else if (type == TYPE_QUEUE) { // extract ifEmpty and ifUnused from options bool ifUnused = false, ifEmpty = false; for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index 4b13bfc871..eb103c3e53 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -229,6 +229,11 @@ Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties } } +Domain::~Domain() +{ + if (domain != 0) domain->resourceDestroy(); +} + boost::shared_ptr<qpid::management::ManagementObject> Domain::GetManagementObject() const { return domain; diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.h b/qpid/cpp/src/qpid/broker/amqp/Domain.h index ccbee6341e..d23b3308fe 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.h +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.h @@ -49,6 +49,7 @@ class Domain : public qpid::management::Manageable { public: Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&); + ~Domain(); void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&); void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>); std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname); diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index 9616b267bd..14614b0b87 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -27,8 +27,8 @@ namespace qpid { namespace broker { namespace amqp { -Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& target, const std::string& name) - : ManagedIncomingLink(broker, parent, target, name), credit(100), window(0), link(l), session(parent) {} +Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name) + : ManagedIncomingLink(broker, parent, source, target, name), credit(100), window(0), link(l), session(parent) {} Incoming::~Incoming() {} @@ -77,8 +77,8 @@ namespace { }; } -DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, const std::string& name) - : Incoming(link, broker, parent, target, name), session(parent.shared_from_this()) {} +DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name) + : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {} DecodingIncoming::~DecodingIncoming() {} void DecodingIncoming::readable(pn_delivery_t* delivery) diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index cab023c7c1..a7c706aed9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -37,7 +37,7 @@ class Session; class Incoming : public ManagedIncomingLink { public: - Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name); + Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); virtual ~Incoming(); virtual bool doWork();//do anything that requires output virtual bool haveWork();//called when handling input to see whether any output work is needed @@ -55,7 +55,7 @@ class Incoming : public ManagedIncomingLink class DecodingIncoming : public Incoming { public: - DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name); + DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); virtual ~DecodingIncoming(); void readable(pn_delivery_t* delivery); virtual void handle(qpid::broker::Message&) = 0; diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp index cbdf0da3ef..8b0891ef3a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp @@ -88,7 +88,7 @@ bool Interconnects::deleteObject(Broker&, const std::string& type, const std::st domains.erase(i); return true; } else { - return false; + throw qpid::Exception(QPID_MSG("No such domain: " << name)); } } else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) { boost::shared_ptr<Interconnect> interconnect; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp index 8817e410ce..c9875c457b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp @@ -30,12 +30,12 @@ namespace qpid { namespace broker { namespace amqp { -ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& target, const std::string& _name) +ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& target, const std::string& _name) : parent(p), name(_name) { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, target, _name)); + incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, source, target, _name)); agent->addObject(incoming); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h index 5b7db7997c..5d4b1409d4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h @@ -36,7 +36,7 @@ class ManagedSession; class ManagedIncomingLink : public qpid::management::Manageable { public: - ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& target, const std::string& name); + ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& target, const std::string& name); virtual ~ManagedIncomingLink(); qpid::management::ManagementObject::shared_ptr GetManagementObject() const; void incomingMessageReceived(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index 53e49d2bca..4c5f7fc1fc 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -30,12 +30,12 @@ namespace qpid { namespace broker { namespace amqp { -ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& _name) +ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& target, const std::string& _name) : parent(p), name(_name) { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, _name)); + outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, target, _name)); agent->addObject(outgoing); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h index 61d0b9c3a0..f6415bff38 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h @@ -36,7 +36,7 @@ class ManagedSession; class ManagedOutgoingLink : public qpid::management::Manageable { public: - ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& name); + ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& target, const std::string& name); virtual ~ManagedOutgoingLink(); qpid::management::ManagementObject::shared_ptr GetManagementObject() const; void outgoingMessageSent(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index e531e8cd20..7e98195a60 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -33,15 +33,16 @@ namespace qpid { namespace broker { namespace amqp { -Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {} +Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name) + : ManagedOutgoingLink(broker, parent, source, target, name), session(parent) {} void Outgoing::wakeup() { session.wakeup(); } -OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic) - : Outgoing(broker, session, source, pn_link_name(l)), +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic) + : Outgoing(broker, session, source, target, pn_link_name(l)), Consumer(pn_link_name(l), /*FIXME*/CONSUMER), exclusive(topic), queue(q), deliveries(5000), link(l), out(o), diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index b8a689b8f8..b70faa1385 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -59,7 +59,7 @@ class CircularArray class Outgoing : public ManagedOutgoingLink { public: - Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name); + Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name); virtual void setSubjectFilter(const std::string&) = 0; virtual void setSelectorFilter(const std::string&) = 0; virtual void init() = 0; @@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> { public: - OutgoingFromQueue(Broker&, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic); + OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 48a629a66e..70d12455cd 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -110,8 +110,8 @@ void Relay::detached(Incoming*) } OutgoingFromRelay::OutgoingFromRelay(pn_link_t* l, Broker& broker, Session& parent, const std::string& source, - const std::string& name_, boost::shared_ptr<Relay> r) - : Outgoing(broker, parent, source, name_), name(name_), link(l), relay(r) {} + const std::string& target, const std::string& name_, boost::shared_ptr<Relay> r) + : Outgoing(broker, parent, source, target, name_), name(name_), link(l), relay(r) {} /** * Allows the link to initiate any outgoing transfers */ @@ -173,9 +173,9 @@ void OutgoingFromRelay::setSelectorFilter(const std::string&) //TODO } -IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, - const std::string& name, boost::shared_ptr<Relay> r) - : Incoming(link, broker, parent, target, name), relay(r) +IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, + const std::string& target, const std::string& name, boost::shared_ptr<Relay> r) + : Incoming(link, broker, parent, source, target, name), relay(r) { relay->attached(this); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h index 0b20b563d4..19e0c2e3fe 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.h +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h @@ -95,7 +95,7 @@ class OutgoingFromRelay : public Outgoing { public: OutgoingFromRelay(pn_link_t*, Broker&, Session&, const std::string& source, - const std::string& name, boost::shared_ptr<Relay>); + const std::string& target, const std::string& name, boost::shared_ptr<Relay>); bool doWork(); void handle(pn_delivery_t* delivery); void detached(); @@ -111,8 +111,8 @@ class OutgoingFromRelay : public Outgoing class IncomingToRelay : public Incoming { public: - IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& target, - const std::string& name, boost::shared_ptr<Relay> r); + IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& source, + const std::string& target, const std::string& name, boost::shared_ptr<Relay> r); bool settle(); bool doWork(); bool haveWork(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index bb94a37398..544616b897 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -56,7 +56,7 @@ namespace amqp { class IncomingToQueue : public DecodingIncoming { public: - IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {} + IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Queue> queue; @@ -65,7 +65,7 @@ class IncomingToQueue : public DecodingIncoming class IncomingToExchange : public DecodingIncoming { public: - IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {} + IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; @@ -146,15 +146,22 @@ void Session::attach(pn_link_t* link) void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name) { ResolvedNode node = resolve(name, target, true); - + const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); + if (!sourceAddress) { + sourceAddress = pn_terminus_get_address(pn_link_source(link)); + } + std::string source; + if (sourceAddress) { + source = sourceAddress; + } if (node.queue) { - boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link)); + boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source)); incoming[link] = q; } else if (node.exchange) { - boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link)); + boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source)); incoming[link] = e; } else if (node.relay) { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay)); incoming[link] = in; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); @@ -168,9 +175,18 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s ResolvedNode node = resolve(name, source, false); Filter filter; filter.read(pn_terminus_filter(source)); + const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link)); + if (!targetAddress) { + targetAddress = pn_terminus_get_address(pn_link_target(link)); + } + std::string target; + if (targetAddress) { + target = targetAddress; + } + if (node.queue) { - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false)); q->init(); if (filter.hasSubjectFilter()) { q->setSubjectFilter(filter.getSubjectFilter()); @@ -194,11 +210,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s } else { throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ } - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true)); outgoing[link] = q; q->init(); } else if (node.relay) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay)); outgoing[link] = out; out->init(); } else { @@ -220,11 +236,11 @@ void Session::attach(pn_link_t* link, const std::string& src, const std::string& if (relay) { if (pn_link_is_sender(link)) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); outgoing[link] = out; out->init(); } else { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); incoming[link] = in; } } else { |