diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index bb94a37398..544616b897 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -56,7 +56,7 @@ namespace amqp { class IncomingToQueue : public DecodingIncoming { public: - IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {} + IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Queue> queue; @@ -65,7 +65,7 @@ class IncomingToQueue : public DecodingIncoming class IncomingToExchange : public DecodingIncoming { public: - IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {} + IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; @@ -146,15 +146,22 @@ void Session::attach(pn_link_t* link) void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name) { ResolvedNode node = resolve(name, target, true); - + const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); + if (!sourceAddress) { + sourceAddress = pn_terminus_get_address(pn_link_source(link)); + } + std::string source; + if (sourceAddress) { + source = sourceAddress; + } if (node.queue) { - boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link)); + boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source)); incoming[link] = q; } else if (node.exchange) { - boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link)); + boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source)); incoming[link] = e; } else if (node.relay) { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay)); incoming[link] = in; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); @@ -168,9 +175,18 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s ResolvedNode node = resolve(name, source, false); Filter filter; filter.read(pn_terminus_filter(source)); + const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link)); + if (!targetAddress) { + targetAddress = pn_terminus_get_address(pn_link_target(link)); + } + std::string target; + if (targetAddress) { + target = targetAddress; + } + if (node.queue) { - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false)); q->init(); if (filter.hasSubjectFilter()) { q->setSubjectFilter(filter.getSubjectFilter()); @@ -194,11 +210,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s } else { throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ } - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true)); outgoing[link] = q; q->init(); } else if (node.relay) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay)); outgoing[link] = out; out->init(); } else { @@ -220,11 +236,11 @@ void Session::attach(pn_link_t* link, const std::string& src, const std::string& if (relay) { if (pn_link_is_sender(link)) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); outgoing[link] = out; out->init(); } else { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); incoming[link] = in; } } else { |