summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.h6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp38
15 files changed, 63 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 1f70cac2cc..098ffe34c4 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -895,7 +895,9 @@ void Broker::deleteObject(const std::string& type, const std::string& name,
connectionId = context->getUrl();
}
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
- if (type == TYPE_QUEUE) {
+ if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
+ QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ") handled by registered factory");
+ } else if (type == TYPE_QUEUE) {
// extract ifEmpty and ifUnused from options
bool ifUnused = false, ifEmpty = false;
for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
index 4b13bfc871..eb103c3e53 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
@@ -229,6 +229,11 @@ Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties
}
}
+Domain::~Domain()
+{
+ if (domain != 0) domain->resourceDestroy();
+}
+
boost::shared_ptr<qpid::management::ManagementObject> Domain::GetManagementObject() const
{
return domain;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.h b/qpid/cpp/src/qpid/broker/amqp/Domain.h
index ccbee6341e..d23b3308fe 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Domain.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.h
@@ -49,6 +49,7 @@ class Domain : public qpid::management::Manageable
{
public:
Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&);
+ ~Domain();
void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&);
void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>);
std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index 9616b267bd..14614b0b87 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -27,8 +27,8 @@
namespace qpid {
namespace broker {
namespace amqp {
-Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& target, const std::string& name)
- : ManagedIncomingLink(broker, parent, target, name), credit(100), window(0), link(l), session(parent) {}
+Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
+ : ManagedIncomingLink(broker, parent, source, target, name), credit(100), window(0), link(l), session(parent) {}
Incoming::~Incoming() {}
@@ -77,8 +77,8 @@ namespace {
};
}
-DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, const std::string& name)
- : Incoming(link, broker, parent, target, name), session(parent.shared_from_this()) {}
+DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
+ : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index cab023c7c1..a7c706aed9 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -37,7 +37,7 @@ class Session;
class Incoming : public ManagedIncomingLink
{
public:
- Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name);
+ Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name);
virtual ~Incoming();
virtual bool doWork();//do anything that requires output
virtual bool haveWork();//called when handling input to see whether any output work is needed
@@ -55,7 +55,7 @@ class Incoming : public ManagedIncomingLink
class DecodingIncoming : public Incoming
{
public:
- DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name);
+ DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name);
virtual ~DecodingIncoming();
void readable(pn_delivery_t* delivery);
virtual void handle(qpid::broker::Message&) = 0;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
index cbdf0da3ef..8b0891ef3a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
@@ -88,7 +88,7 @@ bool Interconnects::deleteObject(Broker&, const std::string& type, const std::st
domains.erase(i);
return true;
} else {
- return false;
+ throw qpid::Exception(QPID_MSG("No such domain: " << name));
}
} else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) {
boost::shared_ptr<Interconnect> interconnect;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
index 8817e410ce..c9875c457b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
@@ -30,12 +30,12 @@ namespace qpid {
namespace broker {
namespace amqp {
-ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& target, const std::string& _name)
+ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& target, const std::string& _name)
: parent(p), name(_name)
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, target, _name));
+ incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, source, target, _name));
agent->addObject(incoming);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h
index 5b7db7997c..5d4b1409d4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h
@@ -36,7 +36,7 @@ class ManagedSession;
class ManagedIncomingLink : public qpid::management::Manageable
{
public:
- ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& target, const std::string& name);
+ ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& target, const std::string& name);
virtual ~ManagedIncomingLink();
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
void incomingMessageReceived();
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
index 53e49d2bca..4c5f7fc1fc 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -30,12 +30,12 @@ namespace qpid {
namespace broker {
namespace amqp {
-ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& _name)
+ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& target, const std::string& _name)
: parent(p), name(_name)
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, _name));
+ outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, target, _name));
agent->addObject(outgoing);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
index 61d0b9c3a0..f6415bff38 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
@@ -36,7 +36,7 @@ class ManagedSession;
class ManagedOutgoingLink : public qpid::management::Manageable
{
public:
- ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& name);
+ ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& target, const std::string& name);
virtual ~ManagedOutgoingLink();
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
void outgoingMessageSent();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index e531e8cd20..7e98195a60 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -33,15 +33,16 @@ namespace qpid {
namespace broker {
namespace amqp {
-Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {}
+Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
+ : ManagedOutgoingLink(broker, parent, source, target, name), session(parent) {}
void Outgoing::wakeup()
{
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
- : Outgoing(broker, session, source, pn_link_name(l)),
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+ : Outgoing(broker, session, source, target, pn_link_name(l)),
Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
exclusive(topic),
queue(q), deliveries(5000), link(l), out(o),
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index b8a689b8f8..b70faa1385 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -59,7 +59,7 @@ class CircularArray
class Outgoing : public ManagedOutgoingLink
{
public:
- Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name);
+ Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name);
virtual void setSubjectFilter(const std::string&) = 0;
virtual void setSelectorFilter(const std::string&) = 0;
virtual void init() = 0;
@@ -88,7 +88,7 @@ class Outgoing : public ManagedOutgoingLink
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
index 48a629a66e..70d12455cd 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -110,8 +110,8 @@ void Relay::detached(Incoming*)
}
OutgoingFromRelay::OutgoingFromRelay(pn_link_t* l, Broker& broker, Session& parent, const std::string& source,
- const std::string& name_, boost::shared_ptr<Relay> r)
- : Outgoing(broker, parent, source, name_), name(name_), link(l), relay(r) {}
+ const std::string& target, const std::string& name_, boost::shared_ptr<Relay> r)
+ : Outgoing(broker, parent, source, target, name_), name(name_), link(l), relay(r) {}
/**
* Allows the link to initiate any outgoing transfers
*/
@@ -173,9 +173,9 @@ void OutgoingFromRelay::setSelectorFilter(const std::string&)
//TODO
}
-IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& target,
- const std::string& name, boost::shared_ptr<Relay> r)
- : Incoming(link, broker, parent, target, name), relay(r)
+IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& source,
+ const std::string& target, const std::string& name, boost::shared_ptr<Relay> r)
+ : Incoming(link, broker, parent, source, target, name), relay(r)
{
relay->attached(this);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h
index 0b20b563d4..19e0c2e3fe 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h
@@ -95,7 +95,7 @@ class OutgoingFromRelay : public Outgoing
{
public:
OutgoingFromRelay(pn_link_t*, Broker&, Session&, const std::string& source,
- const std::string& name, boost::shared_ptr<Relay>);
+ const std::string& target, const std::string& name, boost::shared_ptr<Relay>);
bool doWork();
void handle(pn_delivery_t* delivery);
void detached();
@@ -111,8 +111,8 @@ class OutgoingFromRelay : public Outgoing
class IncomingToRelay : public Incoming
{
public:
- IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& target,
- const std::string& name, boost::shared_ptr<Relay> r);
+ IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& source,
+ const std::string& target, const std::string& name, boost::shared_ptr<Relay> r);
bool settle();
bool doWork();
bool haveWork();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index bb94a37398..544616b897 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -56,7 +56,7 @@ namespace amqp {
class IncomingToQueue : public DecodingIncoming
{
public:
- IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {}
+ IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
@@ -65,7 +65,7 @@ class IncomingToQueue : public DecodingIncoming
class IncomingToExchange : public DecodingIncoming
{
public:
- IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {}
+ IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
@@ -146,15 +146,22 @@ void Session::attach(pn_link_t* link)
void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name)
{
ResolvedNode node = resolve(name, target, true);
-
+ const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
+ if (!sourceAddress) {
+ sourceAddress = pn_terminus_get_address(pn_link_source(link));
+ }
+ std::string source;
+ if (sourceAddress) {
+ source = sourceAddress;
+ }
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source));
incoming[link] = q;
} else if (node.exchange) {
- boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link));
+ boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source));
incoming[link] = e;
} else if (node.relay) {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay));
incoming[link] = in;
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
@@ -168,9 +175,18 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
ResolvedNode node = resolve(name, source, false);
Filter filter;
filter.read(pn_terminus_filter(source));
+ const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link));
+ if (!targetAddress) {
+ targetAddress = pn_terminus_get_address(pn_link_target(link));
+ }
+ std::string target;
+ if (targetAddress) {
+ target = targetAddress;
+ }
+
if (node.queue) {
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
q->init();
if (filter.hasSubjectFilter()) {
q->setSubjectFilter(filter.getSubjectFilter());
@@ -194,11 +210,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
} else {
throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
}
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
outgoing[link] = q;
q->init();
} else if (node.relay) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay));
outgoing[link] = out;
out->init();
} else {
@@ -220,11 +236,11 @@ void Session::attach(pn_link_t* link, const std::string& src, const std::string&
if (relay) {
if (pn_link_is_sender(link)) {
- boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay));
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
outgoing[link] = out;
out->init();
} else {
- boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay));
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay));
incoming[link] = in;
}
} else {