summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-03-23 17:59:50 +0000
committerGordon Sim <gsim@apache.org>2013-03-23 17:59:50 +0000
commit990613d5320add3f29119e5039f9d757638be1fb (patch)
tree2316e9bc514c4637875edfae334e4113849a248d
parent4b532586d1a44628ffa037fa131f4f314592fda5 (diff)
downloadqpid-python-990613d5320add3f29119e5039f9d757638be1fb.tar.gz
QPID-4586: fixes for dynamic sources/targets and on demand creation of nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1460198 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/messaging/client.cpp4
-rw-r--r--qpid/cpp/include/qpid/messaging/Address.h1
-rw-r--r--qpid/cpp/include/qpid/messaging/Receiver.h6
-rw-r--r--qpid/cpp/include/qpid/messaging/Sender.h6
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp5
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/Address.cpp48
-rw-r--r--qpid/cpp/src/qpid/messaging/AddressImpl.h44
-rw-r--r--qpid/cpp/src/qpid/messaging/AddressParser.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/Receiver.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/ReceiverImpl.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/Sender.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/SenderImpl.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp36
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp22
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp24
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp23
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h1
36 files changed, 253 insertions, 95 deletions
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp
index f0ecd96206..983f0a8878 100644
--- a/qpid/cpp/examples/messaging/client.cpp
+++ b/qpid/cpp/examples/messaging/client.cpp
@@ -48,8 +48,8 @@ int main(int argc, char** argv) {
Sender sender = session.createSender("service_queue");
//create temp queue & receiver...
- Address responseQueue("#response-queue; {create:always, delete:always}");
- Receiver receiver = session.createReceiver(responseQueue);
+ Receiver receiver = session.createReceiver("#");
+ Address responseQueue = receiver.getAddress();
// Now send some messages ...
string s[] = {
diff --git a/qpid/cpp/include/qpid/messaging/Address.h b/qpid/cpp/include/qpid/messaging/Address.h
index 63dce0c49d..224a70b193 100644
--- a/qpid/cpp/include/qpid/messaging/Address.h
+++ b/qpid/cpp/include/qpid/messaging/Address.h
@@ -153,6 +153,7 @@ class QPID_MESSAGING_CLASS_EXTERN Address
QPID_MESSAGING_EXTERN bool operator !() const;
private:
AddressImpl* impl;
+ friend class AddressImpl;
};
#ifndef SWIG
diff --git a/qpid/cpp/include/qpid/messaging/Receiver.h b/qpid/cpp/include/qpid/messaging/Receiver.h
index 13317dfcbd..6c6fdaa7cb 100644
--- a/qpid/cpp/include/qpid/messaging/Receiver.h
+++ b/qpid/cpp/include/qpid/messaging/Receiver.h
@@ -34,6 +34,7 @@ namespace messaging {
template <class> class PrivateImplRef;
#endif
+class Address;
class Message;
class ReceiverImpl;
class Session;
@@ -134,6 +135,11 @@ class QPID_MESSAGING_CLASS_EXTERN Receiver : public qpid::messaging::Handle<Rece
*/
QPID_MESSAGING_EXTERN Session getSession() const;
+ /**
+ * Returns an address for this receiver.
+ */
+ QPID_MESSAGING_EXTERN Address getAddress() const;
+
#ifndef SWIG
private:
friend class qpid::messaging::PrivateImplRef<Receiver>;
diff --git a/qpid/cpp/include/qpid/messaging/Sender.h b/qpid/cpp/include/qpid/messaging/Sender.h
index 8e1c5846e9..9c7bca1905 100644
--- a/qpid/cpp/include/qpid/messaging/Sender.h
+++ b/qpid/cpp/include/qpid/messaging/Sender.h
@@ -34,6 +34,7 @@ namespace messaging {
#ifndef SWIG
template <class> class PrivateImplRef;
#endif
+class Address;
class Message;
class SenderImpl;
class Session;
@@ -89,6 +90,11 @@ class QPID_MESSAGING_CLASS_EXTERN Sender : public qpid::messaging::Handle<Sender
* Returns a handle to the session associated with this sender.
*/
QPID_MESSAGING_EXTERN Session getSession() const;
+
+ /**
+ * Returns an address for this sender.
+ */
+ QPID_MESSAGING_EXTERN Address getAddress() const;
#ifndef SWIG
private:
friend class qpid::messaging::PrivateImplRef<Sender>;
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index ef68d79fb2..bb00046857 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1063,6 +1063,7 @@ install_pdb (qpidclient ${QPID_COMPONENT_CLIENT})
set (qpidmessaging_SOURCES_hidden
qpid/messaging/AddressParser.h
+ qpid/messaging/AddressImpl.h
qpid/messaging/ConnectionImpl.h
qpid/messaging/ReceiverImpl.h
qpid/messaging/SessionImpl.h
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 42d5c60475..cad41bca86 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -935,6 +935,7 @@ libqpidtypes_la_LDFLAGS = -version-info $(QPIDTYPES_VERSION_INFO)
libqpidmessaging_la_LIBADD = libqpidclient.la libqpidtypes.la
libqpidmessaging_la_SOURCES = \
qpid/messaging/Address.cpp \
+ qpid/messaging/AddressImpl.h \
qpid/messaging/AddressParser.h \
qpid/messaging/AddressParser.cpp \
qpid/messaging/Connection.cpp \
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index a83034eb6e..be98c048b6 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -37,11 +37,11 @@ namespace qpid {
namespace broker {
namespace amqp {
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse)
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
: ManagedConnection(b, i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_)
+ out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_), domain(d)
{
if (pn_transport_bind(transport, connection)) {
//error
@@ -265,4 +265,8 @@ std::string Connection::getError()
return text.str();
}
+std::string Connection::getDomain() const
+{
+ return domain;
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 28cf86f123..d61db82e60 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -45,7 +45,7 @@ class Session;
class Connection : public sys::ConnectionCodec, public ManagedConnection
{
public:
- Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse);
+ Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse, const std::string& domain);
virtual ~Connection();
size_t decode(const char* buffer, size_t size);
virtual size_t encode(char* buffer, size_t size);
@@ -57,6 +57,7 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
framing::ProtocolVersion getVersion() const;
pn_transport_t* getTransport();
Interconnects& getInterconnects();
+ std::string getDomain() const;
protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
@@ -67,6 +68,7 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
bool haveOutput;
Sessions sessions;
Interconnects& interconnects;
+ std::string domain;
virtual void process();
std::string getError();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
index 082715b1b2..92a6f75f1e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
@@ -41,7 +41,7 @@ namespace amqp {
Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r)
- : Connection(out, id, broker, r, saslInUse), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false),
+ : Connection(out, id, broker, r, saslInUse, std::string()), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false),
closeRequested(false), isTransportDeleted(false)
{}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index 0e622f8d20..16f0cdc39f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -41,10 +41,19 @@ namespace qpid {
namespace broker {
namespace amqp {
+struct Options : public qpid::Options {
+ std::string domain;
+
+ Options() : qpid::Options("AMQP 1.0 Options") {
+ addOptions()
+ ("domain", optValue(domain, "DOMAIN"), "Domain of this broker");
+ }
+};
+
class ProtocolImpl : public Protocol
{
public:
- ProtocolImpl(Interconnects* i, Broker& b) : interconnects(i), broker(b)
+ ProtocolImpl(Interconnects* i, Broker& b, const std::string& d) : interconnects(i), broker(b), domain(d)
{
broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown
}
@@ -54,16 +63,20 @@ class ProtocolImpl : public Protocol
private:
Interconnects* interconnects;
Broker& broker;
+ std::string domain;
};
struct ProtocolPlugin : public Plugin
{
+ Options options;
+ Options* getOptions() { return &options; }
+
void earlyInitialize(Plugin::Target& target)
{
//need to register protocol before recovery from store
broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
if (broker) {
- ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker);
+ ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker, options.domain);
broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
}
}
@@ -79,18 +92,20 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe
if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) {
if (broker.getOptions().auth) {
QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external));
+ return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects,
+ qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm,broker.getOptions().requireEncrypted, external),
+ domain);
} else {
std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm));
QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator);
+ return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator, domain);
}
} else {
if (broker.getOptions().auth) {
throw qpid::Exception("SASL layer required!");
} else {
QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
- return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false);
+ return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false, domain);
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
index d8e12fcfdd..820aaf87d4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
@@ -31,8 +31,8 @@ namespace qpid {
namespace broker {
namespace amqp {
-Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth)
- : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true),
+Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth, const std::string& domain)
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true, domain),
authenticator(auth),
state(INCOMPLETE), writeHeader(true), haveOutput(true)
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.h b/qpid/cpp/src/qpid/broker/amqp/Sasl.h
index 7718b4c43a..194ab0a0d5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.h
@@ -39,7 +39,7 @@ namespace amqp {
class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer
{
public:
- Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator);
+ Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator, const std::string& domain);
~Sasl();
size_t decode(const char* buffer, size_t size);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 544616b897..62011f6372 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -53,6 +53,20 @@ namespace qpid {
namespace broker {
namespace amqp {
+namespace {
+bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
+{
+ while (pn_data_next(capabilities)) {
+ pn_bytes_t c = pn_data_get_symbol(capabilities);
+ std::string s(c.start, c.size);
+ if (s == name) return true;
+ }
+ return false;
+}
+
+const std::string CREATE_ON_DEMAND("create-on-demand");
+}
+
class IncomingToQueue : public DecodingIncoming
{
public:
@@ -81,7 +95,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
node.exchange = broker.getExchanges().find(name);
node.queue = broker.getQueues().find(name);
if (!node.queue && !node.exchange) {
- if (pn_terminus_is_dynamic(terminus)) {
+ if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
//is it a queue or an exchange?
NodeProperties properties;
properties.read(pn_terminus_properties(terminus));
@@ -117,25 +131,43 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
return node;
}
+std::string Session::generateName(pn_link_t* link)
+{
+ std::stringstream s;
+ s << qpid::types::Uuid(true) << "::" << pn_link_name(link);
+ if (!connection.getDomain().empty()) {
+ s << "@" << connection.getDomain();
+ }
+ return s.str();
+}
+
void Session::attach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
pn_terminus_t* source = pn_link_remote_source(link);
//i.e a subscription
+ std::string name;
if (pn_terminus_get_type(source) == PN_UNSPECIFIED) {
throw qpid::Exception("No source specified!");/*invalid-field?*/
+ } else if (pn_terminus_is_dynamic(source)) {
+ name = generateName(link);
+ } else {
+ name = pn_terminus_get_address(source);
}
- std::string name = pn_terminus_get_address(source);
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), name.c_str());
setupOutgoing(link, source, name);
} else {
pn_terminus_t* target = pn_link_remote_target(link);
+ std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
throw qpid::Exception("No target specified!");/*invalid field?*/
+ } else if (pn_terminus_is_dynamic(target)) {
+ name = generateName(link);
+ } else {
+ name = pn_terminus_get_address(target);
}
- std::string name = pn_terminus_get_address(target);
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), name.c_str());
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 74f50a9eda..b142224cfd 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -95,6 +95,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
void setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name);
void setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name);
+ std::string generateName(pn_link_t*);
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 40b09b0fc0..11f9475cad 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -143,6 +143,11 @@ uint32_t ReceiverImpl::getUnsettled()
return parent->getUnsettledAcks(destination);
}
+qpid::messaging::Address ReceiverImpl::getAddress() const
+{
+ return address;
+}
+
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a) :
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 76da4f31a9..4dba76c8d9 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -66,6 +66,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
bool isClosed() const;
+ qpid::messaging::Address getAddress() const;
private:
mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index b275db38d7..7001acaf99 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -34,6 +34,11 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
parent(&_parent), name(_name), address(_address), state(UNRESOLVED),
capacity(50), window(0), flushed(false), unreliable(AddressResolution::is_unreliable(address)) {}
+qpid::messaging::Address SenderImpl::getAddress() const
+{
+ return address;
+}
+
void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
{
if (unreliable) { // immutable, don't need lock
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index d75863c743..ee250af2d4 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -56,6 +56,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
void init(qpid::client::AsyncSession, AddressResolution&);
const std::string& getName() const;
qpid::messaging::Session getSession() const;
+ qpid::messaging::Address getAddress() const;
private:
mutable sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp
index a516959edb..6fbaeef661 100644
--- a/qpid/cpp/src/qpid/messaging/Address.cpp
+++ b/qpid/cpp/src/qpid/messaging/Address.cpp
@@ -19,6 +19,8 @@
*
*/
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/AddressImpl.h"
+#include "qpid/messaging/AddressParser.h"
#include "qpid/framing/Uuid.h"
#include <sstream>
#include <boost/format.hpp>
@@ -34,51 +36,9 @@ const std::string OPTIONS_DIVIDER = ";";
const std::string SPACE = " ";
const std::string TYPE = "type";
}
-class AddressImpl
-{
- public:
- std::string name;
- std::string subject;
- Variant::Map options;
-
- AddressImpl() {}
- AddressImpl(const std::string& n, const std::string& s, const Variant::Map& o) :
- name(n), subject(s), options(o) {}
-};
-
-class AddressParser
-{
- public:
- AddressParser(const std::string&);
- bool parse(Address& address);
- private:
- const std::string& input;
- std::string::size_type current;
- static const std::string RESERVED;
-
- bool readChar(char c);
- bool readQuotedString(std::string& s);
- bool readQuotedValue(Variant& value);
- bool readString(std::string& value, char delimiter);
- bool readWord(std::string& word, const std::string& delims = RESERVED);
- bool readSimpleValue(Variant& word);
- bool readKey(std::string& key);
- bool readValue(Variant& value);
- bool readKeyValuePair(Variant::Map& map);
- bool readMap(Variant& value);
- bool readList(Variant& value);
- bool readName(std::string& name);
- bool readSubject(std::string& subject);
- bool error(const std::string& message);
- bool eos();
- bool iswhitespace();
- bool in(const std::string& delims);
- bool isreserved();
-};
-
Address::Address() : impl(new AddressImpl()) {}
Address::Address(const std::string& address) : impl(new AddressImpl())
-{
+{
AddressParser parser(address);
parser.parse(*this);
}
@@ -86,7 +46,7 @@ Address::Address(const std::string& name, const std::string& subject, const Vari
const std::string& type)
: impl(new AddressImpl(name, subject, options)) { setType(type); }
Address::Address(const Address& a) :
- impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) {}
+ impl(new AddressImpl(a.impl->name, a.impl->subject, a.impl->options)) { impl->temporary = a.impl->temporary; }
Address::~Address() { delete impl; }
Address& Address::operator=(const Address& a) { *impl = *a.impl; return *this; }
diff --git a/qpid/cpp/src/qpid/messaging/AddressImpl.h b/qpid/cpp/src/qpid/messaging/AddressImpl.h
new file mode 100644
index 0000000000..e35aa7fef2
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/AddressImpl.h
@@ -0,0 +1,44 @@
+#ifndef QPID_MESSAGING_ADDRESSIMPL_H
+#define QPID_MESSAGING_ADDRESSIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+namespace qpid {
+namespace messaging {
+
+class AddressImpl
+{
+ public:
+ std::string name;
+ std::string subject;
+ qpid::types::Variant::Map options;
+ bool temporary;
+
+ AddressImpl() : temporary(false) {}
+ AddressImpl(const std::string& n, const std::string& s, const qpid::types::Variant::Map& o) :
+ name(n), subject(s), options(o), temporary(false) {}
+ static void setTemporary(Address& a, bool value) { a.impl->temporary = value; }
+ static bool isTemporary(const Address& a) { return a.impl->temporary; }
+};
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_ADDRESSIMPL_H*/
diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.cpp b/qpid/cpp/src/qpid/messaging/AddressParser.cpp
index 67249e188e..882deba463 100644
--- a/qpid/cpp/src/qpid/messaging/AddressParser.cpp
+++ b/qpid/cpp/src/qpid/messaging/AddressParser.cpp
@@ -19,6 +19,7 @@
*
*/
#include "AddressParser.h"
+#include "AddressImpl.h"
#include "qpid/framing/Uuid.h"
#include <boost/format.hpp>
@@ -38,7 +39,10 @@ bool AddressParser::parse(Address& address)
{
std::string name;
if (readName(name)) {
- if (name.find('#') == 0) name = qpid::framing::Uuid(true).str() + name;
+ if (name.find('#') == 0) {
+ name = qpid::framing::Uuid(true).str() + name;
+ AddressImpl::setTemporary(address, true);
+ }
address.setName(name);
if (readChar('/')) {
std::string subject;
diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp
index 78e0c5daa3..c45ebd6760 100644
--- a/qpid/cpp/src/qpid/messaging/Receiver.cpp
+++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Session.h"
@@ -45,4 +46,5 @@ void Receiver::close() { impl->close(); }
const std::string& Receiver::getName() const { return impl->getName(); }
Session Receiver::getSession() const { return impl->getSession(); }
bool Receiver::isClosed() const { return impl->isClosed(); }
+Address Receiver::getAddress() const { return impl->getAddress(); }
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
index e450693d2c..59ccc3214e 100644
--- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace messaging {
+class Address;
class Duration;
class Message;
class MessageListener;
@@ -48,6 +49,7 @@ class ReceiverImpl : public virtual qpid::RefCounted
virtual const std::string& getName() const = 0;
virtual Session getSession() const = 0;
virtual bool isClosed() const = 0;
+ virtual Address getAddress() const = 0;
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp
index 53dbb69777..a60de3d606 100644
--- a/qpid/cpp/src/qpid/messaging/Sender.cpp
+++ b/qpid/cpp/src/qpid/messaging/Sender.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/SenderImpl.h"
#include "qpid/messaging/Session.h"
@@ -40,5 +41,5 @@ uint32_t Sender::getUnsettled() { return impl->getUnsettled(); }
uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); }
const std::string& Sender::getName() const { return impl->getName(); }
Session Sender::getSession() const { return impl->getSession(); }
-
+Address Sender::getAddress() const { return impl->getAddress(); }
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h
index d978463fdb..91fd9b1536 100644
--- a/qpid/cpp/src/qpid/messaging/SenderImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace messaging {
+class Address;
class Message;
class Session;
@@ -41,6 +42,7 @@ class SenderImpl : public virtual qpid::RefCounted
virtual uint32_t getUnsettled() = 0;
virtual const std::string& getName() const = 0;
virtual Session getSession() const = 0;
+ virtual Address getAddress() const = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 359660dce5..a46606a526 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
+#include "qpid/log/Statement.h"
#include <vector>
#include <boost/assign.hpp>
extern "C" {
@@ -56,7 +57,9 @@ const std::string MOVE("move");
const std::string COPY("copy");
const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+const std::string CREATE_ON_DEMAND("create-on-demand");
+const std::string DUMMY(".");
const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -155,26 +158,33 @@ const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const
return link;
}
-void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus, bool dynamic)
{
- pn_terminus_set_dynamic(terminus, true);
+ if (dynamic) {
+ pn_terminus_set_address(terminus, DUMMY.c_str());//Workaround for proton bug
+ pn_terminus_set_dynamic(terminus, true);
+ } else {
+ pn_data_t* capabilities = pn_terminus_capabilities(terminus);
+ if (!capabilities) {
+ QPID_LOG(error, "!!!No capabilities!!!");
+ }
+ pn_data_put_symbol(capabilities, convert(CREATE_ON_DEMAND));
+ }
//properties for dynamically created node:
- pn_data_t* data = pn_terminus_properties(terminus);
if (node.size()) {
+ pn_data_t* data = pn_terminus_properties(terminus);
pn_data_put_map(data);
pn_data_enter(data);
- }
- for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) {
- if (i->first == TYPE) {
- pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
- pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE));
- } else {
- pn_data_put_symbol(data, convert(i->first));
- pn_data_put_string(data, convert(i->second.asString()));
+ for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) {
+ if (i->first == TYPE) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE));
+ } else {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
+ }
}
- }
- if (node.size()) {
pn_data_exit(data);
}
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index cd0aa1be9e..2442619ed3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -40,7 +40,7 @@ class AddressHelper
bool deleteEnabled(CheckMode mode) const;
bool assertEnabled(CheckMode mode) const;
- void setNodeProperties(pn_terminus_t*);
+ void setNodeProperties(pn_terminus_t*, bool dynamic);
const qpid::types::Variant::Map& getNodeProperties() const;
const qpid::types::Variant::Map& getLinkProperties() const;
private:
@@ -49,6 +49,7 @@ class AddressHelper
std::string deletePolicy;
qpid::types::Variant::Map node;
qpid::types::Variant::Map link;
+ std::string name;
bool enabled(const std::string& policy, CheckMode mode) const;
};
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index c74ee01898..9febe66f7e 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -26,6 +26,7 @@
#include "SessionContext.h"
#include "Transport.h"
#include "qpid/messaging/exceptions.h"
+#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
@@ -285,34 +286,45 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
{
lnk->configure();
attach(ssn->session, (pn_link_t*) lnk->sender);
- if (!pn_link_remote_target((pn_link_t*) lnk->sender)) {
+ pn_terminus_t* t = pn_link_remote_target(lnk->sender);
+ if (!pn_terminus_get_address(t)) {
std::string msg("No such target : ");
msg += lnk->getTarget();
+ QPID_LOG(debug, msg);
throw qpid::messaging::NotFound(msg);
+ } else if (AddressImpl::isTemporary(lnk->address)) {
+ lnk->address.setName(pn_terminus_get_address(t));
+ QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
}
+ QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
lnk->configure();
attach(ssn->session, lnk->receiver, lnk->capacity);
- if (!pn_link_remote_source(lnk->receiver)) {
+ pn_terminus_t* s = pn_link_remote_source(lnk->receiver);
+ if (!pn_terminus_get_address(s)) {
std::string msg("No such source : ");
msg += lnk->getSource();
+ QPID_LOG(debug, msg);
throw qpid::messaging::NotFound(msg);
+ } else if (AddressImpl::isTemporary(lnk->address)) {
+ lnk->address.setName(pn_terminus_get_address(s));
+ QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
}
+ QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link));
pn_link_open(link);
- QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link));
+ QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link));
if (credit) pn_link_flow(link, credit);
wakeupDriver();
while (pn_link_state(link) & PN_REMOTE_UNINIT) {
- QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link));
+ QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "...");
wait();
}
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index e5f6738b61..f7b06ddc05 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -20,9 +20,11 @@
*/
#include "qpid/messaging/amqp/ReceiverContext.h"
#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
extern "C" {
#include <proton/engine.h>
}
@@ -38,7 +40,7 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
capacity(0) {}
ReceiverContext::~ReceiverContext()
{
- pn_link_free(receiver);
+ //pn_link_free(receiver);
}
void ReceiverContext::setCapacity(uint32_t c)
@@ -76,7 +78,7 @@ uint32_t ReceiverContext::getUnsettled()
void ReceiverContext::close()
{
-
+ pn_link_close(receiver);
}
const std::string& ReceiverContext::getName() const
@@ -113,11 +115,18 @@ void ReceiverContext::configure() const
}
void ReceiverContext::configure(pn_terminus_t* source) const
{
- pn_terminus_set_address(source, address.getName().c_str());
//dynamic create:
AddressHelper helper(address);
- if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
- helper.setNodeProperties(source);
+ if (AddressImpl::isTemporary(address)) {
+ //application expects a name to be generated
+ QPID_LOG(debug, "source is dynamic");
+ helper.setNodeProperties(source, true);
+ } else {
+ pn_terminus_set_address(source, address.getName().c_str());
+ if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
+ //application expects name of node to be as specified
+ helper.setNodeProperties(source, false);
+ }
}
// Look specifically for qpid.selector link property and add a filter for it
@@ -149,6 +158,11 @@ void ReceiverContext::configure(pn_terminus_t* source) const
}
}
+Address ReceiverContext::getAddress() const
+{
+ return address;
+}
+
bool ReceiverContext::isClosed() const
{
return false;//TODO
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 34ecdda6be..9c5386157b 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -55,10 +55,11 @@ class ReceiverContext
const std::string& getSource() const;
bool isClosed() const;
void configure() const;
+ Address getAddress() const;
private:
friend class ConnectionContext;
const std::string name;
- const Address address;
+ Address address;
pn_link_t* receiver;
uint32_t capacity;
void configure(pn_terminus_t*) const;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
index 9bf64ebb8d..c601d05ed0 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp
@@ -103,4 +103,9 @@ bool ReceiverHandle::isClosed() const
return receiver->isClosed();
}
+Address ReceiverHandle::getAddress() const
+{
+ return receiver->getAddress();
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
index a1a6f26025..08a95fb585 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.h
@@ -53,6 +53,7 @@ class ReceiverHandle : public qpid::messaging::ReceiverImpl
const std::string& getName() const;
qpid::messaging::Session getSession() const;
bool isClosed() const;
+ Address getAddress() const;
private:
boost::shared_ptr<ConnectionContext> connection;
boost::shared_ptr<SessionContext> session;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 96c4437b89..fe74a4bca8 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -21,6 +21,7 @@
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/amqp/AddressHelper.h"
+#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/messaging/exceptions.h"
@@ -44,12 +45,12 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const
SenderContext::~SenderContext()
{
- pn_link_free(sender);
+ //pn_link_free(sender);
}
void SenderContext::close()
{
-
+ pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
@@ -347,11 +348,16 @@ void SenderContext::configure() const
}
void SenderContext::configure(pn_terminus_t* target) const
{
- pn_terminus_set_address(target, address.getName().c_str());
- //dynamic create:
AddressHelper helper(address);
- if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
- helper.setNodeProperties(target);
+ if (AddressImpl::isTemporary(address)) {
+ //application expects a name to be generated
+ helper.setNodeProperties(target, true);
+ } else {
+ pn_terminus_set_address(target, address.getName().c_str());
+ if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
+ //application expects name of node to be as specified
+ helper.setNodeProperties(target, false);
+ }
}
}
@@ -360,4 +366,9 @@ bool SenderContext::settled()
return processUnsettled() == 0;
}
+Address SenderContext::getAddress() const
+{
+ return address;
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 3595379e70..2969e75a16 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -71,12 +71,13 @@ class SenderContext
Delivery* send(const qpid::messaging::Message& message);
void configure() const;
bool settled();
+ Address getAddress() const;
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
const std::string name;
- const qpid::messaging::Address address;
+ qpid::messaging::Address address;
pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
index b7168e5b31..4e258e7b38 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -72,4 +72,9 @@ qpid::messaging::Session SenderHandle::getSession() const
return qpid::messaging::Session(new SessionHandle(connection, session));
}
+Address SenderHandle::getAddress() const
+{
+ return sender->getAddress();
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h
index 3c6b666582..fab158c1ef 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.h
@@ -48,6 +48,7 @@ class SenderHandle : public qpid::messaging::SenderImpl
uint32_t getUnsettled();
const std::string& getName() const;
Session getSession() const;
+ Address getAddress() const;
private:
boost::shared_ptr<ConnectionContext> connection;
boost::shared_ptr<SessionContext> session;