summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
committerTed Ross <tross@apache.org>2011-01-10 14:06:16 +0000
commit120ea440ef9d048d3bb31e6118027f5c9e890fca (patch)
tree34e14880765c6b79c77c4fb24e834b8d67260149 /cpp
parent598e5eacac716a9a3a812e9cf72b14bde57ed45a (diff)
downloadqpid-python-120ea440ef9d048d3bb31e6118027f5c9e890fca.tar.gz
Updates to the C++ implementation of QMFv2:
1) Consolidated string constants for the protocol into a definition file. 2) Added hooks for subscription handling. 3) Added checks to validate properties and arguments against the schema (if there is a schema). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qmf/Agent.h7
-rw-r--r--cpp/include/qmf/AgentSession.h9
-rw-r--r--cpp/include/qmf/ConsoleEvent.h6
-rw-r--r--cpp/include/qmf/ConsoleSession.h11
-rw-r--r--cpp/include/qmf/Data.h4
-rw-r--r--cpp/include/qmf/Subscription.h82
-rw-r--r--cpp/src/qmf.mk11
-rw-r--r--cpp/src/qmf/Agent.cpp13
-rw-r--r--cpp/src/qmf/AgentSession.cpp190
-rw-r--r--cpp/src/qmf/AgentSubscription.cpp51
-rw-r--r--cpp/src/qmf/AgentSubscription.h52
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp66
-rw-r--r--cpp/src/qmf/ConsoleSessionImpl.h2
-rw-r--r--cpp/src/qmf/Data.cpp19
-rw-r--r--cpp/src/qmf/DataImpl.h22
-rw-r--r--cpp/src/qmf/Schema.cpp100
-rw-r--r--cpp/src/qmf/SchemaImpl.h4
-rw-r--r--cpp/src/qmf/Subscription.cpp88
-rw-r--r--cpp/src/qmf/SubscriptionImpl.h57
-rw-r--r--cpp/src/qmf/constants.cpp77
-rw-r--r--cpp/src/qmf/constants.h83
21 files changed, 841 insertions, 113 deletions
diff --git a/cpp/include/qmf/Agent.h b/cpp/include/qmf/Agent.h
index 8d427ab2fb..8c0f48b8b1 100644
--- a/cpp/include/qmf/Agent.h
+++ b/cpp/include/qmf/Agent.h
@@ -23,6 +23,7 @@
#include <qmf/ImportExport.h>
#include "qmf/Handle.h"
+//#include "qmf/Subscription.h"
#include "qmf/exceptions.h"
#include "qpid/messaging/Duration.h"
#include "qpid/types/Variant.h"
@@ -61,6 +62,12 @@ namespace qmf {
QMF_EXTERN uint32_t queryAsync(const Query&);
QMF_EXTERN uint32_t queryAsync(const std::string&);
+ /**
+ * Create a subscription to this agent
+ */
+ //QMF_EXTERN Subscription subscribe(const Query&, const std::string& options = "");
+ //QMF_EXTERN Subscription subscribe(const std::string&, const std::string& options = "");
+
QMF_EXTERN ConsoleEvent callMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&,
qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE);
QMF_EXTERN uint32_t callMethodAsync(const std::string&, const qpid::types::Variant::Map&, const DataAddr&);
diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h
index 4ac2b2f3ed..090017779f 100644
--- a/cpp/include/qmf/AgentSession.h
+++ b/cpp/include/qmf/AgentSession.h
@@ -57,11 +57,16 @@ namespace qmf {
* The options string is of the form "{key:value,key:value}". The following keys are supported:
*
* interval:N - Heartbeat interval in seconds [default: 60]
- * external:{True,False} - Use external data storage (queries are pass-through) [default: False]
+ * external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False]
* allow-queries:{True,False} - If True: automatically allow all queries [default]
* If False: generate an AUTH_QUERY event to allow per-query authorization
* allow-methods:{True,False} - If True: automatically allow all methods [default]
* If False: generate an AUTH_METHOD event to allow per-method authorization
+ * max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64]
+ * min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000]
+ * sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
+ * public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
+ * If False: QMF events are only sent to authorized subscribers
*/
QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options="");
@@ -143,7 +148,7 @@ namespace qmf {
* authReject - Reject/forbid an authorization request.
* raiseException - indicate failure of an operation (i.e. query or method call).
* response - Provide data in response to a query (only for option: external:True)
- * complete - Indicate that the response to a query is complete (external:true only)
+ * complete - Indicate that the response to a query is complete (external:True only)
* methodSuccess - Indicate the successful completion of a method call.
*/
QMF_EXTERN void authAccept(AgentEvent&);
diff --git a/cpp/include/qmf/ConsoleEvent.h b/cpp/include/qmf/ConsoleEvent.h
index 3e75631a61..54272334a4 100644
--- a/cpp/include/qmf/ConsoleEvent.h
+++ b/cpp/include/qmf/ConsoleEvent.h
@@ -46,8 +46,10 @@ namespace qmf {
CONSOLE_QUERY_RESPONSE = 7,
CONSOLE_METHOD_RESPONSE = 8,
CONSOLE_EXCEPTION = 9,
- CONSOLE_SUBSCRIBE_UPDATE = 10,
- CONSOLE_THREAD_FAILED = 11
+ CONSOLE_SUBSCRIBE_ADD = 10,
+ CONSOLE_SUBSCRIBE_UPDATE = 11,
+ CONSOLE_SUBSCRIBE_DEL = 12,
+ CONSOLE_THREAD_FAILED = 13
};
enum AgentDelReason {
diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h
index c17f4510f1..ba8b3de92f 100644
--- a/cpp/include/qmf/ConsoleSession.h
+++ b/cpp/include/qmf/ConsoleSession.h
@@ -24,6 +24,7 @@
#include <qmf/ImportExport.h>
#include "qmf/Handle.h"
#include "qmf/Agent.h"
+#include "qmf/Subscription.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Connection.h"
#include <string>
@@ -67,6 +68,16 @@ namespace qmf {
QMF_EXTERN Agent getAgent(uint32_t) const;
QMF_EXTERN Agent getConnectedBrokerAgent() const;
+ /**
+ * Create a subscription that involves a subset of the known agents. The set of known agents is defined by
+ * the session's agent-filter (see setAgentFilter). The agentFilter argument to the subscribe method is used
+ * to further refine the set of agents. If agentFilter is the empty string (i.e. match-all) the subscription
+ * will involve all known agents. If agentFilter is non-empty, it will be applied only to the set of known
+ * agents. A subscription cannot be created that involves an agent not known by the session.
+ */
+ QMF_EXTERN Subscription subscribe(const Query&, const std::string& agentFilter = "", const std::string& options = "");
+ QMF_EXTERN Subscription subscribe(const std::string&, const std::string& agentFilter = "", const std::string& options = "");
+
#ifndef SWIG
private:
friend class qmf::PrivateImplRef<ConsoleSession>;
diff --git a/cpp/include/qmf/Data.h b/cpp/include/qmf/Data.h
index 27af1c4b04..82f1569a0b 100644
--- a/cpp/include/qmf/Data.h
+++ b/cpp/include/qmf/Data.h
@@ -34,6 +34,7 @@ namespace qmf {
#endif
class DataImpl;
+ class Schema;
class SchemaId;
class DataAddr;
class Agent;
@@ -45,8 +46,7 @@ namespace qmf {
QMF_EXTERN Data& operator=(const Data&);
QMF_EXTERN ~Data();
- QMF_EXTERN Data(const SchemaId&);
- QMF_EXTERN void setSchema(const SchemaId&);
+ QMF_EXTERN Data(const Schema&);
QMF_EXTERN void setAddr(const DataAddr&);
QMF_EXTERN void setProperty(const std::string&, const qpid::types::Variant&);
QMF_EXTERN void overwriteProperties(const qpid::types::Variant::Map&);
diff --git a/cpp/include/qmf/Subscription.h b/cpp/include/qmf/Subscription.h
new file mode 100644
index 0000000000..4e60eb984e
--- /dev/null
+++ b/cpp/include/qmf/Subscription.h
@@ -0,0 +1,82 @@
+#ifndef QMF_SUBSCRIPTION_H
+#define QMF_SUBSCRIPTION_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qmf/ImportExport.h>
+#include "qmf/Handle.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qmf {
+
+#ifndef SWIG
+ template <class> class PrivateImplRef;
+#endif
+
+ class SubscriptionImpl;
+ class Data;
+
+ class Subscription : public qmf::Handle<SubscriptionImpl> {
+ public:
+ QMF_EXTERN Subscription(SubscriptionImpl* impl = 0);
+ QMF_EXTERN Subscription(const Subscription&);
+ QMF_EXTERN Subscription& operator=(const Subscription&);
+ QMF_EXTERN ~Subscription();
+
+ /**
+ * Construction: A subscription is created by calling ConsoleSession::subscribe.
+ */
+
+ /**
+ * Cancel subscriptions to all subscribed agents. After this is called, the subscription
+ * shall be inactive.
+ */
+ QMF_EXTERN void cancel();
+
+ /**
+ * Check to see if this subscription is active. It is active if it has a live subscription
+ * on at least one agent. If it is not active, there is nothing that can be done to make it
+ * active, it can only be deleted.
+ */
+ QMF_EXTERN bool isActive() const;
+
+ /**
+ * lock and unlock should be used to bracket a traversal of the data set. After lock is called,
+ * the subscription will not change its set of available data objects. Between calls to getDataCount
+ * and getData, no data objects will be added or removed. After unlock is called, the set of data
+ * will catch up to any activity that occurred while the lock was in effect.
+ */
+ QMF_EXTERN void lock();
+ QMF_EXTERN void unlock();
+ QMF_EXTERN uint32_t getDataCount() const;
+ QMF_EXTERN Data getData(uint32_t) const;
+
+#ifndef SWIG
+ private:
+ friend class qmf::PrivateImplRef<Subscription>;
+ friend class SubscriptionImplAccess;
+#endif
+ };
+
+}
+
+#endif
diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk
index d0a186e89f..f3462f1a93 100644
--- a/cpp/src/qmf.mk
+++ b/cpp/src/qmf.mk
@@ -51,7 +51,8 @@ QMF2_API = \
../include/qmf/SchemaId.h \
../include/qmf/SchemaMethod.h \
../include/qmf/SchemaProperty.h \
- ../include/qmf/SchemaTypes.h
+ ../include/qmf/SchemaTypes.h \
+ ../include/qmf/Subscription.h
#
@@ -91,10 +92,14 @@ libqmf2_la_SOURCES = \
qmf/AgentEventImpl.h \
qmf/AgentImpl.h \
qmf/AgentSession.cpp \
+ qmf/AgentSubscription.cpp \
+ qmf/AgentSubscription.h \
qmf/ConsoleEvent.cpp \
qmf/ConsoleEventImpl.h \
qmf/ConsoleSession.cpp \
qmf/ConsoleSessionImpl.h \
+ qmf/constants.cpp \
+ qmf/constants.h \
qmf/DataAddr.cpp \
qmf/DataAddrImpl.h \
qmf/Data.cpp \
@@ -116,7 +121,9 @@ libqmf2_la_SOURCES = \
qmf/SchemaMethod.cpp \
qmf/SchemaMethodImpl.h \
qmf/SchemaProperty.cpp \
- qmf/SchemaPropertyImpl.h
+ qmf/SchemaPropertyImpl.h \
+ qmf/Subscription.cpp \
+ qmf/SubscriptionImpl.h
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp
index 05bf1a38aa..3a385b3741 100644
--- a/cpp/src/qmf/Agent.cpp
+++ b/cpp/src/qmf/Agent.cpp
@@ -27,6 +27,7 @@
#include "qmf/Query.h"
#include "qmf/SchemaImpl.h"
#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/management/Buffer.h"
@@ -507,9 +508,9 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_query_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
@@ -527,9 +528,9 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_method_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
map["_method_name"] = method;
map["_object_id"] = addr.asMap();
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 20beda4851..3426167f87 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -30,6 +30,7 @@
#include "qmf/DataImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Thread.h"
@@ -111,6 +112,10 @@ namespace qmf {
bool externalStorage;
bool autoAllowQueries;
bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -129,6 +134,7 @@ namespace qmf {
void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
void dispatch(Message);
void sendHeartbeat();
+ void send(Message, const Address&);
void flushResponses(AgentEvent&, bool);
void periodicProcessing(uint64_t);
void run();
@@ -172,6 +178,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
+ maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -208,6 +215,22 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("allow-methods");
if (iter != optMap.end())
autoAllowMethods = iter->second.asBool();
+
+ iter = optMap.find("max-subscriptions");
+ if (iter != optMap.end())
+ maxSubscriptions = iter->second.asUint32();
+
+ iter = optMap.find("min-sub-interval");
+ if (iter != optMap.end())
+ minSubInterval = iter->second.asUint32();
+
+ iter = optMap.find("sub-lifetime");
+ if (iter != optMap.end())
+ subLifetime = iter->second.asUint32();
+
+ iter = optMap.find("public-events");
+ if (iter != optMap.end())
+ publicEvents = iter->second.asBool();
}
}
@@ -421,20 +444,18 @@ void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_exception";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
const DataImpl& dataImpl(DataImplAccess::get(data));
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(dataImpl.asMap(), msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo());
}
@@ -461,10 +482,10 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_method_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
@@ -477,9 +498,7 @@ void AgentSessionImpl::methodSuccess(AgentEvent& event)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(map, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo());
}
@@ -494,11 +513,11 @@ void AgentSessionImpl::raiseEvent(const Data& data)
// TODO: add severity.package.class to key
// or modify to send only to subscriptions with matching queries
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject("agent.ind.event");
encode(DataImplAccess::get(data).asMap(), msg);
@@ -573,22 +592,20 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const
Variant::Map map;
Variant::Map& headers(reply.getProperties());
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_locate_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
map["_values"] = attributes;
- map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
- map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, reply);
- Sender sender = session.createSender(msg.getReplyTo());
- sender.send(reply);
+ send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo());
- sender.close();
}
@@ -614,10 +631,6 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me
}
eventImpl->setMethodName(iter->second.asString());
- iter = content.find("_object_id");
- if (iter != content.end())
- eventImpl->setDataAddr(DataAddr(new DataAddrImpl(iter->second.asMap())));
-
iter = content.find("_arguments");
if (iter != content.end())
eventImpl->setArguments(iter->second.asMap());
@@ -626,6 +639,29 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me
if (iter != content.end())
eventImpl->setArgumentSubtypes(iter->second.asMap());
+ iter = content.find("_object_id");
+ if (iter != content.end()) {
+ DataAddr addr(new DataAddrImpl(iter->second.asMap()));
+ eventImpl->setDataAddr(addr);
+ DataIndex::const_iterator iter(globalIndex.find(addr));
+ if (iter == globalIndex.end()) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "No data object found with the specified address");
+ return;
+ }
+
+ if (DataImplAccess::get(iter->second).getSchema().isValid())
+ for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin();
+ aIter != eventImpl->getArguments().end(); aIter++) {
+ const Schema& schema(DataImplAccess::get(iter->second).getSchema());
+ if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "Invalid argument: " + aIter->first);
+ return;
+ }
+ }
+ }
+
enqueueEvent(AgentEvent(eventImpl.release()));
}
@@ -668,19 +704,19 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
{
qpid::sys::Mutex::ScopedLock l(lock);
if (query.getTarget() == QUERY_SCHEMA_ID) {
- headers["qmf.content"] = "_schema_id";
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema_id";
for (iter = schemata.begin(); iter != schemata.end(); iter++)
content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
} else if (query.getSchemaId().isValid()) {
- headers["qmf.content"] = "_schema";
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema";
iter = schemata.find(query.getSchemaId());
if (iter != schemata.end())
content.push_back(SchemaImplAccess::get(iter->second).asMap());
@@ -698,9 +734,7 @@ void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(content, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}
@@ -744,13 +778,11 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u
Message reply;
Variant::Map& headers(reply.getProperties());
- headers["qmf.agent"] = agentName;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
reply.setContent(replyContent);
- Sender sender = session.createSender(msg.getReplyTo());
- sender.send(reply);
+ send(reply, msg.getReplyTo());
QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo());
- sender.close();
}
@@ -759,12 +791,12 @@ void AgentSessionImpl::dispatch(Message msg)
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
- iter = properties.find("x-amqp-0-10.app-id");
- if (iter != properties.end() && iter->second.asString() == "qmf2") {
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
//
// Dispatch a QMFv2 formatted message
//
- iter = properties.find("qmf.opcode");
+ iter = properties.find(protocol::HEADER_KEY_OPCODE);
if (iter == properties.end()) {
QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
return;
@@ -776,7 +808,7 @@ void AgentSessionImpl::dispatch(Message msg)
Variant::List content;
decode(msg, content);
- if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
}
@@ -785,8 +817,8 @@ void AgentSessionImpl::dispatch(Message msg)
Variant::Map content;
decode(msg, content);
- if (opcode == "_method_request") handleMethodRequest(content, msg);
- else if (opcode == "_query_request") handleQueryRequest(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg);
else {
QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
}
@@ -835,17 +867,17 @@ void AgentSessionImpl::sendHeartbeat()
}
}
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_heartbeat_indication";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setSubject(address.str());
map["_values"] = attributes;
- map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
- map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
encode(map, msg);
topicSender.send(msg);
@@ -853,19 +885,36 @@ void AgentSessionImpl::sendHeartbeat()
}
+void AgentSessionImpl::send(Message msg, const Address& to)
+{
+ Sender sender;
+
+ if (to.getName() == directBase) {
+ msg.setSubject(to.getSubject());
+ sender = directSender;
+ } else if (to.getName() == topicBase) {
+ msg.setSubject(to.getSubject());
+ sender = topicSender;
+ } else
+ sender = session.createSender(to);
+
+ sender.send(msg);
+}
+
+
void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
{
Message msg;
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = agentName;
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
if (!final)
- headers["partial"] = Variant();
+ headers[protocol::HEADER_KEY_PARTIAL] = Variant();
Variant::List body;
AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
@@ -878,9 +927,7 @@ void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
msg.setCorrelationId(eventImpl.getCorrelationId());
encode(body, msg);
- Sender sender(session.createSender(eventImpl.getReplyTo()));
- sender.send(msg);
- sender.close();
+ send(msg, eventImpl.getReplyTo());
QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo());
}
@@ -894,6 +941,7 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
//
if (seconds == lastVisit)
return;
+ //uint64_t thisInterval(seconds - lastVisit);
lastVisit = seconds;
//
diff --git a/cpp/src/qmf/AgentSubscription.cpp b/cpp/src/qmf/AgentSubscription.cpp
new file mode 100644
index 0000000000..4dc5cb74a4
--- /dev/null
+++ b/cpp/src/qmf/AgentSubscription.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/AgentSubscription.h"
+
+using namespace qmf;
+
+AgentSubscription::AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life,
+ const std::string& _replyTo, const std::string& _cid, Query _query) :
+ id(_id), interval(_interval), lifetime(_life), timeSincePublish(0), timeSinceKeepalive(0),
+ replyTo(_replyTo), cid(_cid), query(_query)
+{
+}
+
+
+AgentSubscription::~AgentSubscription()
+{
+}
+
+
+bool AgentSubscription::tick(uint64_t seconds)
+{
+ timeSinceKeepalive += seconds;
+ if (timeSinceKeepalive >= lifetime)
+ return false;
+
+ timeSincePublish += seconds;
+ if (timeSincePublish >= interval) {
+ }
+
+ return true;
+}
+
diff --git a/cpp/src/qmf/AgentSubscription.h b/cpp/src/qmf/AgentSubscription.h
new file mode 100644
index 0000000000..01e8f43e9f
--- /dev/null
+++ b/cpp/src/qmf/AgentSubscription.h
@@ -0,0 +1,52 @@
+#ifndef _QMF_AGENT_SUBSCRIPTION_H_
+#define _QMF_AGENT_SUBSCRIPTION_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/types/Variant.h"
+#include "qmf/Query.h"
+#include "qmf/Data.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+ class AgentSubscription {
+ public:
+ AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life,
+ const std::string& _replyTo, const std::string& _cid, Query _query);
+ ~AgentSubscription();
+ bool tick(uint64_t seconds);
+ void keepalive() { timeSinceKeepalive = 0; }
+
+ private:
+ uint64_t id;
+ uint64_t interval;
+ uint64_t lifetime;
+ uint64_t timeSincePublish;
+ uint64_t timeSinceKeepalive;
+ const std::string replyTo;
+ const std::string cid;
+ Query query;
+ };
+
+}
+
+#endif
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index dc2bbe34ee..f327170c5e 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -25,14 +25,19 @@
#include "qmf/SchemaId.h"
#include "qmf/SchemaImpl.h"
#include "qmf/ConsoleEventImpl.h"
+#include "qmf/constants.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
using namespace std;
-using namespace qpid::messaging;
using namespace qmf;
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Receiver;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
using qpid::types::Variant;
typedef qmf::PrivateImplRef<ConsoleSession> PI;
@@ -51,6 +56,8 @@ bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextE
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
+Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
//========================================================================================
// Impl Method Bodies
@@ -227,6 +234,18 @@ Agent ConsoleSessionImpl::getAgent(uint32_t i) const
}
+Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event)
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -249,17 +268,17 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::Map::const_iterator iter;
Variant::Map::const_iterator oiter;
- oiter = properties.find("qmf.opcode");
- iter = properties.find("x-amqp-0-10.app-id");
+ oiter = properties.find(protocol::HEADER_KEY_OPCODE);
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
if (iter == properties.end())
iter = properties.find("app_id");
- if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) {
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) {
//
// Dispatch a QMFv2 formatted message
//
const string& opcode = oiter->second.asString();
- iter = properties.find("qmf.agent");
+ iter = properties.find(protocol::HEADER_KEY_AGENT);
if (iter == properties.end()) {
QPID_LOG(trace, "Message received with no 'qmf.agent' header");
return;
@@ -277,7 +296,7 @@ void ConsoleSessionImpl::dispatch(Message msg)
}
if (msg.getContentType() == "amqp/map" &&
- (opcode == "_agent_heartbeat_indication" || opcode == "_agent_locate_response")) {
+ (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) {
//
// This is the one case where it's ok (necessary actually) to receive a QMFv2
// message from an unknown agent (how else are they going to get known?)
@@ -297,8 +316,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::Map content;
decode(msg, content);
- if (opcode == "_exception") agentImpl.handleException(content, msg);
- else if (opcode == "_method_response") agentImpl.handleMethodResponse(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg);
else
QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode);
@@ -309,8 +328,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
Variant::List content;
decode(msg, content);
- if (opcode == "_query_response") agentImpl.handleQueryResponse(content, msg);
- else if (opcode == "_data_indication") agentImpl.handleDataIndication(content, msg);
+ if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg);
else
QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode);
@@ -344,9 +363,9 @@ void ConsoleSessionImpl::sendBrokerLocate()
Message msg;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_agent_locate_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setReplyTo(replyAddress);
msg.setCorrelationId("broker-locate");
@@ -363,9 +382,9 @@ void ConsoleSessionImpl::sendAgentLocate()
Message msg;
Variant::Map& headers(msg.getProperties());
- headers["method"] = "request";
- headers["qmf.opcode"] = "_agent_locate_request";
- headers["x-amqp-0-10.app-id"] = "qmf2";
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
msg.setReplyTo(replyAddress);
msg.setCorrelationId("agent-locate");
@@ -390,13 +409,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
return;
Variant::Map attrs(iter->second.asMap());
- iter = attrs.find("epoch");
+ iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
epoch = iter->second.asUint32();
if (cid == "broker-locate") {
qpid::sys::Mutex::ScopedLock l(lock);
- agent = Agent(new AgentImpl(agentName, epoch, *this));
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
connectedBrokerAgent = agent;
if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
connectedBrokerInAgentList = true;
@@ -430,7 +453,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
//
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
- if (iter->first != "epoch")
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
impl->setAttribute(iter->first, iter->second);
agent = Agent(impl.release());
agents[agentName] = agent;
@@ -459,16 +482,17 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
- iter = attrs.find("schemaUpdated");
+ iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
if (iter != attrs.end()) {
uint64_t ts(iter->second.asUint64());
- if (ts > impl.getAttribute("schemaUpdated").asUint64()) {
+ if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
//
// The agent has added new schema entries since we last heard from it.
// Enqueue a notification.
//
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
eventImpl->setAgent(agent);
+ impl.setAttribute(iter->first, iter->second);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
}
}
diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h
index 3d10b63c69..85ddc820f4 100644
--- a/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/cpp/src/qmf/ConsoleSessionImpl.h
@@ -61,6 +61,8 @@ namespace qmf {
uint32_t getAgentCount() const;
Agent getAgent(uint32_t i) const;
Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
+ Subscription subscribe(const Query&, const std::string& agentFilter, const std::string& options);
+ Subscription subscribe(const std::string&, const std::string& agentFilter, const std::string& options);
protected:
mutable qpid::sys::Mutex lock;
diff --git a/cpp/src/qmf/Data.cpp b/cpp/src/qmf/Data.cpp
index 0ceca6e1e9..c503bab445 100644
--- a/cpp/src/qmf/Data.cpp
+++ b/cpp/src/qmf/Data.cpp
@@ -21,8 +21,10 @@
#include "qmf/DataImpl.h"
#include "qmf/DataAddrImpl.h"
+#include "qmf/SchemaImpl.h"
#include "qmf/SchemaIdImpl.h"
#include "qmf/PrivateImplRef.h"
+#include "qmf/SchemaProperty.h"
using namespace std;
using namespace qmf;
@@ -35,8 +37,7 @@ Data::Data(const Data& s) : qmf::Handle<DataImpl>() { PI::copy(*this, s); }
Data::~Data() { PI::dtor(*this); }
Data& Data::operator=(const Data& s) { return PI::assign(*this, s); }
-Data::Data(const SchemaId& s) { PI::ctor(*this, new DataImpl(s)); }
-void Data::setSchema(const SchemaId& s) { impl->setSchema(s); }
+Data::Data(const Schema& s) { PI::ctor(*this, new DataImpl(s)); }
void Data::setAddr(const DataAddr& a) { impl->setAddr(a); }
void Data::setProperty(const string& k, const qpid::types::Variant& v) { impl->setProperty(k, v); }
void Data::overwriteProperties(const qpid::types::Variant::Map& m) { impl->overwriteProperties(m); }
@@ -103,6 +104,20 @@ Variant::Map DataImpl::asMap() const
}
+void DataImpl::setProperty(const std::string& k, const qpid::types::Variant& v)
+{
+ if (schema.isValid()) {
+ //
+ // If we have a valid schema, make sure that the property is included in the
+ // schema and that the variant type is compatible with the schema type.
+ //
+ if (!SchemaImplAccess::get(schema).isValidProperty(k, v))
+ throw QmfException("Property '" + k + "' either not in the schema or value is of incompatible type");
+ }
+ properties[k] = v;
+}
+
+
DataImpl& DataImplAccess::get(Data& item)
{
return *item.impl;
diff --git a/cpp/src/qmf/DataImpl.h b/cpp/src/qmf/DataImpl.h
index 38b62791fc..4ac3197da0 100644
--- a/cpp/src/qmf/DataImpl.h
+++ b/cpp/src/qmf/DataImpl.h
@@ -24,8 +24,10 @@
#include "qpid/RefCounted.h"
#include "qmf/Data.h"
#include "qmf/SchemaId.h"
+#include "qmf/Schema.h"
#include "qmf/DataAddr.h"
#include "qmf/Agent.h"
+#include "qmf/AgentSubscription.h"
#include "qpid/types/Variant.h"
namespace qmf {
@@ -37,18 +39,21 @@ namespace qmf {
DataImpl(const qpid::types::Variant::Map&, const Agent&);
qpid::types::Variant::Map asMap() const;
DataImpl() {}
+ void addSubscription(boost::shared_ptr<AgentSubscription>);
+ void delSubscription(uint64_t);
+ qpid::types::Variant::Map publishSubscription(uint64_t);
+ const Schema& getSchema() const { return schema; }
//
// Methods from API handle
//
- DataImpl(const SchemaId& s) : schemaId(s) {}
- void setSchema(const SchemaId& s) { schemaId = s; }
+ DataImpl(const Schema& s) : schema(s) {}
void setAddr(const DataAddr& a) { dataAddr = a; }
- void setProperty(const std::string& k, const qpid::types::Variant& v) { properties[k] = v; }
+ void setProperty(const std::string& k, const qpid::types::Variant& v);
void overwriteProperties(const qpid::types::Variant::Map& m);
- bool hasSchema() const { return schemaId.isValid(); }
+ bool hasSchema() const { return schemaId.isValid() || schema.isValid(); }
bool hasAddr() const { return dataAddr.isValid(); }
- const SchemaId& getSchemaId() const { return schemaId; }
+ const SchemaId& getSchemaId() const { if (schema.isValid()) return schema.getSchemaId(); else return schemaId; }
const DataAddr& getAddr() const { return dataAddr; }
const qpid::types::Variant& getProperty(const std::string& k) const;
const qpid::types::Variant::Map& getProperties() const { return properties; }
@@ -56,7 +61,14 @@ namespace qmf {
const Agent& getAgent() const { return agent; }
private:
+ struct Subscr {
+ boost::shared_ptr<AgentSubscription> subscription;
+ qpid::types::Variant::Map deltas;
+ };
+ std::map<uint64_t, boost::shared_ptr<Subscr> > subscriptions;
+
SchemaId schemaId;
+ Schema schema;
DataAddr dataAddr;
qpid::types::Variant::Map properties;
Agent agent;
diff --git a/cpp/src/qmf/Schema.cpp b/cpp/src/qmf/Schema.cpp
index e003f9d06f..f25eb4635b 100644
--- a/cpp/src/qmf/Schema.cpp
+++ b/cpp/src/qmf/Schema.cpp
@@ -192,6 +192,55 @@ string SchemaImpl::asV1Content(uint32_t sequence) const
}
+bool SchemaImpl::isValidProperty(const std::string& k, const Variant& v) const
+{
+ for (list<SchemaProperty>::const_iterator iter = properties.begin(); iter != properties.end(); iter++)
+ if (iter->getName() == k)
+ return (isCompatibleType(iter->getType(), v.getType()));
+ return false;
+}
+
+
+bool SchemaImpl::isValidMethodInArg(const std::string& m, const std::string& k, const Variant& v) const
+{
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) {
+ if (mIter->getName() == m) {
+ uint32_t count(mIter->getArgumentCount());
+ for (uint32_t i = 0; i < count; i++) {
+ const SchemaProperty prop(mIter->getArgument(i));
+ if (prop.getName() == k) {
+ if (prop.getDirection() == DIR_IN || prop.getDirection() == DIR_IN_OUT)
+ return (isCompatibleType(prop.getType(), v.getType()));
+ else
+ return false;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+
+bool SchemaImpl::isValidMethodOutArg(const std::string& m, const std::string& k, const Variant& v) const
+{
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) {
+ if (mIter->getName() == m) {
+ uint32_t count(mIter->getArgumentCount());
+ for (uint32_t i = 0; i < count; i++) {
+ const SchemaProperty prop(mIter->getArgument(i));
+ if (prop.getName() == k) {
+ if (prop.getDirection() == DIR_OUT || prop.getDirection() == DIR_IN_OUT)
+ return (isCompatibleType(prop.getType(), v.getType()));
+ else
+ return false;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+
void SchemaImpl::finalize()
{
Hash hash;
@@ -246,6 +295,57 @@ void SchemaImpl::checkNotFinal() const
}
+bool SchemaImpl::isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const
+{
+ bool typeValid(false);
+
+ switch (qpidType) {
+ case qpid::types::VAR_VOID:
+ if (qmfType == SCHEMA_DATA_VOID)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_BOOL:
+ if (qmfType == SCHEMA_DATA_BOOL)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_UINT8:
+ case qpid::types::VAR_UINT16:
+ case qpid::types::VAR_UINT32:
+ case qpid::types::VAR_UINT64:
+ case qpid::types::VAR_INT8:
+ case qpid::types::VAR_INT16:
+ case qpid::types::VAR_INT32:
+ case qpid::types::VAR_INT64:
+ if (qmfType == SCHEMA_DATA_INT)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_FLOAT:
+ case qpid::types::VAR_DOUBLE:
+ if (qmfType == SCHEMA_DATA_FLOAT)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_STRING:
+ if (qmfType == SCHEMA_DATA_STRING)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_MAP:
+ if (qmfType == SCHEMA_DATA_BOOL)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_LIST:
+ if (qmfType == SCHEMA_DATA_LIST)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_UUID:
+ if (qmfType == SCHEMA_DATA_UUID)
+ typeValid = true;
+ break;
+ }
+
+ return typeValid;
+}
+
+
SchemaImpl& SchemaImplAccess::get(Schema& item)
{
return *item.impl;
diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h
index 267a5d5138..1c88f87808 100644
--- a/cpp/src/qmf/SchemaImpl.h
+++ b/cpp/src/qmf/SchemaImpl.h
@@ -46,6 +46,9 @@ namespace qmf {
qpid::types::Variant::Map asMap() const;
SchemaImpl(qpid::management::Buffer& v1Buffer);
std::string asV1Content(uint32_t sequence) const;
+ bool isValidProperty(const std::string& k, const qpid::types::Variant& v) const;
+ bool isValidMethodInArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const;
+ bool isValidMethodOutArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const;
//
// Methods from API handle
@@ -79,6 +82,7 @@ namespace qmf {
void checkFinal() const;
void checkNotFinal() const;
+ bool isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const;
};
struct SchemaImplAccess
diff --git a/cpp/src/qmf/Subscription.cpp b/cpp/src/qmf/Subscription.cpp
new file mode 100644
index 0000000000..73afc8c79d
--- /dev/null
+++ b/cpp/src/qmf/Subscription.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/SubscriptionImpl.h"
+#include "qmf/DataImpl.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<Subscription> PI;
+
+Subscription::Subscription(SubscriptionImpl* impl) { PI::ctor(*this, impl); }
+Subscription::Subscription(const Subscription& s) : qmf::Handle<SubscriptionImpl>() { PI::copy(*this, s); }
+Subscription::~Subscription() { PI::dtor(*this); }
+Subscription& Subscription::operator=(const Subscription& s) { return PI::assign(*this, s); }
+
+void Subscription::cancel() { impl->cancel(); }
+bool Subscription::isActive() const { return impl->isActive(); }
+void Subscription::lock() { impl->lock(); }
+void Subscription::unlock() { impl->unlock(); }
+uint32_t Subscription::getDataCount() const { return impl->getDataCount(); }
+Data Subscription::getData(uint32_t i) const { return impl->getData(i); }
+
+
+void SubscriptionImpl::cancel()
+{
+}
+
+
+bool SubscriptionImpl::isActive() const
+{
+ return false;
+}
+
+
+void SubscriptionImpl::lock()
+{
+}
+
+
+void SubscriptionImpl::unlock()
+{
+}
+
+
+uint32_t SubscriptionImpl::getDataCount() const
+{
+ return 0;
+}
+
+
+Data SubscriptionImpl::getData(uint32_t) const
+{
+ return Data();
+}
+
+
+SubscriptionImpl& SubscriptionImplAccess::get(Subscription& item)
+{
+ return *item.impl;
+}
+
+
+const SubscriptionImpl& SubscriptionImplAccess::get(const Subscription& item)
+{
+ return *item.impl;
+}
diff --git a/cpp/src/qmf/SubscriptionImpl.h b/cpp/src/qmf/SubscriptionImpl.h
new file mode 100644
index 0000000000..053e3cd00e
--- /dev/null
+++ b/cpp/src/qmf/SubscriptionImpl.h
@@ -0,0 +1,57 @@
+#ifndef _QMF_SUBSCRIPTION_IMPL_H_
+#define _QMF_SUBSCRIPTION_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Subscription.h"
+
+namespace qmf {
+ class SubscriptionImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ SubscriptionImpl(int p) : placeholder(p) {}
+ ~SubscriptionImpl();
+
+ //
+ // Methods from API handle
+ //
+ void cancel();
+ bool isActive() const;
+ void lock();
+ void unlock();
+ uint32_t getDataCount() const;
+ Data getData(uint32_t) const;
+
+ private:
+ int placeholder;
+ };
+
+ struct SubscriptionImplAccess
+ {
+ static SubscriptionImpl& get(Subscription&);
+ static const SubscriptionImpl& get(const Subscription&);
+ };
+}
+
+#endif
diff --git a/cpp/src/qmf/constants.cpp b/cpp/src/qmf/constants.cpp
new file mode 100644
index 0000000000..6e2fd935a9
--- /dev/null
+++ b/cpp/src/qmf/constants.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "constants.h"
+
+using namespace std;
+using namespace qmf;
+
+/**
+ * Header key strings
+ */
+const string protocol::HEADER_KEY_APP_ID = "x-amqp-0-10.app-id";
+const string protocol::HEADER_KEY_METHOD = "method";
+const string protocol::HEADER_KEY_OPCODE = "qmf.opcode";
+const string protocol::HEADER_KEY_AGENT = "qmf.agent";
+const string protocol::HEADER_KEY_CONTENT = "qmf.content";
+const string protocol::HEADER_KEY_PARTIAL = "partial";
+
+/**
+ * Header values per-key
+ */
+const string protocol::HEADER_APP_ID_QMF = "qmf2";
+
+const string protocol::HEADER_METHOD_REQUEST = "request";
+const string protocol::HEADER_METHOD_RESPONSE = "response";
+const string protocol::HEADER_METHOD_INDICATION = "indication";
+
+const string protocol::HEADER_OPCODE_EXCEPTION = "_exception";
+const string protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST = "_agent_locate_request";
+const string protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE = "_agent_locate_response";
+const string protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION = "_agent_heartbeat_indication";
+const string protocol::HEADER_OPCODE_QUERY_REQUEST = "_query_request";
+const string protocol::HEADER_OPCODE_QUERY_RESPONSE = "_query_response";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_REQUEST = "_subscribe_request";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_RESPONSE = "_subscribe_response";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION = "_subscribe_cancel_indication";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION = "_subscribe_refresh_indication";
+const string protocol::HEADER_OPCODE_DATA_INDICATION = "_data_indication";
+const string protocol::HEADER_OPCODE_METHOD_REQUEST = "_method_request";
+const string protocol::HEADER_OPCODE_METHOD_RESPONSE = "_method_response";
+
+const string protocol::HEADER_CONTENT_SCHEMA_ID = "_schema_id";
+const string protocol::HEADER_CONTENT_SCHEMA_CLASS = "_schema_class";
+const string protocol::HEADER_CONTENT_OBJECT_ID = "_object_id";
+const string protocol::HEADER_CONTENT_DATA = "_data";
+const string protocol::HEADER_CONTENT_EVENT = "_event";
+const string protocol::HEADER_CONTENT_QUERY = "_query";
+
+/**
+ * Keywords for Agent attributes
+ */
+const string protocol::AGENT_ATTR_VENDOR = "_vendor";
+const string protocol::AGENT_ATTR_PRODUCT = "_product";
+const string protocol::AGENT_ATTR_INSTANCE = "_instance";
+const string protocol::AGENT_ATTR_NAME = "_name";
+const string protocol::AGENT_ATTR_TIMESTAMP = "_timestamp";
+const string protocol::AGENT_ATTR_HEARTBEAT_INTERVAL = "_heartbeat_interval";
+const string protocol::AGENT_ATTR_EPOCH = "_epoch";
+const string protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP = "_schema_updated";
diff --git a/cpp/src/qmf/constants.h b/cpp/src/qmf/constants.h
new file mode 100644
index 0000000000..79beaaf1ca
--- /dev/null
+++ b/cpp/src/qmf/constants.h
@@ -0,0 +1,83 @@
+#ifndef QMF_CONSTANTS_H
+#define QMF_CONSTANTS_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qmf {
+
+ struct protocol {
+ /**
+ * Header key strings
+ */
+ static const std::string HEADER_KEY_APP_ID;
+ static const std::string HEADER_KEY_METHOD;
+ static const std::string HEADER_KEY_OPCODE;
+ static const std::string HEADER_KEY_AGENT;
+ static const std::string HEADER_KEY_CONTENT;
+ static const std::string HEADER_KEY_PARTIAL;
+
+ /**
+ * Header values per-key
+ */
+ static const std::string HEADER_APP_ID_QMF;
+
+ static const std::string HEADER_METHOD_REQUEST;
+ static const std::string HEADER_METHOD_RESPONSE;
+ static const std::string HEADER_METHOD_INDICATION;
+
+ static const std::string HEADER_OPCODE_EXCEPTION;
+ static const std::string HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ static const std::string HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
+ static const std::string HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
+ static const std::string HEADER_OPCODE_QUERY_REQUEST;
+ static const std::string HEADER_OPCODE_QUERY_RESPONSE;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_REQUEST;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_RESPONSE;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION;
+ static const std::string HEADER_OPCODE_DATA_INDICATION;
+ static const std::string HEADER_OPCODE_METHOD_REQUEST;
+ static const std::string HEADER_OPCODE_METHOD_RESPONSE;
+
+ static const std::string HEADER_CONTENT_SCHEMA_ID;
+ static const std::string HEADER_CONTENT_SCHEMA_CLASS;
+ static const std::string HEADER_CONTENT_OBJECT_ID;
+ static const std::string HEADER_CONTENT_DATA;
+ static const std::string HEADER_CONTENT_EVENT;
+ static const std::string HEADER_CONTENT_QUERY;
+
+ /**
+ * Keywords for Agent attributes
+ */
+ static const std::string AGENT_ATTR_VENDOR;
+ static const std::string AGENT_ATTR_PRODUCT;
+ static const std::string AGENT_ATTR_INSTANCE;
+ static const std::string AGENT_ATTR_NAME;
+ static const std::string AGENT_ATTR_TIMESTAMP;
+ static const std::string AGENT_ATTR_HEARTBEAT_INTERVAL;
+ static const std::string AGENT_ATTR_EPOCH;
+ static const std::string AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP;
+ };
+}
+
+#endif