diff options
author | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
commit | 120ea440ef9d048d3bb31e6118027f5c9e890fca (patch) | |
tree | 34e14880765c6b79c77c4fb24e834b8d67260149 /cpp/src/qmf/AgentSession.cpp | |
parent | 598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff) | |
download | qpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz |
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file.
2) Added hooks for subscription handling.
3) Added checks to validate properties and arguments against the schema (if there is a schema).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 190 |
1 files changed, 119 insertions, 71 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 20beda4851..3426167f87 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -30,6 +30,7 @@ #include "qmf/DataImpl.h" #include "qmf/QueryImpl.h" #include "qmf/agentCapability.h" +#include "qmf/constants.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/sys/Thread.h" @@ -111,6 +112,10 @@ namespace qmf { bool externalStorage; bool autoAllowQueries; bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; uint64_t schemaUpdateTime; string directBase; string topicBase; @@ -129,6 +134,7 @@ namespace qmf { void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); void dispatch(Message); void sendHeartbeat(); + void send(Message, const Address&); void flushResponses(AgentEvent&, bool); void periodicProcessing(uint64_t); void run(); @@ -172,6 +178,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), + maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -208,6 +215,22 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("allow-methods"); if (iter != optMap.end()) autoAllowMethods = iter->second.asBool(); + + iter = optMap.find("max-subscriptions"); + if (iter != optMap.end()) + maxSubscriptions = iter->second.asUint32(); + + iter = optMap.find("min-sub-interval"); + if (iter != optMap.end()) + minSubInterval = iter->second.asUint32(); + + iter = optMap.find("sub-lifetime"); + if (iter != optMap.end()) + subLifetime = iter->second.asUint32(); + + iter = optMap.find("public-events"); + if (iter != optMap.end()) + publicEvents = iter->second.asBool(); } } @@ -421,20 +444,18 @@ void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_exception"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); const DataImpl& dataImpl(DataImplAccess::get(data)); msg.setCorrelationId(eventImpl.getCorrelationId()); encode(dataImpl.asMap(), msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo()); } @@ -461,10 +482,10 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_method_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); @@ -477,9 +498,7 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(map, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo()); } @@ -494,11 +513,11 @@ void AgentSessionImpl::raiseEvent(const Data& data) // TODO: add severity.package.class to key // or modify to send only to subscriptions with matching queries - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_event"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setSubject("agent.ind.event"); encode(DataImplAccess::get(data).asMap(), msg); @@ -573,22 +592,20 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Variant::Map map; Variant::Map& headers(reply.getProperties()); - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_locate_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; map["_values"] = attributes; - map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; encode(map, reply); - Sender sender = session.createSender(msg.getReplyTo()); - sender.send(reply); + send(reply, msg.getReplyTo()); QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo()); - sender.close(); } @@ -614,10 +631,6 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me } eventImpl->setMethodName(iter->second.asString()); - iter = content.find("_object_id"); - if (iter != content.end()) - eventImpl->setDataAddr(DataAddr(new DataAddrImpl(iter->second.asMap()))); - iter = content.find("_arguments"); if (iter != content.end()) eventImpl->setArguments(iter->second.asMap()); @@ -626,6 +639,29 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me if (iter != content.end()) eventImpl->setArgumentSubtypes(iter->second.asMap()); + iter = content.find("_object_id"); + if (iter != content.end()) { + DataAddr addr(new DataAddrImpl(iter->second.asMap())); + eventImpl->setDataAddr(addr); + DataIndex::const_iterator iter(globalIndex.find(addr)); + if (iter == globalIndex.end()) { + AgentEvent event(eventImpl.release()); + raiseException(event, "No data object found with the specified address"); + return; + } + + if (DataImplAccess::get(iter->second).getSchema().isValid()) + for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin(); + aIter != eventImpl->getArguments().end(); aIter++) { + const Schema& schema(DataImplAccess::get(iter->second).getSchema()); + if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) { + AgentEvent event(eventImpl.release()); + raiseException(event, "Invalid argument: " + aIter->first); + return; + } + } + } + enqueueEvent(AgentEvent(eventImpl.release())); } @@ -668,19 +704,19 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; { qpid::sys::Mutex::ScopedLock l(lock); if (query.getTarget() == QUERY_SCHEMA_ID) { - headers["qmf.content"] = "_schema_id"; + headers[protocol::HEADER_KEY_CONTENT] = "_schema_id"; for (iter = schemata.begin(); iter != schemata.end(); iter++) content.push_back(SchemaIdImplAccess::get(iter->first).asMap()); } else if (query.getSchemaId().isValid()) { - headers["qmf.content"] = "_schema"; + headers[protocol::HEADER_KEY_CONTENT] = "_schema"; iter = schemata.find(query.getSchemaId()); if (iter != schemata.end()) content.push_back(SchemaImplAccess::get(iter->second).asMap()); @@ -698,9 +734,7 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(content, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo()); } @@ -744,13 +778,11 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u Message reply; Variant::Map& headers(reply.getProperties()); - headers["qmf.agent"] = agentName; + headers[protocol::HEADER_KEY_AGENT] = agentName; reply.setContent(replyContent); - Sender sender = session.createSender(msg.getReplyTo()); - sender.send(reply); + send(reply, msg.getReplyTo()); QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo()); - sender.close(); } @@ -759,12 +791,12 @@ void AgentSessionImpl::dispatch(Message msg) const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; - iter = properties.find("x-amqp-0-10.app-id"); - if (iter != properties.end() && iter->second.asString() == "qmf2") { + iter = properties.find(protocol::HEADER_KEY_APP_ID); + if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) { // // Dispatch a QMFv2 formatted message // - iter = properties.find("qmf.opcode"); + iter = properties.find(protocol::HEADER_KEY_OPCODE); if (iter == properties.end()) { QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); return; @@ -776,7 +808,7 @@ void AgentSessionImpl::dispatch(Message msg) Variant::List content; decode(msg, content); - if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); + if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg); else { QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode); } @@ -785,8 +817,8 @@ void AgentSessionImpl::dispatch(Message msg) Variant::Map content; decode(msg, content); - if (opcode == "_method_request") handleMethodRequest(content, msg); - else if (opcode == "_query_request") handleQueryRequest(content, msg); + if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg); + else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg); else { QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode); } @@ -835,17 +867,17 @@ void AgentSessionImpl::sendHeartbeat() } } - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_heartbeat_indication"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setSubject(address.str()); map["_values"] = attributes; - map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; encode(map, msg); topicSender.send(msg); @@ -853,19 +885,36 @@ void AgentSessionImpl::sendHeartbeat() } +void AgentSessionImpl::send(Message msg, const Address& to) +{ + Sender sender; + + if (to.getName() == directBase) { + msg.setSubject(to.getSubject()); + sender = directSender; + } else if (to.getName() == topicBase) { + msg.setSubject(to.getSubject()); + sender = topicSender; + } else + sender = session.createSender(to); + + sender.send(msg); +} + + void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) { Message msg; Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; if (!final) - headers["partial"] = Variant(); + headers[protocol::HEADER_KEY_PARTIAL] = Variant(); Variant::List body; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); @@ -878,9 +927,7 @@ void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(body, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo()); } @@ -894,6 +941,7 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) // if (seconds == lastVisit) return; + //uint64_t thisInterval(seconds - lastVisit); lastVisit = seconds; // |