summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/AgentSession.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
committerTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
commit120ea440ef9d048d3bb31e6118027f5c9e890fca (patch)
tree34e14880765c6b79c77c4fb24e834b8d67260149 /cpp/src/qmf/AgentSession.cpp
parent598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff)
downloadqpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file. 2) Added hooks for subscription handling. 3) Added checks to validate properties and arguments against the schema (if there is a schema). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r--cpp/src/qmf/AgentSession.cpp190
1 files changed, 119 insertions, 71 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 20beda4851..3426167f87 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -30,6 +30,7 @@
#include "qmf/DataImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Thread.h"
@@ -111,6 +112,10 @@ namespace qmf {
bool externalStorage;
bool autoAllowQueries;
bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -129,6 +134,7 @@ namespace qmf {
void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
void dispatch(Message);
void sendHeartbeat();
+ void send(Message, const Address&);
void flushResponses(AgentEvent&, bool);
void periodicProcessing(uint64_t);
void run();
@@ -172,6 +178,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
+ maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -208,6 +215,22 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("allow-methods");
if (iter != optMap.end())
autoAllowMethods = iter->second.asBool();
+
+ iter = optMap.find("max-subscriptions");
+ if (iter != optMap.end())
+ maxSubscriptions = iter->second.asUint32();
+
+ iter = optMap.find("min-sub-interval");
+ if (iter != optMap.end())
+ minSubInterval = iter->second.asUint32();
+
+ iter = optMap.find("sub-lifetime");
+ if (iter != optMap.end())
+ subLifetime = iter->second.asUint32();
+
+ iter = optMap.find("public-events");
+ if (iter != optMap.end())
+ publicEvents = iter->second.asBool();
}
}
@@ -421,20 +444,18 @@ void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_exception";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
const DataImpl& dataImpl(DataImplAccess::get(data));
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(dataImpl.asMap(), msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo());
}
@@ -461,10 +482,10 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_method_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
@@ -477,9 +498,7 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(map, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo());
}
@@ -494,11 +513,11 @@ void AgentSessionImpl::raiseEvent(const Data& data)
// TODO: add severity.package.class to key
// or modify to send only to subscriptions with matching queries
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject("agent.ind.event");
encode(DataImplAccess::get(data).asMap(), msg);
@@ -573,22 +592,20 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const
Variant::Map map;
Variant::Map& headers(reply.getProperties());
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_locate_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
map["_values"] = attributes;
- map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
- map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, reply);
- Sender sender = session.createSender(msg.getReplyTo());
- sender.send(reply);
+ send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo());
- sender.close();
}
@@ -614,10 +631,6 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me
}
eventImpl->setMethodName(iter->second.asString());
- iter = content.find("_object_id");
- if (iter != content.end())
- eventImpl->setDataAddr(DataAddr(new DataAddrImpl(iter->second.asMap())));
-
iter = content.find("_arguments");
if (iter != content.end())
eventImpl->setArguments(iter->second.asMap());
@@ -626,6 +639,29 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me
if (iter != content.end())
eventImpl->setArgumentSubtypes(iter->second.asMap());
+ iter = content.find("_object_id");
+ if (iter != content.end()) {
+ DataAddr addr(new DataAddrImpl(iter->second.asMap()));
+ eventImpl->setDataAddr(addr);
+ DataIndex::const_iterator iter(globalIndex.find(addr));
+ if (iter == globalIndex.end()) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "No data object found with the specified address");
+ return;
+ }
+
+ if (DataImplAccess::get(iter->second).getSchema().isValid())
+ for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin();
+ aIter != eventImpl->getArguments().end(); aIter++) {
+ const Schema& schema(DataImplAccess::get(iter->second).getSchema());
+ if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "Invalid argument: " + aIter->first);
+ return;
+ }
+ }
+ }
+
enqueueEvent(AgentEvent(eventImpl.release()));
}
@@ -668,19 +704,19 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
{
qpid::sys::Mutex::ScopedLock l(lock);
if (query.getTarget() == QUERY_SCHEMA_ID) {
- headers["qmf.content"] = "_schema_id";
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema_id";
for (iter = schemata.begin(); iter != schemata.end(); iter++)
content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
} else if (query.getSchemaId().isValid()) {
- headers["qmf.content"] = "_schema";
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema";
iter = schemata.find(query.getSchemaId());
if (iter != schemata.end())
content.push_back(SchemaImplAccess::get(iter->second).asMap());
@@ -698,9 +734,7 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(content, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}
@@ -744,13 +778,11 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u
Message reply;
Variant::Map& headers(reply.getProperties());
- headers["qmf.agent"] = agentName;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
reply.setContent(replyContent);
- Sender sender = session.createSender(msg.getReplyTo());
- sender.send(reply);
+ send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo());
- sender.close();
}
@@ -759,12 +791,12 @@ void AgentSessionImpl::dispatch(Message msg)
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
- iter = properties.find("x-amqp-0-10.app-id");
- if (iter != properties.end() && iter->second.asString() == "qmf2") {
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
// Dispatch a QMFv2 formatted message
//
- iter = properties.find("qmf.opcode");
+ iter = properties.find(protocol::HEADER_KEY_OPCODE);
if (iter == properties.end()) {
QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
return;
@@ -776,7 +808,7 @@ void AgentSessionImpl::dispatch(Message msg)
Variant::List content;
decode(msg, content);
- if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
}
@@ -785,8 +817,8 @@ void AgentSessionImpl::dispatch(Message msg)
Variant::Map content;
decode(msg, content);
- if (opcode == "_method_request") handleMethodRequest(content, msg);
- else if (opcode == "_query_request") handleQueryRequest(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
}
@@ -835,17 +867,17 @@ void AgentSessionImpl::sendHeartbeat()
}
}
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_heartbeat_indication";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject(address.str());
map["_values"] = attributes;
- map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
- map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, msg);
topicSender.send(msg);
@@ -853,19 +885,36 @@ void AgentSessionImpl::sendHeartbeat()
}
+void AgentSessionImpl::send(Message msg, const Address& to)
+{
+ Sender sender;
+
+ if (to.getName() == directBase) {
+ msg.setSubject(to.getSubject());
+ sender = directSender;
+ } else if (to.getName() == topicBase) {
+ msg.setSubject(to.getSubject());
+ sender = topicSender;
+ } else
+ sender = session.createSender(to);
+
+ sender.send(msg);
+}
+
+
void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
if (!final)
- headers["partial"] = Variant();
+ headers[protocol::HEADER_KEY_PARTIAL] = Variant();
Variant::List body;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
@@ -878,9 +927,7 @@ void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(body, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo());
}
@@ -894,6 +941,7 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
//
if (seconds == lastVisit)
return;
+ //uint64_t thisInterval(seconds - lastVisit);
lastVisit = seconds;
//