diff options
Diffstat (limited to 'qpid/cpp/src/qmf/AgentSession.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/AgentSession.cpp | 82 |
1 files changed, 73 insertions, 9 deletions
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 24356519d7..30176a8c01 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -85,6 +85,7 @@ namespace qmf { void complete(AgentEvent& e); void methodSuccess(AgentEvent& e); void raiseEvent(const Data& d); + void raiseEvent(const Data& d, int s); private: typedef map<DataAddr, Data, DataAddrCompare> DataIndex; @@ -116,6 +117,8 @@ namespace qmf { uint32_t minSubInterval; uint32_t subLifetime; bool publicEvents; + bool listenOnDirect; + bool strictSecurity; uint64_t schemaUpdateTime; string directBase; string topicBase; @@ -169,6 +172,7 @@ void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d) void AgentSession::complete(AgentEvent& e) { impl->complete(e); } void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); } void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); } +void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); } //======================================================================================== // Impl Method Bodies @@ -179,6 +183,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), + listenOnDirect(true), strictSecurity(false), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -231,6 +236,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("public-events"); if (iter != optMap.end()) publicEvents = iter->second.asBool(); + + iter = optMap.find("listen-on-direct"); + if (iter != optMap.end()) + listenOnDirect = iter->second.asBool(); + + iter = optMap.find("strict-security"); + if (iter != optMap.end()) + strictSecurity = iter->second.asBool(); } } @@ -248,6 +261,8 @@ void AgentSessionImpl::open() throw QmfException("The session is already open"); const string addrArgs(";{create:never,node:{type:topic}}"); + const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); + attributes["_direct_subject"] = routableAddr; // Establish messaging addresses setAgentName(); @@ -256,13 +271,20 @@ void AgentSessionImpl::open() // Create AMQP session, receivers, and senders session = connection.createSession(); - Receiver directRx = session.createReceiver(directBase + "/" + agentName + addrArgs); + Receiver directRx; + Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs); Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs); - directRx.setCapacity(64); + if (listenOnDirect && !strictSecurity) { + directRx = session.createReceiver(directBase + "/" + agentName + addrArgs); + directRx.setCapacity(64); + } + + routableDirectRx.setCapacity(64); topicRx.setCapacity(64); - directSender = session.createSender(directBase + addrArgs); + if (!strictSecurity) + directSender = session.createSender(directBase + addrArgs); topicSender = session.createSender(topicBase + addrArgs); // Start the receiver thread @@ -506,24 +528,50 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event) void AgentSessionImpl::raiseEvent(const Data& data) { + int severity(SEV_NOTICE); + if (data.hasSchema()) { + const Schema& schema(DataImplAccess::get(data).getSchema()); + if (schema.isValid()) + severity = schema.getDefaultSeverity(); + } + + raiseEvent(data, severity); +} + + +void AgentSessionImpl::raiseEvent(const Data& data, int severity) +{ Message msg; Variant::Map map; Variant::Map& headers(msg.getProperties()); + string subject("agent.ind.event"); - // TODO: add severity.package.class to key - // or modify to send only to subscriptions with matching queries + if (data.hasSchema()) { + const SchemaId& schemaId(data.getSchemaId()); + if (schemaId.getType() != SCHEMA_TYPE_EVENT) + throw QmfException("Cannot call raiseEvent on data that is not an Event"); + subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName(); + } + + if (severity < SEV_EMERG || severity > SEV_DEBUG) + throw QmfException("Invalid severity value"); headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION; headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT; headers[protocol::HEADER_KEY_AGENT] = agentName; headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; - msg.setSubject("agent.ind.event"); - - encode(DataImplAccess::get(data).asMap(), msg); + msg.setSubject(subject); + + Variant::List list; + Variant::Map dataAsMap(DataImplAccess::get(data).asMap()); + dataAsMap["_severity"] = severity; + dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + list.push_back(dataAsMap); + encode(list, msg); topicSender.send(msg); - QPID_LOG(trace, "SENT EventIndication to=agent.ind.event"); + QPID_LOG(trace, "SENT EventIndication to=" << subject); } @@ -794,6 +842,17 @@ void AgentSessionImpl::dispatch(Message msg) const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; + // + // If strict-security is enabled, make sure that reply-to address complies with the + // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.'). + // + if (strictSecurity && msg.getReplyTo()) { + if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) { + QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str()); + return; + } + } + iter = properties.find(protocol::HEADER_KEY_APP_ID); if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) { // @@ -892,6 +951,11 @@ void AgentSessionImpl::send(Message msg, const Address& to) { Sender sender; + if (strictSecurity && to.getName() != topicBase) { + QPID_LOG(warning, "Address violates strict-security policy: " << to); + return; + } + if (to.getName() == directBase) { msg.setSubject(to.getSubject()); sender = directSender; |