summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/AgentSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/AgentSession.cpp')
-rw-r--r--qpid/cpp/src/qmf/AgentSession.cpp82
1 files changed, 73 insertions, 9 deletions
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
index 24356519d7..30176a8c01 100644
--- a/qpid/cpp/src/qmf/AgentSession.cpp
+++ b/qpid/cpp/src/qmf/AgentSession.cpp
@@ -85,6 +85,7 @@ namespace qmf {
void complete(AgentEvent& e);
void methodSuccess(AgentEvent& e);
void raiseEvent(const Data& d);
+ void raiseEvent(const Data& d, int s);
private:
typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
@@ -116,6 +117,8 @@ namespace qmf {
uint32_t minSubInterval;
uint32_t subLifetime;
bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -169,6 +172,7 @@ void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d)
void AgentSession::complete(AgentEvent& e) { impl->complete(e); }
void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); }
void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
+void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
//========================================================================================
// Impl Method Bodies
@@ -179,6 +183,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+ listenOnDirect(true), strictSecurity(false),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -231,6 +236,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("public-events");
if (iter != optMap.end())
publicEvents = iter->second.asBool();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
}
}
@@ -248,6 +261,8 @@ void AgentSessionImpl::open()
throw QmfException("The session is already open");
const string addrArgs(";{create:never,node:{type:topic}}");
+ const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+ attributes["_direct_subject"] = routableAddr;
// Establish messaging addresses
setAgentName();
@@ -256,13 +271,20 @@ void AgentSessionImpl::open()
// Create AMQP session, receivers, and senders
session = connection.createSession();
- Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ Receiver directRx;
+ Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
- directRx.setCapacity(64);
+ if (listenOnDirect && !strictSecurity) {
+ directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ directRx.setCapacity(64);
+ }
+
+ routableDirectRx.setCapacity(64);
topicRx.setCapacity(64);
- directSender = session.createSender(directBase + addrArgs);
+ if (!strictSecurity)
+ directSender = session.createSender(directBase + addrArgs);
topicSender = session.createSender(topicBase + addrArgs);
// Start the receiver thread
@@ -506,24 +528,50 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event)
void AgentSessionImpl::raiseEvent(const Data& data)
{
+ int severity(SEV_NOTICE);
+ if (data.hasSchema()) {
+ const Schema& schema(DataImplAccess::get(data).getSchema());
+ if (schema.isValid())
+ severity = schema.getDefaultSeverity();
+ }
+
+ raiseEvent(data, severity);
+}
+
+
+void AgentSessionImpl::raiseEvent(const Data& data, int severity)
+{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
+ string subject("agent.ind.event");
- // TODO: add severity.package.class to key
- // or modify to send only to subscriptions with matching queries
+ if (data.hasSchema()) {
+ const SchemaId& schemaId(data.getSchemaId());
+ if (schemaId.getType() != SCHEMA_TYPE_EVENT)
+ throw QmfException("Cannot call raiseEvent on data that is not an Event");
+ subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName();
+ }
+
+ if (severity < SEV_EMERG || severity > SEV_DEBUG)
+ throw QmfException("Invalid severity value");
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
headers[protocol::HEADER_KEY_AGENT] = agentName;
headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
- msg.setSubject("agent.ind.event");
-
- encode(DataImplAccess::get(data).asMap(), msg);
+ msg.setSubject(subject);
+
+ Variant::List list;
+ Variant::Map dataAsMap(DataImplAccess::get(data).asMap());
+ dataAsMap["_severity"] = severity;
+ dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ list.push_back(dataAsMap);
+ encode(list, msg);
topicSender.send(msg);
- QPID_LOG(trace, "SENT EventIndication to=agent.ind.event");
+ QPID_LOG(trace, "SENT EventIndication to=" << subject);
}
@@ -794,6 +842,17 @@ void AgentSessionImpl::dispatch(Message msg)
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
+ //
+ // If strict-security is enabled, make sure that reply-to address complies with the
+ // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
+ //
+ if (strictSecurity && msg.getReplyTo()) {
+ if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
+ QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
+ return;
+ }
+ }
+
iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
@@ -892,6 +951,11 @@ void AgentSessionImpl::send(Message msg, const Address& to)
{
Sender sender;
+ if (strictSecurity && to.getName() != topicBase) {
+ QPID_LOG(warning, "Address violates strict-security policy: " << to);
+ return;
+ }
+
if (to.getName() == directBase) {
msg.setSubject(to.getSubject());
sender = directSender;