diff options
| author | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2011-01-10 14:06:16 +0000 |
| commit | 120ea440ef9d048d3bb31e6118027f5c9e890fca (patch) | |
| tree | 34e14880765c6b79c77c4fb24e834b8d67260149 /cpp | |
| parent | 598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff) | |
| download | qpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz | |
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file.
2) Added hooks for subscription handling.
3) Added checks to validate properties and arguments against the schema (if there is a schema).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/include/qmf/Agent.h | 7 | ||||
| -rw-r--r-- | cpp/include/qmf/AgentSession.h | 9 | ||||
| -rw-r--r-- | cpp/include/qmf/ConsoleEvent.h | 6 | ||||
| -rw-r--r-- | cpp/include/qmf/ConsoleSession.h | 11 | ||||
| -rw-r--r-- | cpp/include/qmf/Data.h | 4 | ||||
| -rw-r--r-- | cpp/include/qmf/Subscription.h | 82 | ||||
| -rw-r--r-- | cpp/src/qmf.mk | 11 | ||||
| -rw-r--r-- | cpp/src/qmf/Agent.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 190 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentSubscription.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentSubscription.h | 52 | ||||
| -rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 66 | ||||
| -rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 2 | ||||
| -rw-r--r-- | cpp/src/qmf/Data.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qmf/DataImpl.h | 22 | ||||
| -rw-r--r-- | cpp/src/qmf/Schema.cpp | 100 | ||||
| -rw-r--r-- | cpp/src/qmf/SchemaImpl.h | 4 | ||||
| -rw-r--r-- | cpp/src/qmf/Subscription.cpp | 88 | ||||
| -rw-r--r-- | cpp/src/qmf/SubscriptionImpl.h | 57 | ||||
| -rw-r--r-- | cpp/src/qmf/constants.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qmf/constants.h | 83 |
21 files changed, 841 insertions, 113 deletions
diff --git a/cpp/include/qmf/Agent.h b/cpp/include/qmf/Agent.h index 8d427ab2fb..8c0f48b8b1 100644 --- a/cpp/include/qmf/Agent.h +++ b/cpp/include/qmf/Agent.h @@ -23,6 +23,7 @@ #include <qmf/ImportExport.h> #include "qmf/Handle.h" +//#include "qmf/Subscription.h" #include "qmf/exceptions.h" #include "qpid/messaging/Duration.h" #include "qpid/types/Variant.h" @@ -61,6 +62,12 @@ namespace qmf { QMF_EXTERN uint32_t queryAsync(const Query&); QMF_EXTERN uint32_t queryAsync(const std::string&); + /** + * Create a subscription to this agent + */ + //QMF_EXTERN Subscription subscribe(const Query&, const std::string& options = ""); + //QMF_EXTERN Subscription subscribe(const std::string&, const std::string& options = ""); + QMF_EXTERN ConsoleEvent callMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); QMF_EXTERN uint32_t callMethodAsync(const std::string&, const qpid::types::Variant::Map&, const DataAddr&); diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h index 4ac2b2f3ed..090017779f 100644 --- a/cpp/include/qmf/AgentSession.h +++ b/cpp/include/qmf/AgentSession.h @@ -57,11 +57,16 @@ namespace qmf { * The options string is of the form "{key:value,key:value}". The following keys are supported: * * interval:N - Heartbeat interval in seconds [default: 60] - * external:{True,False} - Use external data storage (queries are pass-through) [default: False] + * external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False] * allow-queries:{True,False} - If True: automatically allow all queries [default] * If False: generate an AUTH_QUERY event to allow per-query authorization * allow-methods:{True,False} - If True: automatically allow all methods [default] * If False: generate an AUTH_METHOD event to allow per-method authorization + * max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64] + * min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000] + * sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] + * public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] + * If False: QMF events are only sent to authorized subscribers */ QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options=""); @@ -143,7 +148,7 @@ namespace qmf { * authReject - Reject/forbid an authorization request. * raiseException - indicate failure of an operation (i.e. query or method call). * response - Provide data in response to a query (only for option: external:True) - * complete - Indicate that the response to a query is complete (external:true only) + * complete - Indicate that the response to a query is complete (external:True only) * methodSuccess - Indicate the successful completion of a method call. */ QMF_EXTERN void authAccept(AgentEvent&); diff --git a/cpp/include/qmf/ConsoleEvent.h b/cpp/include/qmf/ConsoleEvent.h index 3e75631a61..54272334a4 100644 --- a/cpp/include/qmf/ConsoleEvent.h +++ b/cpp/include/qmf/ConsoleEvent.h @@ -46,8 +46,10 @@ namespace qmf { CONSOLE_QUERY_RESPONSE = 7, CONSOLE_METHOD_RESPONSE = 8, CONSOLE_EXCEPTION = 9, - CONSOLE_SUBSCRIBE_UPDATE = 10, - CONSOLE_THREAD_FAILED = 11 + CONSOLE_SUBSCRIBE_ADD = 10, + CONSOLE_SUBSCRIBE_UPDATE = 11, + CONSOLE_SUBSCRIBE_DEL = 12, + CONSOLE_THREAD_FAILED = 13 }; enum AgentDelReason { diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h index c17f4510f1..ba8b3de92f 100644 --- a/cpp/include/qmf/ConsoleSession.h +++ b/cpp/include/qmf/ConsoleSession.h @@ -24,6 +24,7 @@ #include <qmf/ImportExport.h> #include "qmf/Handle.h" #include "qmf/Agent.h" +#include "qmf/Subscription.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Connection.h" #include <string> @@ -67,6 +68,16 @@ namespace qmf { QMF_EXTERN Agent getAgent(uint32_t) const; QMF_EXTERN Agent getConnectedBrokerAgent() const; + /** + * Create a subscription that involves a subset of the known agents. The set of known agents is defined by + * the session's agent-filter (see setAgentFilter). The agentFilter argument to the subscribe method is used + * to further refine the set of agents. If agentFilter is the empty string (i.e. match-all) the subscription + * will involve all known agents. If agentFilter is non-empty, it will be applied only to the set of known + * agents. A subscription cannot be created that involves an agent not known by the session. + */ + QMF_EXTERN Subscription subscribe(const Query&, const std::string& agentFilter = "", const std::string& options = ""); + QMF_EXTERN Subscription subscribe(const std::string&, const std::string& agentFilter = "", const std::string& options = ""); + #ifndef SWIG private: friend class qmf::PrivateImplRef<ConsoleSession>; diff --git a/cpp/include/qmf/Data.h b/cpp/include/qmf/Data.h index 27af1c4b04..82f1569a0b 100644 --- a/cpp/include/qmf/Data.h +++ b/cpp/include/qmf/Data.h @@ -34,6 +34,7 @@ namespace qmf { #endif class DataImpl; + class Schema; class SchemaId; class DataAddr; class Agent; @@ -45,8 +46,7 @@ namespace qmf { QMF_EXTERN Data& operator=(const Data&); QMF_EXTERN ~Data(); - QMF_EXTERN Data(const SchemaId&); - QMF_EXTERN void setSchema(const SchemaId&); + QMF_EXTERN Data(const Schema&); QMF_EXTERN void setAddr(const DataAddr&); QMF_EXTERN void setProperty(const std::string&, const qpid::types::Variant&); QMF_EXTERN void overwriteProperties(const qpid::types::Variant::Map&); diff --git a/cpp/include/qmf/Subscription.h b/cpp/include/qmf/Subscription.h new file mode 100644 index 0000000000..4e60eb984e --- /dev/null +++ b/cpp/include/qmf/Subscription.h @@ -0,0 +1,82 @@ +#ifndef QMF_SUBSCRIPTION_H +#define QMF_SUBSCRIPTION_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class SubscriptionImpl; + class Data; + + class Subscription : public qmf::Handle<SubscriptionImpl> { + public: + QMF_EXTERN Subscription(SubscriptionImpl* impl = 0); + QMF_EXTERN Subscription(const Subscription&); + QMF_EXTERN Subscription& operator=(const Subscription&); + QMF_EXTERN ~Subscription(); + + /** + * Construction: A subscription is created by calling ConsoleSession::subscribe. + */ + + /** + * Cancel subscriptions to all subscribed agents. After this is called, the subscription + * shall be inactive. + */ + QMF_EXTERN void cancel(); + + /** + * Check to see if this subscription is active. It is active if it has a live subscription + * on at least one agent. If it is not active, there is nothing that can be done to make it + * active, it can only be deleted. + */ + QMF_EXTERN bool isActive() const; + + /** + * lock and unlock should be used to bracket a traversal of the data set. After lock is called, + * the subscription will not change its set of available data objects. Between calls to getDataCount + * and getData, no data objects will be added or removed. After unlock is called, the set of data + * will catch up to any activity that occurred while the lock was in effect. + */ + QMF_EXTERN void lock(); + QMF_EXTERN void unlock(); + QMF_EXTERN uint32_t getDataCount() const; + QMF_EXTERN Data getData(uint32_t) const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<Subscription>; + friend class SubscriptionImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index d0a186e89f..f3462f1a93 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -51,7 +51,8 @@ QMF2_API = \ ../include/qmf/SchemaId.h \ ../include/qmf/SchemaMethod.h \ ../include/qmf/SchemaProperty.h \ - ../include/qmf/SchemaTypes.h + ../include/qmf/SchemaTypes.h \ + ../include/qmf/Subscription.h # @@ -91,10 +92,14 @@ libqmf2_la_SOURCES = \ qmf/AgentEventImpl.h \ qmf/AgentImpl.h \ qmf/AgentSession.cpp \ + qmf/AgentSubscription.cpp \ + qmf/AgentSubscription.h \ qmf/ConsoleEvent.cpp \ qmf/ConsoleEventImpl.h \ qmf/ConsoleSession.cpp \ qmf/ConsoleSessionImpl.h \ + qmf/constants.cpp \ + qmf/constants.h \ qmf/DataAddr.cpp \ qmf/DataAddrImpl.h \ qmf/Data.cpp \ @@ -116,7 +121,9 @@ libqmf2_la_SOURCES = \ qmf/SchemaMethod.cpp \ qmf/SchemaMethodImpl.h \ qmf/SchemaProperty.cpp \ - qmf/SchemaPropertyImpl.h + qmf/SchemaPropertyImpl.h \ + qmf/Subscription.cpp \ + qmf/SubscriptionImpl.h libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp index 05bf1a38aa..3a385b3741 100644 --- a/cpp/src/qmf/Agent.cpp +++ b/cpp/src/qmf/Agent.cpp @@ -27,6 +27,7 @@ #include "qmf/Query.h" #include "qmf/SchemaImpl.h" #include "qmf/agentCapability.h" +#include "qmf/constants.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/AddressParser.h" #include "qpid/management/Buffer.h" @@ -507,9 +508,9 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_query_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); @@ -527,9 +528,9 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_method_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; map["_method_name"] = method; map["_object_id"] = addr.asMap(); diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 20beda4851..3426167f87 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -30,6 +30,7 @@ #include "qmf/DataImpl.h" #include "qmf/QueryImpl.h" #include "qmf/agentCapability.h" +#include "qmf/constants.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/sys/Thread.h" @@ -111,6 +112,10 @@ namespace qmf { bool externalStorage; bool autoAllowQueries; bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; uint64_t schemaUpdateTime; string directBase; string topicBase; @@ -129,6 +134,7 @@ namespace qmf { void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); void dispatch(Message); void sendHeartbeat(); + void send(Message, const Address&); void flushResponses(AgentEvent&, bool); void periodicProcessing(uint64_t); void run(); @@ -172,6 +178,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), + maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -208,6 +215,22 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("allow-methods"); if (iter != optMap.end()) autoAllowMethods = iter->second.asBool(); + + iter = optMap.find("max-subscriptions"); + if (iter != optMap.end()) + maxSubscriptions = iter->second.asUint32(); + + iter = optMap.find("min-sub-interval"); + if (iter != optMap.end()) + minSubInterval = iter->second.asUint32(); + + iter = optMap.find("sub-lifetime"); + if (iter != optMap.end()) + subLifetime = iter->second.asUint32(); + + iter = optMap.find("public-events"); + if (iter != optMap.end()) + publicEvents = iter->second.asBool(); } } @@ -421,20 +444,18 @@ void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_exception"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); const DataImpl& dataImpl(DataImplAccess::get(data)); msg.setCorrelationId(eventImpl.getCorrelationId()); encode(dataImpl.asMap(), msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo()); } @@ -461,10 +482,10 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_method_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); @@ -477,9 +498,7 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(map, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo()); } @@ -494,11 +513,11 @@ void AgentSessionImpl::raiseEvent(const Data& data) // TODO: add severity.package.class to key // or modify to send only to subscriptions with matching queries - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_event"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setSubject("agent.ind.event"); encode(DataImplAccess::get(data).asMap(), msg); @@ -573,22 +592,20 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Variant::Map map; Variant::Map& headers(reply.getProperties()); - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_locate_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; map["_values"] = attributes; - map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; encode(map, reply); - Sender sender = session.createSender(msg.getReplyTo()); - sender.send(reply); + send(reply, msg.getReplyTo()); QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo()); - sender.close(); } @@ -614,10 +631,6 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me } eventImpl->setMethodName(iter->second.asString()); - iter = content.find("_object_id"); - if (iter != content.end()) - eventImpl->setDataAddr(DataAddr(new DataAddrImpl(iter->second.asMap()))); - iter = content.find("_arguments"); if (iter != content.end()) eventImpl->setArguments(iter->second.asMap()); @@ -626,6 +639,29 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me if (iter != content.end()) eventImpl->setArgumentSubtypes(iter->second.asMap()); + iter = content.find("_object_id"); + if (iter != content.end()) { + DataAddr addr(new DataAddrImpl(iter->second.asMap())); + eventImpl->setDataAddr(addr); + DataIndex::const_iterator iter(globalIndex.find(addr)); + if (iter == globalIndex.end()) { + AgentEvent event(eventImpl.release()); + raiseException(event, "No data object found with the specified address"); + return; + } + + if (DataImplAccess::get(iter->second).getSchema().isValid()) + for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin(); + aIter != eventImpl->getArguments().end(); aIter++) { + const Schema& schema(DataImplAccess::get(iter->second).getSchema()); + if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) { + AgentEvent event(eventImpl.release()); + raiseException(event, "Invalid argument: " + aIter->first); + return; + } + } + } + enqueueEvent(AgentEvent(eventImpl.release())); } @@ -668,19 +704,19 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; { qpid::sys::Mutex::ScopedLock l(lock); if (query.getTarget() == QUERY_SCHEMA_ID) { - headers["qmf.content"] = "_schema_id"; + headers[protocol::HEADER_KEY_CONTENT] = "_schema_id"; for (iter = schemata.begin(); iter != schemata.end(); iter++) content.push_back(SchemaIdImplAccess::get(iter->first).asMap()); } else if (query.getSchemaId().isValid()) { - headers["qmf.content"] = "_schema"; + headers[protocol::HEADER_KEY_CONTENT] = "_schema"; iter = schemata.find(query.getSchemaId()); if (iter != schemata.end()) content.push_back(SchemaImplAccess::get(iter->second).asMap()); @@ -698,9 +734,7 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(content, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo()); } @@ -744,13 +778,11 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u Message reply; Variant::Map& headers(reply.getProperties()); - headers["qmf.agent"] = agentName; + headers[protocol::HEADER_KEY_AGENT] = agentName; reply.setContent(replyContent); - Sender sender = session.createSender(msg.getReplyTo()); - sender.send(reply); + send(reply, msg.getReplyTo()); QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo()); - sender.close(); } @@ -759,12 +791,12 @@ void AgentSessionImpl::dispatch(Message msg) const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; - iter = properties.find("x-amqp-0-10.app-id"); - if (iter != properties.end() && iter->second.asString() == "qmf2") { + iter = properties.find(protocol::HEADER_KEY_APP_ID); + if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) { // // Dispatch a QMFv2 formatted message // - iter = properties.find("qmf.opcode"); + iter = properties.find(protocol::HEADER_KEY_OPCODE); if (iter == properties.end()) { QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); return; @@ -776,7 +808,7 @@ void AgentSessionImpl::dispatch(Message msg) Variant::List content; decode(msg, content); - if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); + if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg); else { QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode); } @@ -785,8 +817,8 @@ void AgentSessionImpl::dispatch(Message msg) Variant::Map content; decode(msg, content); - if (opcode == "_method_request") handleMethodRequest(content, msg); - else if (opcode == "_query_request") handleQueryRequest(content, msg); + if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg); + else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg); else { QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode); } @@ -835,17 +867,17 @@ void AgentSessionImpl::sendHeartbeat() } } - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_heartbeat_indication"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setSubject(address.str()); map["_values"] = attributes; - map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; encode(map, msg); topicSender.send(msg); @@ -853,19 +885,36 @@ void AgentSessionImpl::sendHeartbeat() } +void AgentSessionImpl::send(Message msg, const Address& to) +{ + Sender sender; + + if (to.getName() == directBase) { + msg.setSubject(to.getSubject()); + sender = directSender; + } else if (to.getName() == topicBase) { + msg.setSubject(to.getSubject()); + sender = topicSender; + } else + sender = session.createSender(to); + + sender.send(msg); +} + + void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) { Message msg; Variant::Map map; Variant::Map& headers(msg.getProperties()); - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = agentName; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; if (!final) - headers["partial"] = Variant(); + headers[protocol::HEADER_KEY_PARTIAL] = Variant(); Variant::List body; AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); @@ -878,9 +927,7 @@ void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) msg.setCorrelationId(eventImpl.getCorrelationId()); encode(body, msg); - Sender sender(session.createSender(eventImpl.getReplyTo())); - sender.send(msg); - sender.close(); + send(msg, eventImpl.getReplyTo()); QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo()); } @@ -894,6 +941,7 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) // if (seconds == lastVisit) return; + //uint64_t thisInterval(seconds - lastVisit); lastVisit = seconds; // diff --git a/cpp/src/qmf/AgentSubscription.cpp b/cpp/src/qmf/AgentSubscription.cpp new file mode 100644 index 0000000000..4dc5cb74a4 --- /dev/null +++ b/cpp/src/qmf/AgentSubscription.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/AgentSubscription.h" + +using namespace qmf; + +AgentSubscription::AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life, + const std::string& _replyTo, const std::string& _cid, Query _query) : + id(_id), interval(_interval), lifetime(_life), timeSincePublish(0), timeSinceKeepalive(0), + replyTo(_replyTo), cid(_cid), query(_query) +{ +} + + +AgentSubscription::~AgentSubscription() +{ +} + + +bool AgentSubscription::tick(uint64_t seconds) +{ + timeSinceKeepalive += seconds; + if (timeSinceKeepalive >= lifetime) + return false; + + timeSincePublish += seconds; + if (timeSincePublish >= interval) { + } + + return true; +} + diff --git a/cpp/src/qmf/AgentSubscription.h b/cpp/src/qmf/AgentSubscription.h new file mode 100644 index 0000000000..01e8f43e9f --- /dev/null +++ b/cpp/src/qmf/AgentSubscription.h @@ -0,0 +1,52 @@ +#ifndef _QMF_AGENT_SUBSCRIPTION_H_ +#define _QMF_AGENT_SUBSCRIPTION_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/types/Variant.h" +#include "qmf/Query.h" +#include "qmf/Data.h" +#include <boost/shared_ptr.hpp> + +namespace qmf { + class AgentSubscription { + public: + AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life, + const std::string& _replyTo, const std::string& _cid, Query _query); + ~AgentSubscription(); + bool tick(uint64_t seconds); + void keepalive() { timeSinceKeepalive = 0; } + + private: + uint64_t id; + uint64_t interval; + uint64_t lifetime; + uint64_t timeSincePublish; + uint64_t timeSinceKeepalive; + const std::string replyTo; + const std::string cid; + Query query; + }; + +} + +#endif diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index dc2bbe34ee..f327170c5e 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -25,14 +25,19 @@ #include "qmf/SchemaId.h" #include "qmf/SchemaImpl.h" #include "qmf/ConsoleEventImpl.h" +#include "qmf/constants.h" #include "qpid/log/Statement.h" #include "qpid/messaging/AddressParser.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" using namespace std; -using namespace qpid::messaging; using namespace qmf; +using qpid::messaging::Address; +using qpid::messaging::Connection; +using qpid::messaging::Receiver; +using qpid::messaging::Duration; +using qpid::messaging::Message; using qpid::types::Variant; typedef qmf::PrivateImplRef<ConsoleSession> PI; @@ -51,6 +56,8 @@ bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextE uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } +Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } +Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } //======================================================================================== // Impl Method Bodies @@ -227,6 +234,18 @@ Agent ConsoleSessionImpl::getAgent(uint32_t i) const } +Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&) +{ + return Subscription(); +} + + +Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&) +{ + return Subscription(); +} + + void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event) { qpid::sys::Mutex::ScopedLock l(lock); @@ -249,17 +268,17 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::Map::const_iterator iter; Variant::Map::const_iterator oiter; - oiter = properties.find("qmf.opcode"); - iter = properties.find("x-amqp-0-10.app-id"); + oiter = properties.find(protocol::HEADER_KEY_OPCODE); + iter = properties.find(protocol::HEADER_KEY_APP_ID); if (iter == properties.end()) iter = properties.find("app_id"); - if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) { + if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) { // // Dispatch a QMFv2 formatted message // const string& opcode = oiter->second.asString(); - iter = properties.find("qmf.agent"); + iter = properties.find(protocol::HEADER_KEY_AGENT); if (iter == properties.end()) { QPID_LOG(trace, "Message received with no 'qmf.agent' header"); return; @@ -277,7 +296,7 @@ void ConsoleSessionImpl::dispatch(Message msg) } if (msg.getContentType() == "amqp/map" && - (opcode == "_agent_heartbeat_indication" || opcode == "_agent_locate_response")) { + (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) { // // This is the one case where it's ok (necessary actually) to receive a QMFv2 // message from an unknown agent (how else are they going to get known?) @@ -297,8 +316,8 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::Map content; decode(msg, content); - if (opcode == "_exception") agentImpl.handleException(content, msg); - else if (opcode == "_method_response") agentImpl.handleMethodResponse(content, msg); + if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg); + else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg); else QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode); @@ -309,8 +328,8 @@ void ConsoleSessionImpl::dispatch(Message msg) Variant::List content; decode(msg, content); - if (opcode == "_query_response") agentImpl.handleQueryResponse(content, msg); - else if (opcode == "_data_indication") agentImpl.handleDataIndication(content, msg); + if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg); + else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg); else QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode); @@ -344,9 +363,9 @@ void ConsoleSessionImpl::sendBrokerLocate() Message msg; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_agent_locate_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setReplyTo(replyAddress); msg.setCorrelationId("broker-locate"); @@ -363,9 +382,9 @@ void ConsoleSessionImpl::sendAgentLocate() Message msg; Variant::Map& headers(msg.getProperties()); - headers["method"] = "request"; - headers["qmf.opcode"] = "_agent_locate_request"; - headers["x-amqp-0-10.app-id"] = "qmf2"; + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; msg.setReplyTo(replyAddress); msg.setCorrelationId("agent-locate"); @@ -390,13 +409,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian return; Variant::Map attrs(iter->second.asMap()); - iter = attrs.find("epoch"); + iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) epoch = iter->second.asUint32(); if (cid == "broker-locate") { qpid::sys::Mutex::ScopedLock l(lock); - agent = Agent(new AgentImpl(agentName, epoch, *this)); + auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); + for (iter = attrs.begin(); iter != attrs.end(); iter++) + if (iter->first != protocol::AGENT_ATTR_EPOCH) + impl->setAttribute(iter->first, iter->second); + agent = Agent(impl.release()); connectedBrokerAgent = agent; if (!agentQuery || agentQuery.matchesPredicate(attrs)) { connectedBrokerInAgentList = true; @@ -430,7 +453,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian // auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); for (iter = attrs.begin(); iter != attrs.end(); iter++) - if (iter->first != "epoch") + if (iter->first != protocol::AGENT_ATTR_EPOCH) impl->setAttribute(iter->first, iter->second); agent = Agent(impl.release()); agents[agentName] = agent; @@ -459,16 +482,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian enqueueEventLH(ConsoleEvent(eventImpl.release())); } - iter = attrs.find("schemaUpdated"); + iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP); if (iter != attrs.end()) { uint64_t ts(iter->second.asUint64()); - if (ts > impl.getAttribute("schemaUpdated").asUint64()) { + if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) { // // The agent has added new schema entries since we last heard from it. // Enqueue a notification. // auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); eventImpl->setAgent(agent); + impl.setAttribute(iter->first, iter->second); enqueueEventLH(ConsoleEvent(eventImpl.release())); } } diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index 3d10b63c69..85ddc820f4 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -61,6 +61,8 @@ namespace qmf { uint32_t getAgentCount() const; Agent getAgent(uint32_t i) const; Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } + Subscription subscribe(const Query&, const std::string& agentFilter, const std::string& options); + Subscription subscribe(const std::string&, const std::string& agentFilter, const std::string& options); protected: mutable qpid::sys::Mutex lock; diff --git a/cpp/src/qmf/Data.cpp b/cpp/src/qmf/Data.cpp index 0ceca6e1e9..c503bab445 100644 --- a/cpp/src/qmf/Data.cpp +++ b/cpp/src/qmf/Data.cpp @@ -21,8 +21,10 @@ #include "qmf/DataImpl.h" #include "qmf/DataAddrImpl.h" +#include "qmf/SchemaImpl.h" #include "qmf/SchemaIdImpl.h" #include "qmf/PrivateImplRef.h" +#include "qmf/SchemaProperty.h" using namespace std; using namespace qmf; @@ -35,8 +37,7 @@ Data::Data(const Data& s) : qmf::Handle<DataImpl>() { PI::copy(*this, s); } Data::~Data() { PI::dtor(*this); } Data& Data::operator=(const Data& s) { return PI::assign(*this, s); } -Data::Data(const SchemaId& s) { PI::ctor(*this, new DataImpl(s)); } -void Data::setSchema(const SchemaId& s) { impl->setSchema(s); } +Data::Data(const Schema& s) { PI::ctor(*this, new DataImpl(s)); } void Data::setAddr(const DataAddr& a) { impl->setAddr(a); } void Data::setProperty(const string& k, const qpid::types::Variant& v) { impl->setProperty(k, v); } void Data::overwriteProperties(const qpid::types::Variant::Map& m) { impl->overwriteProperties(m); } @@ -103,6 +104,20 @@ Variant::Map DataImpl::asMap() const } +void DataImpl::setProperty(const std::string& k, const qpid::types::Variant& v) +{ + if (schema.isValid()) { + // + // If we have a valid schema, make sure that the property is included in the + // schema and that the variant type is compatible with the schema type. + // + if (!SchemaImplAccess::get(schema).isValidProperty(k, v)) + throw QmfException("Property '" + k + "' either not in the schema or value is of incompatible type"); + } + properties[k] = v; +} + + DataImpl& DataImplAccess::get(Data& item) { return *item.impl; diff --git a/cpp/src/qmf/DataImpl.h b/cpp/src/qmf/DataImpl.h index 38b62791fc..4ac3197da0 100644 --- a/cpp/src/qmf/DataImpl.h +++ b/cpp/src/qmf/DataImpl.h @@ -24,8 +24,10 @@ #include "qpid/RefCounted.h" #include "qmf/Data.h" #include "qmf/SchemaId.h" +#include "qmf/Schema.h" #include "qmf/DataAddr.h" #include "qmf/Agent.h" +#include "qmf/AgentSubscription.h" #include "qpid/types/Variant.h" namespace qmf { @@ -37,18 +39,21 @@ namespace qmf { DataImpl(const qpid::types::Variant::Map&, const Agent&); qpid::types::Variant::Map asMap() const; DataImpl() {} + void addSubscription(boost::shared_ptr<AgentSubscription>); + void delSubscription(uint64_t); + qpid::types::Variant::Map publishSubscription(uint64_t); + const Schema& getSchema() const { return schema; } // // Methods from API handle // - DataImpl(const SchemaId& s) : schemaId(s) {} - void setSchema(const SchemaId& s) { schemaId = s; } + DataImpl(const Schema& s) : schema(s) {} void setAddr(const DataAddr& a) { dataAddr = a; } - void setProperty(const std::string& k, const qpid::types::Variant& v) { properties[k] = v; } + void setProperty(const std::string& k, const qpid::types::Variant& v); void overwriteProperties(const qpid::types::Variant::Map& m); - bool hasSchema() const { return schemaId.isValid(); } + bool hasSchema() const { return schemaId.isValid() || schema.isValid(); } bool hasAddr() const { return dataAddr.isValid(); } - const SchemaId& getSchemaId() const { return schemaId; } + const SchemaId& getSchemaId() const { if (schema.isValid()) return schema.getSchemaId(); else return schemaId; } const DataAddr& getAddr() const { return dataAddr; } const qpid::types::Variant& getProperty(const std::string& k) const; const qpid::types::Variant::Map& getProperties() const { return properties; } @@ -56,7 +61,14 @@ namespace qmf { const Agent& getAgent() const { return agent; } private: + struct Subscr { + boost::shared_ptr<AgentSubscription> subscription; + qpid::types::Variant::Map deltas; + }; + std::map<uint64_t, boost::shared_ptr<Subscr> > subscriptions; + SchemaId schemaId; + Schema schema; DataAddr dataAddr; qpid::types::Variant::Map properties; Agent agent; diff --git a/cpp/src/qmf/Schema.cpp b/cpp/src/qmf/Schema.cpp index e003f9d06f..f25eb4635b 100644 --- a/cpp/src/qmf/Schema.cpp +++ b/cpp/src/qmf/Schema.cpp @@ -192,6 +192,55 @@ string SchemaImpl::asV1Content(uint32_t sequence) const } +bool SchemaImpl::isValidProperty(const std::string& k, const Variant& v) const +{ + for (list<SchemaProperty>::const_iterator iter = properties.begin(); iter != properties.end(); iter++) + if (iter->getName() == k) + return (isCompatibleType(iter->getType(), v.getType())); + return false; +} + + +bool SchemaImpl::isValidMethodInArg(const std::string& m, const std::string& k, const Variant& v) const +{ + for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) { + if (mIter->getName() == m) { + uint32_t count(mIter->getArgumentCount()); + for (uint32_t i = 0; i < count; i++) { + const SchemaProperty prop(mIter->getArgument(i)); + if (prop.getName() == k) { + if (prop.getDirection() == DIR_IN || prop.getDirection() == DIR_IN_OUT) + return (isCompatibleType(prop.getType(), v.getType())); + else + return false; + } + } + } + } + return false; +} + + +bool SchemaImpl::isValidMethodOutArg(const std::string& m, const std::string& k, const Variant& v) const +{ + for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) { + if (mIter->getName() == m) { + uint32_t count(mIter->getArgumentCount()); + for (uint32_t i = 0; i < count; i++) { + const SchemaProperty prop(mIter->getArgument(i)); + if (prop.getName() == k) { + if (prop.getDirection() == DIR_OUT || prop.getDirection() == DIR_IN_OUT) + return (isCompatibleType(prop.getType(), v.getType())); + else + return false; + } + } + } + } + return false; +} + + void SchemaImpl::finalize() { Hash hash; @@ -246,6 +295,57 @@ void SchemaImpl::checkNotFinal() const } +bool SchemaImpl::isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const +{ + bool typeValid(false); + + switch (qpidType) { + case qpid::types::VAR_VOID: + if (qmfType == SCHEMA_DATA_VOID) + typeValid = true; + break; + case qpid::types::VAR_BOOL: + if (qmfType == SCHEMA_DATA_BOOL) + typeValid = true; + break; + case qpid::types::VAR_UINT8: + case qpid::types::VAR_UINT16: + case qpid::types::VAR_UINT32: + case qpid::types::VAR_UINT64: + case qpid::types::VAR_INT8: + case qpid::types::VAR_INT16: + case qpid::types::VAR_INT32: + case qpid::types::VAR_INT64: + if (qmfType == SCHEMA_DATA_INT) + typeValid = true; + break; + case qpid::types::VAR_FLOAT: + case qpid::types::VAR_DOUBLE: + if (qmfType == SCHEMA_DATA_FLOAT) + typeValid = true; + break; + case qpid::types::VAR_STRING: + if (qmfType == SCHEMA_DATA_STRING) + typeValid = true; + break; + case qpid::types::VAR_MAP: + if (qmfType == SCHEMA_DATA_BOOL) + typeValid = true; + break; + case qpid::types::VAR_LIST: + if (qmfType == SCHEMA_DATA_LIST) + typeValid = true; + break; + case qpid::types::VAR_UUID: + if (qmfType == SCHEMA_DATA_UUID) + typeValid = true; + break; + } + + return typeValid; +} + + SchemaImpl& SchemaImplAccess::get(Schema& item) { return *item.impl; diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h index 267a5d5138..1c88f87808 100644 --- a/cpp/src/qmf/SchemaImpl.h +++ b/cpp/src/qmf/SchemaImpl.h @@ -46,6 +46,9 @@ namespace qmf { qpid::types::Variant::Map asMap() const; SchemaImpl(qpid::management::Buffer& v1Buffer); std::string asV1Content(uint32_t sequence) const; + bool isValidProperty(const std::string& k, const qpid::types::Variant& v) const; + bool isValidMethodInArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const; + bool isValidMethodOutArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const; // // Methods from API handle @@ -79,6 +82,7 @@ namespace qmf { void checkFinal() const; void checkNotFinal() const; + bool isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const; }; struct SchemaImplAccess diff --git a/cpp/src/qmf/Subscription.cpp b/cpp/src/qmf/Subscription.cpp new file mode 100644 index 0000000000..73afc8c79d --- /dev/null +++ b/cpp/src/qmf/Subscription.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/SubscriptionImpl.h" +#include "qmf/DataImpl.h" + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<Subscription> PI; + +Subscription::Subscription(SubscriptionImpl* impl) { PI::ctor(*this, impl); } +Subscription::Subscription(const Subscription& s) : qmf::Handle<SubscriptionImpl>() { PI::copy(*this, s); } +Subscription::~Subscription() { PI::dtor(*this); } +Subscription& Subscription::operator=(const Subscription& s) { return PI::assign(*this, s); } + +void Subscription::cancel() { impl->cancel(); } +bool Subscription::isActive() const { return impl->isActive(); } +void Subscription::lock() { impl->lock(); } +void Subscription::unlock() { impl->unlock(); } +uint32_t Subscription::getDataCount() const { return impl->getDataCount(); } +Data Subscription::getData(uint32_t i) const { return impl->getData(i); } + + +void SubscriptionImpl::cancel() +{ +} + + +bool SubscriptionImpl::isActive() const +{ + return false; +} + + +void SubscriptionImpl::lock() +{ +} + + +void SubscriptionImpl::unlock() +{ +} + + +uint32_t SubscriptionImpl::getDataCount() const +{ + return 0; +} + + +Data SubscriptionImpl::getData(uint32_t) const +{ + return Data(); +} + + +SubscriptionImpl& SubscriptionImplAccess::get(Subscription& item) +{ + return *item.impl; +} + + +const SubscriptionImpl& SubscriptionImplAccess::get(const Subscription& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/SubscriptionImpl.h b/cpp/src/qmf/SubscriptionImpl.h new file mode 100644 index 0000000000..053e3cd00e --- /dev/null +++ b/cpp/src/qmf/SubscriptionImpl.h @@ -0,0 +1,57 @@ +#ifndef _QMF_SUBSCRIPTION_IMPL_H_ +#define _QMF_SUBSCRIPTION_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/Subscription.h" + +namespace qmf { + class SubscriptionImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + SubscriptionImpl(int p) : placeholder(p) {} + ~SubscriptionImpl(); + + // + // Methods from API handle + // + void cancel(); + bool isActive() const; + void lock(); + void unlock(); + uint32_t getDataCount() const; + Data getData(uint32_t) const; + + private: + int placeholder; + }; + + struct SubscriptionImplAccess + { + static SubscriptionImpl& get(Subscription&); + static const SubscriptionImpl& get(const Subscription&); + }; +} + +#endif diff --git a/cpp/src/qmf/constants.cpp b/cpp/src/qmf/constants.cpp new file mode 100644 index 0000000000..6e2fd935a9 --- /dev/null +++ b/cpp/src/qmf/constants.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "constants.h" + +using namespace std; +using namespace qmf; + +/** + * Header key strings + */ +const string protocol::HEADER_KEY_APP_ID = "x-amqp-0-10.app-id"; +const string protocol::HEADER_KEY_METHOD = "method"; +const string protocol::HEADER_KEY_OPCODE = "qmf.opcode"; +const string protocol::HEADER_KEY_AGENT = "qmf.agent"; +const string protocol::HEADER_KEY_CONTENT = "qmf.content"; +const string protocol::HEADER_KEY_PARTIAL = "partial"; + +/** + * Header values per-key + */ +const string protocol::HEADER_APP_ID_QMF = "qmf2"; + +const string protocol::HEADER_METHOD_REQUEST = "request"; +const string protocol::HEADER_METHOD_RESPONSE = "response"; +const string protocol::HEADER_METHOD_INDICATION = "indication"; + +const string protocol::HEADER_OPCODE_EXCEPTION = "_exception"; +const string protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST = "_agent_locate_request"; +const string protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE = "_agent_locate_response"; +const string protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION = "_agent_heartbeat_indication"; +const string protocol::HEADER_OPCODE_QUERY_REQUEST = "_query_request"; +const string protocol::HEADER_OPCODE_QUERY_RESPONSE = "_query_response"; +const string protocol::HEADER_OPCODE_SUBSCRIBE_REQUEST = "_subscribe_request"; +const string protocol::HEADER_OPCODE_SUBSCRIBE_RESPONSE = "_subscribe_response"; +const string protocol::HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION = "_subscribe_cancel_indication"; +const string protocol::HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION = "_subscribe_refresh_indication"; +const string protocol::HEADER_OPCODE_DATA_INDICATION = "_data_indication"; +const string protocol::HEADER_OPCODE_METHOD_REQUEST = "_method_request"; +const string protocol::HEADER_OPCODE_METHOD_RESPONSE = "_method_response"; + +const string protocol::HEADER_CONTENT_SCHEMA_ID = "_schema_id"; +const string protocol::HEADER_CONTENT_SCHEMA_CLASS = "_schema_class"; +const string protocol::HEADER_CONTENT_OBJECT_ID = "_object_id"; +const string protocol::HEADER_CONTENT_DATA = "_data"; +const string protocol::HEADER_CONTENT_EVENT = "_event"; +const string protocol::HEADER_CONTENT_QUERY = "_query"; + +/** + * Keywords for Agent attributes + */ +const string protocol::AGENT_ATTR_VENDOR = "_vendor"; +const string protocol::AGENT_ATTR_PRODUCT = "_product"; +const string protocol::AGENT_ATTR_INSTANCE = "_instance"; +const string protocol::AGENT_ATTR_NAME = "_name"; +const string protocol::AGENT_ATTR_TIMESTAMP = "_timestamp"; +const string protocol::AGENT_ATTR_HEARTBEAT_INTERVAL = "_heartbeat_interval"; +const string protocol::AGENT_ATTR_EPOCH = "_epoch"; +const string protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP = "_schema_updated"; diff --git a/cpp/src/qmf/constants.h b/cpp/src/qmf/constants.h new file mode 100644 index 0000000000..79beaaf1ca --- /dev/null +++ b/cpp/src/qmf/constants.h @@ -0,0 +1,83 @@ +#ifndef QMF_CONSTANTS_H +#define QMF_CONSTANTS_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> + +namespace qmf { + + struct protocol { + /** + * Header key strings + */ + static const std::string HEADER_KEY_APP_ID; + static const std::string HEADER_KEY_METHOD; + static const std::string HEADER_KEY_OPCODE; + static const std::string HEADER_KEY_AGENT; + static const std::string HEADER_KEY_CONTENT; + static const std::string HEADER_KEY_PARTIAL; + + /** + * Header values per-key + */ + static const std::string HEADER_APP_ID_QMF; + + static const std::string HEADER_METHOD_REQUEST; + static const std::string HEADER_METHOD_RESPONSE; + static const std::string HEADER_METHOD_INDICATION; + + static const std::string HEADER_OPCODE_EXCEPTION; + static const std::string HEADER_OPCODE_AGENT_LOCATE_REQUEST; + static const std::string HEADER_OPCODE_AGENT_LOCATE_RESPONSE; + static const std::string HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION; + static const std::string HEADER_OPCODE_QUERY_REQUEST; + static const std::string HEADER_OPCODE_QUERY_RESPONSE; + static const std::string HEADER_OPCODE_SUBSCRIBE_REQUEST; + static const std::string HEADER_OPCODE_SUBSCRIBE_RESPONSE; + static const std::string HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION; + static const std::string HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION; + static const std::string HEADER_OPCODE_DATA_INDICATION; + static const std::string HEADER_OPCODE_METHOD_REQUEST; + static const std::string HEADER_OPCODE_METHOD_RESPONSE; + + static const std::string HEADER_CONTENT_SCHEMA_ID; + static const std::string HEADER_CONTENT_SCHEMA_CLASS; + static const std::string HEADER_CONTENT_OBJECT_ID; + static const std::string HEADER_CONTENT_DATA; + static const std::string HEADER_CONTENT_EVENT; + static const std::string HEADER_CONTENT_QUERY; + + /** + * Keywords for Agent attributes + */ + static const std::string AGENT_ATTR_VENDOR; + static const std::string AGENT_ATTR_PRODUCT; + static const std::string AGENT_ATTR_INSTANCE; + static const std::string AGENT_ATTR_NAME; + static const std::string AGENT_ATTR_TIMESTAMP; + static const std::string AGENT_ATTR_HEARTBEAT_INTERVAL; + static const std::string AGENT_ATTR_EPOCH; + static const std::string AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP; + }; +} + +#endif |
