summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp38
1 files changed, 27 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index bb94a37398..544616b897 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -56,7 +56,7 @@ namespace amqp {
class IncomingToQueue : public DecodingIncoming
{
public:
- IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {}
+ IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
@@ -65,7 +65,7 @@ class IncomingToQueue : public DecodingIncoming
class IncomingToExchange : public DecodingIncoming
{
public:
- IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {}
+ IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
@@ -146,15 +146,22 @@ void Session::attach(pn_link_t* link)
void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name)
{
ResolvedNode node = resolve(name, target, true);
-
+ const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
+ if (!sourceAddress) {
+ sourceAddress = pn_terminus_get_address(pn_link_source(link));
+ }
+ std::string source;
+ if (sourceAddress) {
+ source = sourceAddress;
+ }
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source));
incoming[link] = q;
} else if (node.exchange) {
- boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link));
+ boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source));
incoming[link] = e;
} else if (node.relay) {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay));
incoming[link] = in;
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
@@ -168,9 +175,18 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
ResolvedNode node = resolve(name, source, false);
Filter filter;
filter.read(pn_terminus_filter(source));
+ const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link));
+ if (!targetAddress) {
+ targetAddress = pn_terminus_get_address(pn_link_target(link));
+ }
+ std::string target;
+ if (targetAddress) {
+ target = targetAddress;
+ }
+
if (node.queue) {
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
q->init();
if (filter.hasSubjectFilter()) {
q->setSubjectFilter(filter.getSubjectFilter());
@@ -194,11 +210,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
} else {
throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
}
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
outgoing[link] = q;
q->init();
} else if (node.relay) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay));
outgoing[link] = out;
out->init();
} else {
@@ -220,11 +236,11 @@ void Session::attach(pn_link_t* link, const std::string& src, const std::string&
if (relay) {
if (pn_link_is_sender(link)) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
outgoing[link] = out;
out->init();
} else {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
incoming[link] = in;
}
} else {