summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-11-06 20:31:55 +0000
committerGordon Sim <gsim@apache.org>2015-11-06 20:31:55 +0000
commitb1dcd3eb8ac737c9be61c1c67bcb57ff04344c63 (patch)
tree149d8a83dae2b410cea8b8a11bbe27683f5a0e6f
parent74f0456e992417d3a382c4b548ef5ac8901f23ec (diff)
downloadqpid-python-b1dcd3eb8ac737c9be61c1c67bcb57ff04344c63.tar.gz
QPID-6754: add support for null target
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713024 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Authorise.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Authorise.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp57
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/general.py10
8 files changed, 87 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
index 3c88414567..57d995cb07 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
@@ -40,6 +40,14 @@ const std::string POLICY_TYPE("qpid.policy_type");
}
Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {}
+void Authorise::access(const std::string& name)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, name, &params))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user));
+ }
+}
void Authorise::access(boost::shared_ptr<Exchange> exchange)
{
if (acl) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.h b/qpid/cpp/src/qpid/broker/amqp/Authorise.h
index 3714511177..3506cab845 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Authorise.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.h
@@ -40,6 +40,7 @@ class Authorise
{
public:
Authorise(const std::string& user, AclModule*);
+ void access(const std::string& name);
void access(boost::shared_ptr<Exchange>);
void access(boost::shared_ptr<Queue>);
void incoming(boost::shared_ptr<Exchange>);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 7780456bd1..2f793aea14 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -206,6 +206,19 @@ class IncomingToExchange : public DecodingIncoming
bool isControllingLink;
};
+class AnonymousRelay : public DecodingIncoming
+{
+ public:
+ AnonymousRelay(Broker& b, Connection& c, Session& p, pn_link_t* l)
+ : DecodingIncoming(l, b, p, std::string(), "ANONYMOUS-RELAY", pn_link_name(l)), authorise(p.getAuthorise()), context(c)
+ {}
+ void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*);
+ private:
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ Authorise& authorise;
+ BrokerContext& context;
+};
+
class IncomingToCoordinator : public DecodingIncoming
{
public:
@@ -411,7 +424,13 @@ void Session::attach(pn_link_t* link)
std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
- throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!");
+ authorise.access("ANONYMOUS-RELAY");
+ boost::shared_ptr<Incoming> r(new AnonymousRelay(connection.getBroker(), connection, *this, link));
+ incoming[link] = r;
+ if (connection.getBroker().isAuthenticating() && !connection.isLink())
+ r->verify(connection.getUserId(), connection.getBroker().getRealm());
+ QPID_LOG(debug, "Incoming link attached for ANONYMOUS-RELAY");
+ return;
} else if (pn_terminus_get_type(target) == PN_COORDINATOR) {
QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this);
boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this));
@@ -937,6 +956,42 @@ void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::Tx
}
}
+void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
+{
+ // need to retrieve AMQP 1.0 'to' field and resolve it to a queue or exchange
+ std::string dest = message.getTo();
+ QPID_LOG(debug, "AnonymousRelay received message for " << dest);
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ boost::shared_ptr<qpid::broker::Queue> queue;
+ boost::shared_ptr<qpid::broker::amqp::Topic> topic;
+
+ queue = context.getBroker().getQueues().find(dest);
+ if (!queue) {
+ topic = context.getTopics().get(dest);
+ if (topic) {
+ exchange = topic->getExchange();
+ } else {
+ exchange = context.getBroker().getExchanges().find(dest);
+ }
+ }
+
+ try {
+ if (queue) {
+ authorise.incoming(queue);
+ queue->deliver(message, transaction);
+ } else if (exchange) {
+ authorise.route(exchange, message);
+ DeliverableMessage deliverable(message, transaction);
+ exchange->route(deliverable);
+ } else {
+ QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
+ }
+ } catch (const qpid::SessionException& e) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+ }
+
+}
+
void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)
{
if (message && message->isTypedBody()) {
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 8033cc5dee..608d6c2883 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -102,6 +102,8 @@ const std::string X_SUBSCRIBE("x-subscribe");
const std::string ARGUMENTS("arguments");
const std::string EXCHANGE_TYPE("exchange-type");
+const std::string NULL_ADDRESS("<null>");
+
const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -562,6 +564,11 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
return result;
}
+bool AddressHelper::isNameNull() const
+{
+ return name == NULL_ADDRESS;
+}
+
bool AddressHelper::isUnreliable() const
{
return reliability == AT_MOST_ONCE || reliability == UNRELIABLE ||
@@ -605,6 +612,8 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
//application expects a name to be generated
pn_terminus_set_dynamic(terminus, true);
setNodeProperties(terminus);
+ } else if (name == NULL_ADDRESS) {
+ pn_terminus_set_type(terminus, PN_UNSPECIFIED);
} else {
pn_terminus_set_address(terminus, name.c_str());
if (createEnabled(mode)) {
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index 3ee58cad8d..381f37ac88 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -40,6 +40,7 @@ class AddressHelper
void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
+ bool isNameNull() const;
bool isUnreliable() const;
const qpid::types::Variant::Map& getNodeProperties() const;
bool getLinkSource(std::string& out) const;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index a28509b0b1..427a6336e9 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -85,7 +85,7 @@ const std::string& ReceiverContext::getSource() const
void ReceiverContext::verify()
{
pn_terminus_t* source = pn_link_remote_source(receiver);
- if (!pn_terminus_get_address(source)) {
+ if (!helper.isNameNull() && !pn_terminus_get_address(source)) {
std::string msg("No such source : ");
msg += getSource();
QPID_LOG(debug, msg);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 5289fbdf9b..fe8b4d33b7 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -580,7 +580,7 @@ void SenderContext::Delivery::settle()
void SenderContext::verify()
{
pn_terminus_t* target = pn_link_remote_target(sender);
- if (!pn_terminus_get_address(target)) {
+ if (!helper.isNameNull() && !pn_terminus_get_address(target)) {
std::string msg("No such target : ");
msg += getTarget();
QPID_LOG(debug, msg);
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
index 414d0b48f5..a5b9779add 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
@@ -69,3 +69,13 @@ class GeneralTests (VersionTest):
assert msg.content == expected.content
self.ssn.acknowledge(msg)
+ def test_anonymous_relay(self):
+ snd = self.ssn.sender("<null>")
+ rcv = self.ssn.receiver("#")
+
+ snd.send(Message(id="a1", content="my-message", properties={'x-amqp-to':rcv.source}))
+
+ request = rcv.fetch(5)
+ assert request.content == "my-message" and request.id == "a1", request
+
+ self.ssn.acknowledge()