diff options
author | Ted Ross <tross@apache.org> | 2011-02-02 18:16:57 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-02-02 18:16:57 +0000 |
commit | 9991fae5ea4415b6ef760a4430658202b90264bc (patch) | |
tree | 93399916cd50e0ac10aa761144b4915d9085e217 | |
parent | 24fb6939e5420ecae9033687c8c6081a62cd42a5 (diff) | |
download | qpid-python-9991fae5ea4415b6ef760a4430658202b90264bc.tar.gz |
QPID-3032 - Modifications to the QMFv2 implementation:
1) Use the topic exchange as the base for direct and reply-to addresses.
2) Add a strict-security option to the Console and Agent APIs that narrows the messaging
patterns used such that they can easily be controlled by broker ACL policy.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1066562 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/include/qmf/AgentSession.h | 4 | ||||
-rw-r--r-- | cpp/include/qmf/ConsoleSession.h | 4 | ||||
-rw-r--r-- | cpp/src/qmf/Agent.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qmf/AgentImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qmf/SchemaId.cpp | 3 |
8 files changed, 90 insertions, 19 deletions
diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h index 090017779f..23058c56c6 100644 --- a/cpp/include/qmf/AgentSession.h +++ b/cpp/include/qmf/AgentSession.h @@ -67,6 +67,10 @@ namespace qmf { * sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] * public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] * If False: QMF events are only sent to authorized subscribers + * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + * If False: Listen only on the routable direct address + * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network + * - If False: Operate more flexibly with regard to use of messaging facilities [default] */ QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options=""); diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h index ba8b3de92f..2422383fa3 100644 --- a/cpp/include/qmf/ConsoleSession.h +++ b/cpp/include/qmf/ConsoleSession.h @@ -57,6 +57,10 @@ namespace qmf { * domain:NAME - QMF Domain to join [default: "default"] * max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from * an agent before deleting it [default: 5] + * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + * If False: Listen only on the routable direct address + * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network + * - If False: Operate more flexibly with regard to use of messaging facilities [default] */ QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options=""); QMF_EXTERN void setDomain(const std::string&); diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp index 3a385b3741..1b7d03968e 100644 --- a/cpp/src/qmf/Agent.cpp +++ b/cpp/src/qmf/Agent.cpp @@ -71,8 +71,8 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : - name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - nextCorrelator(1), schemaCache(s.schemaCache) + name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), + sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) { } @@ -83,6 +83,11 @@ void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v try { capability = v.asUint32(); } catch (std::exception&) {} + if (k == "_direct_subject") + try { + directSubject = v.asString(); + sender = session.topicSender; + } catch (std::exception&) {} } const Variant& AgentImpl::getAttribute(const string& k) const @@ -514,9 +519,10 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); - msg.setSubject(name); + msg.setSubject(directSubject); encode(QueryImplAccess::get(query).asMap(), msg); - session.directSender.send(msg); + if (sender.isValid()) + sender.send(msg); QPID_LOG(trace, "SENT QueryRequest to=" << name); } @@ -538,9 +544,10 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); - msg.setSubject(name); + msg.setSubject(directSubject); encode(map, msg); - session.directSender.send(msg); + if (sender.isValid()) + sender.send(msg); QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); } @@ -578,8 +585,9 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) Message msg; msg.setReplyTo(session.replyAddress); msg.setContent(content); - msg.setSubject(name); - session.directSender.send(msg); + msg.setSubject(directSubject); + if (sender.isValid()) + sender.send(msg); QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); } diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h index b852570418..7fa4f4373a 100644 --- a/cpp/src/qmf/AgentImpl.h +++ b/cpp/src/qmf/AgentImpl.h @@ -29,6 +29,7 @@ #include "qmf/SchemaCache.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/Sender.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include <boost/shared_ptr.hpp> @@ -90,11 +91,13 @@ namespace qmf { mutable qpid::sys::Mutex lock; std::string name; + std::string directSubject; uint32_t epoch; ConsoleSessionImpl& session; bool touched; uint32_t untouchedCount; uint32_t capability; + qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 24356519d7..fb18f27150 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -116,6 +116,8 @@ namespace qmf { uint32_t minSubInterval; uint32_t subLifetime; bool publicEvents; + bool listenOnDirect; + bool strictSecurity; uint64_t schemaUpdateTime; string directBase; string topicBase; @@ -179,6 +181,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), + listenOnDirect(true), strictSecurity(false), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -231,6 +234,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("public-events"); if (iter != optMap.end()) publicEvents = iter->second.asBool(); + + iter = optMap.find("listen-on-direct"); + if (iter != optMap.end()) + listenOnDirect = iter->second.asBool(); + + iter = optMap.find("strict-security"); + if (iter != optMap.end()) + strictSecurity = iter->second.asBool(); } } @@ -248,6 +259,8 @@ void AgentSessionImpl::open() throw QmfException("The session is already open"); const string addrArgs(";{create:never,node:{type:topic}}"); + const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); + attributes["_direct_subject"] = routableAddr; // Establish messaging addresses setAgentName(); @@ -256,13 +269,20 @@ void AgentSessionImpl::open() // Create AMQP session, receivers, and senders session = connection.createSession(); - Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs); + Receiver directRx; + Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs); Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs); - directRx.setCapacity(64); + if (listenOnDirect && !strictSecurity) { + directRx = session.createReceiver(directBase + "/" + agentName + addrArgs); + directRx.setCapacity(64); + } + + routableDirectRx.setCapacity(64); topicRx.setCapacity(64); - directSender = session.createSender(directBase + addrArgs); + if (!strictSecurity) + directSender = session.createSender(directBase + addrArgs); topicSender = session.createSender(topicBase + addrArgs); // Start the receiver thread @@ -794,6 +814,17 @@ void AgentSessionImpl::dispatch(Message msg) const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; + // + // If strict-security is enabled, make sure that reply-to address complies with the + // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.'). + // + if (strictSecurity && msg.getReplyTo()) { + if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) { + QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str()); + return; + } + } + iter = properties.find(protocol::HEADER_KEY_APP_ID); if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) { // @@ -892,6 +923,11 @@ void AgentSessionImpl::send(Message msg, const Address& to) { Sender sender; + if (strictSecurity && to.getName() != topicBase) { + QPID_LOG(warning, "Address violates strict-security policy: " << to); + return; + } + if (to.getName() == directBase) { msg.setSubject(to.getSubject()); sender = directSender; diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index f327170c5e..00ea397c4b 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -36,6 +36,7 @@ using namespace qmf; using qpid::messaging::Address; using qpid::messaging::Connection; using qpid::messaging::Receiver; +using qpid::messaging::Sender; using qpid::messaging::Duration; using qpid::messaging::Message; using qpid::types::Variant; @@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("max-agent-age"); if (iter != optMap.end()) maxAgentAgeMinutes = iter->second.asUint32(); + + iter = optMap.find("listen-on-direct"); + if (iter != optMap.end()) + listenOnDirect = iter->second.asBool(); + + iter = optMap.find("strict-security"); + if (iter != optMap.end()) + strictSecurity = iter->second.asBool(); } } @@ -148,24 +157,26 @@ void ConsoleSessionImpl::open() directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; - string myKey("qmf-console-" + qpid::types::Uuid(true).str()); + string myKey("direct-console." + qpid::types::Uuid(true).str()); - replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}"); + replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}"); // Create AMQP session, receivers, and senders session = connection.createSession(); Receiver directRx = session.createReceiver(replyAddress); Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating - Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); + if (!strictSecurity) { + Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); + legacyRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + directSender.setCapacity(128); + } directRx.setCapacity(64); topicRx.setCapacity(128); - legacyRx.setCapacity(64); - directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); - directSender.setCapacity(64); topicSender.setCapacity(128); // Start the receiver thread @@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocate() msg.setCorrelationId("broker-locate"); msg.setSubject("broker"); - directSender.send(msg); + Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + sender.send(msg); + sender.close(); QPID_LOG(trace, "SENT AgentLocate to broker"); } diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index 85ddc820f4..675c8bcfb5 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -73,6 +73,8 @@ namespace qmf { qpid::messaging::Sender topicSender; std::string domain; uint32_t maxAgentAgeMinutes; + bool listenOnDirect; + bool strictSecurity; Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; diff --git a/cpp/src/qmf/SchemaId.cpp b/cpp/src/qmf/SchemaId.cpp index 110a2553fd..25fa9915ae 100644 --- a/cpp/src/qmf/SchemaId.cpp +++ b/cpp/src/qmf/SchemaId.cpp @@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const result["_type"] = "_data"; else result["_type"] = "_event"; - result["_hash"] = hash; + if (!hash.isNull()) + result["_hash"] = hash; return result; } |