summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-02-02 18:16:57 +0000
committerTed Ross <tross@apache.org>2011-02-02 18:16:57 +0000
commit9991fae5ea4415b6ef760a4430658202b90264bc (patch)
tree93399916cd50e0ac10aa761144b4915d9085e217
parent24fb6939e5420ecae9033687c8c6081a62cd42a5 (diff)
downloadqpid-python-9991fae5ea4415b6ef760a4430658202b90264bc.tar.gz
QPID-3032 - Modifications to the QMFv2 implementation:
1) Use the topic exchange as the base for direct and reply-to addresses. 2) Add a strict-security option to the Console and Agent APIs that narrows the messaging patterns used such that they can easily be controlled by broker ACL policy. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1066562 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/include/qmf/AgentSession.h4
-rw-r--r--cpp/include/qmf/ConsoleSession.h4
-rw-r--r--cpp/src/qmf/Agent.cpp24
-rw-r--r--cpp/src/qmf/AgentImpl.h3
-rw-r--r--cpp/src/qmf/AgentSession.cpp42
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp27
-rw-r--r--cpp/src/qmf/ConsoleSessionImpl.h2
-rw-r--r--cpp/src/qmf/SchemaId.cpp3
8 files changed, 90 insertions, 19 deletions
diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h
index 090017779f..23058c56c6 100644
--- a/cpp/include/qmf/AgentSession.h
+++ b/cpp/include/qmf/AgentSession.h
@@ -67,6 +67,10 @@ namespace qmf {
* sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
* public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
* If False: QMF events are only sent to authorized subscribers
+ * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ * If False: Listen only on the routable direct address
+ * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network
+ * - If False: Operate more flexibly with regard to use of messaging facilities [default]
*/
QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options="");
diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h
index ba8b3de92f..2422383fa3 100644
--- a/cpp/include/qmf/ConsoleSession.h
+++ b/cpp/include/qmf/ConsoleSession.h
@@ -57,6 +57,10 @@ namespace qmf {
* domain:NAME - QMF Domain to join [default: "default"]
* max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
* an agent before deleting it [default: 5]
+ * listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ * If False: Listen only on the routable direct address
+ * strict-security:{True,False} - If True: Cooperate with the broker to enforce string access control to the network
+ * - If False: Operate more flexibly with regard to use of messaging facilities [default]
*/
QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options="");
QMF_EXTERN void setDomain(const std::string&);
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp
index 3a385b3741..1b7d03968e 100644
--- a/cpp/src/qmf/Agent.cpp
+++ b/cpp/src/qmf/Agent.cpp
@@ -71,8 +71,8 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
- name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
- nextCorrelator(1), schemaCache(s.schemaCache)
+ name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
+ sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
{
}
@@ -83,6 +83,11 @@ void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v
try {
capability = v.asUint32();
} catch (std::exception&) {}
+ if (k == "_direct_subject")
+ try {
+ directSubject = v.asString();
+ sender = session.topicSender;
+ } catch (std::exception&) {}
}
const Variant& AgentImpl::getAttribute(const string& k) const
@@ -514,9 +519,10 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- msg.setSubject(name);
+ msg.setSubject(directSubject);
encode(QueryImplAccess::get(query).asMap(), msg);
- session.directSender.send(msg);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT QueryRequest to=" << name);
}
@@ -538,9 +544,10 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- msg.setSubject(name);
+ msg.setSubject(directSubject);
encode(map, msg);
- session.directSender.send(msg);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
}
@@ -578,8 +585,9 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
Message msg;
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
- msg.setSubject(name);
- session.directSender.send(msg);
+ msg.setSubject(directSubject);
+ if (sender.isValid())
+ sender.send(msg);
QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
}
diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h
index b852570418..7fa4f4373a 100644
--- a/cpp/src/qmf/AgentImpl.h
+++ b/cpp/src/qmf/AgentImpl.h
@@ -29,6 +29,7 @@
#include "qmf/SchemaCache.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include <boost/shared_ptr.hpp>
@@ -90,11 +91,13 @@ namespace qmf {
mutable qpid::sys::Mutex lock;
std::string name;
+ std::string directSubject;
uint32_t epoch;
ConsoleSessionImpl& session;
bool touched;
uint32_t untouchedCount;
uint32_t capability;
+ qpid::messaging::Sender sender;
qpid::types::Variant::Map attributes;
uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 24356519d7..fb18f27150 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -116,6 +116,8 @@ namespace qmf {
uint32_t minSubInterval;
uint32_t subLifetime;
bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -179,6 +181,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+ listenOnDirect(true), strictSecurity(false),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -231,6 +234,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("public-events");
if (iter != optMap.end())
publicEvents = iter->second.asBool();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -248,6 +259,8 @@ void AgentSessionImpl::open()
throw QmfException("The session is already open");
const string addrArgs(";{create:never,node:{type:topic}}");
+ const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+ attributes["_direct_subject"] = routableAddr;
// Establish messaging addresses
setAgentName();
@@ -256,13 +269,20 @@ void AgentSessionImpl::open()
// Create AMQP session, receivers, and senders
session = connection.createSession();
- Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ Receiver directRx;
+ Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
- directRx.setCapacity(64);
+ if (listenOnDirect && !strictSecurity) {
+ directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ directRx.setCapacity(64);
+ }
+
+ routableDirectRx.setCapacity(64);
topicRx.setCapacity(64);
- directSender = session.createSender(directBase + addrArgs);
+ if (!strictSecurity)
+ directSender = session.createSender(directBase + addrArgs);
topicSender = session.createSender(topicBase + addrArgs);
// Start the receiver thread
@@ -794,6 +814,17 @@ void AgentSessionImpl::dispatch(Message msg)
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
+ //
+ // If strict-security is enabled, make sure that reply-to address complies with the
+ // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
+ //
+ if (strictSecurity && msg.getReplyTo()) {
+ if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
+ QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
+ return;
+ }
+ }
+
iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
@@ -892,6 +923,11 @@ void AgentSessionImpl::send(Message msg, const Address& to)
{
Sender sender;
+ if (strictSecurity && to.getName() != topicBase) {
+ QPID_LOG(warning, "Address violates strict-security policy: " << to);
+ return;
+ }
+
if (to.getName() == directBase) {
msg.setSubject(to.getSubject());
sender = directSender;
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index f327170c5e..00ea397c4b 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -36,6 +36,7 @@ using namespace qmf;
using qpid::messaging::Address;
using qpid::messaging::Connection;
using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
using qpid::messaging::Duration;
using qpid::messaging::Message;
using qpid::types::Variant;
@@ -82,6 +83,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("max-agent-age");
if (iter != optMap.end())
maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -148,24 +157,26 @@ void ConsoleSessionImpl::open()
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
- string myKey("qmf-console-" + qpid::types::Uuid(true).str());
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
- replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}");
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
// Create AMQP session, receivers, and senders
session = connection.createSession();
Receiver directRx = session.createReceiver(replyAddress);
Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
- Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
directRx.setCapacity(64);
topicRx.setCapacity(128);
- legacyRx.setCapacity(64);
- directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
- directSender.setCapacity(64);
topicSender.setCapacity(128);
// Start the receiver thread
@@ -371,7 +382,9 @@ void ConsoleSessionImpl::sendBrokerLocate()
msg.setCorrelationId("broker-locate");
msg.setSubject("broker");
- directSender.send(msg);
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
QPID_LOG(trace, "SENT AgentLocate to broker");
}
diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h
index 85ddc820f4..675c8bcfb5 100644
--- a/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/cpp/src/qmf/ConsoleSessionImpl.h
@@ -73,6 +73,8 @@ namespace qmf {
qpid::messaging::Sender topicSender;
std::string domain;
uint32_t maxAgentAgeMinutes;
+ bool listenOnDirect;
+ bool strictSecurity;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
diff --git a/cpp/src/qmf/SchemaId.cpp b/cpp/src/qmf/SchemaId.cpp
index 110a2553fd..25fa9915ae 100644
--- a/cpp/src/qmf/SchemaId.cpp
+++ b/cpp/src/qmf/SchemaId.cpp
@@ -78,7 +78,8 @@ Variant::Map SchemaIdImpl::asMap() const
result["_type"] = "_data";
else
result["_type"] = "_event";
- result["_hash"] = hash;
+ if (!hash.isNull())
+ result["_hash"] = hash;
return result;
}