diff options
author | Gordon Sim <gsim@apache.org> | 2015-11-06 20:31:55 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2015-11-06 20:31:55 +0000 |
commit | b1dcd3eb8ac737c9be61c1c67bcb57ff04344c63 (patch) | |
tree | 149d8a83dae2b410cea8b8a11bbe27683f5a0e6f | |
parent | 74f0456e992417d3a382c4b548ef5ac8901f23ec (diff) | |
download | qpid-python-b1dcd3eb8ac737c9be61c1c67bcb57ff04344c63.tar.gz |
QPID-6754: add support for null target
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713024 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Authorise.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Authorise.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 57 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 2 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/general.py | 10 |
8 files changed, 87 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp index 3c88414567..57d995cb07 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp @@ -40,6 +40,14 @@ const std::string POLICY_TYPE("qpid.policy_type"); } Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {} +void Authorise::access(const std::string& name) +{ + if (acl) { + std::map<acl::Property, std::string> params; + if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, name, ¶ms)) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user)); + } +} void Authorise::access(boost::shared_ptr<Exchange> exchange) { if (acl) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.h b/qpid/cpp/src/qpid/broker/amqp/Authorise.h index 3714511177..3506cab845 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Authorise.h +++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.h @@ -40,6 +40,7 @@ class Authorise { public: Authorise(const std::string& user, AclModule*); + void access(const std::string& name); void access(boost::shared_ptr<Exchange>); void access(boost::shared_ptr<Queue>); void incoming(boost::shared_ptr<Exchange>); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 7780456bd1..2f793aea14 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -206,6 +206,19 @@ class IncomingToExchange : public DecodingIncoming bool isControllingLink; }; +class AnonymousRelay : public DecodingIncoming +{ + public: + AnonymousRelay(Broker& b, Connection& c, Session& p, pn_link_t* l) + : DecodingIncoming(l, b, p, std::string(), "ANONYMOUS-RELAY", pn_link_name(l)), authorise(p.getAuthorise()), context(c) + {} + void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*); + private: + boost::shared_ptr<qpid::broker::Exchange> exchange; + Authorise& authorise; + BrokerContext& context; +}; + class IncomingToCoordinator : public DecodingIncoming { public: @@ -411,7 +424,13 @@ void Session::attach(pn_link_t* link) std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); - throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); + authorise.access("ANONYMOUS-RELAY"); + boost::shared_ptr<Incoming> r(new AnonymousRelay(connection.getBroker(), connection, *this, link)); + incoming[link] = r; + if (connection.getBroker().isAuthenticating() && !connection.isLink()) + r->verify(connection.getUserId(), connection.getBroker().getRealm()); + QPID_LOG(debug, "Incoming link attached for ANONYMOUS-RELAY"); + return; } else if (pn_terminus_get_type(target) == PN_COORDINATOR) { QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this); boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this)); @@ -937,6 +956,42 @@ void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::Tx } } +void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction) +{ + // need to retrieve AMQP 1.0 'to' field and resolve it to a queue or exchange + std::string dest = message.getTo(); + QPID_LOG(debug, "AnonymousRelay received message for " << dest); + boost::shared_ptr<qpid::broker::Exchange> exchange; + boost::shared_ptr<qpid::broker::Queue> queue; + boost::shared_ptr<qpid::broker::amqp::Topic> topic; + + queue = context.getBroker().getQueues().find(dest); + if (!queue) { + topic = context.getTopics().get(dest); + if (topic) { + exchange = topic->getExchange(); + } else { + exchange = context.getBroker().getExchanges().find(dest); + } + } + + try { + if (queue) { + authorise.incoming(queue); + queue->deliver(message, transaction); + } else if (exchange) { + authorise.route(exchange, message); + DeliverableMessage deliverable(message, transaction); + exchange->route(deliverable); + } else { + QPID_LOG(info, "AnonymousRelay dropping message for " << dest); + } + } catch (const qpid::SessionException& e) { + throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what()); + } + +} + void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery) { if (message && message->isTypedBody()) { diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 8033cc5dee..608d6c2883 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -102,6 +102,8 @@ const std::string X_SUBSCRIBE("x-subscribe"); const std::string ARGUMENTS("arguments"); const std::string EXCHANGE_TYPE("exchange-type"); +const std::string NULL_ADDRESS("<null>"); + const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); @@ -562,6 +564,11 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const return result; } +bool AddressHelper::isNameNull() const +{ + return name == NULL_ADDRESS; +} + bool AddressHelper::isUnreliable() const { return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || @@ -605,6 +612,8 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod //application expects a name to be generated pn_terminus_set_dynamic(terminus, true); setNodeProperties(terminus); + } else if (name == NULL_ADDRESS) { + pn_terminus_set_type(terminus, PN_UNSPECIFIED); } else { pn_terminus_set_address(terminus, name.c_str()); if (createEnabled(mode)) { diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 3ee58cad8d..381f37ac88 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -40,6 +40,7 @@ class AddressHelper void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode); void checkAssertion(pn_terminus_t* terminus, CheckMode mode); + bool isNameNull() const; bool isUnreliable() const; const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index a28509b0b1..427a6336e9 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -85,7 +85,7 @@ const std::string& ReceiverContext::getSource() const void ReceiverContext::verify() { pn_terminus_t* source = pn_link_remote_source(receiver); - if (!pn_terminus_get_address(source)) { + if (!helper.isNameNull() && !pn_terminus_get_address(source)) { std::string msg("No such source : "); msg += getSource(); QPID_LOG(debug, msg); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 5289fbdf9b..fe8b4d33b7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -580,7 +580,7 @@ void SenderContext::Delivery::settle() void SenderContext::verify() { pn_terminus_t* target = pn_link_remote_target(sender); - if (!pn_terminus_get_address(target)) { + if (!helper.isNameNull() && !pn_terminus_get_address(target)) { std::string msg("No such target : "); msg += getTarget(); QPID_LOG(debug, msg); diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py index 414d0b48f5..a5b9779add 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/general.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/general.py @@ -69,3 +69,13 @@ class GeneralTests (VersionTest): assert msg.content == expected.content self.ssn.acknowledge(msg) + def test_anonymous_relay(self): + snd = self.ssn.sender("<null>") + rcv = self.ssn.receiver("#") + + snd.send(Message(id="a1", content="my-message", properties={'x-amqp-to':rcv.source})) + + request = rcv.fetch(5) + assert request.content == "my-message" and request.id == "a1", request + + self.ssn.acknowledge() |