summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp640
-rw-r--r--qpid/cpp/src/qmf/AgentEvent.cpp85
-rw-r--r--qpid/cpp/src/qmf/AgentEventImpl.h96
-rw-r--r--qpid/cpp/src/qmf/AgentImpl.h122
-rw-r--r--qpid/cpp/src/qmf/AgentSession.cpp1031
-rw-r--r--qpid/cpp/src/qmf/AgentSessionImpl.h168
-rw-r--r--qpid/cpp/src/qmf/AgentSubscription.cpp51
-rw-r--r--qpid/cpp/src/qmf/AgentSubscription.h52
-rw-r--r--qpid/cpp/src/qmf/ConsoleEvent.cpp82
-rw-r--r--qpid/cpp/src/qmf/ConsoleEventImpl.h84
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp683
-rw-r--r--qpid/cpp/src/qmf/ConsoleSessionImpl.h127
-rw-r--r--qpid/cpp/src/qmf/Data.cpp130
-rw-r--r--qpid/cpp/src/qmf/DataAddr.cpp108
-rw-r--r--qpid/cpp/src/qmf/DataAddrImpl.h73
-rw-r--r--qpid/cpp/src/qmf/DataImpl.h84
-rw-r--r--qpid/cpp/src/qmf/EventNotifierImpl.cpp60
-rw-r--r--qpid/cpp/src/qmf/EventNotifierImpl.h48
-rw-r--r--qpid/cpp/src/qmf/Expression.cpp441
-rw-r--r--qpid/cpp/src/qmf/Expression.h73
-rw-r--r--qpid/cpp/src/qmf/Hash.cpp45
-rw-r--r--qpid/cpp/src/qmf/Hash.h44
-rw-r--r--qpid/cpp/src/qmf/PosixEventNotifier.cpp65
-rw-r--r--qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp112
-rw-r--r--qpid/cpp/src/qmf/PosixEventNotifierImpl.h61
-rw-r--r--qpid/cpp/src/qmf/PrivateImplRef.h93
-rw-r--r--qpid/cpp/src/qmf/Query.cpp153
-rw-r--r--qpid/cpp/src/qmf/QueryImpl.h77
-rw-r--r--qpid/cpp/src/qmf/Schema.cpp358
-rw-r--r--qpid/cpp/src/qmf/SchemaCache.cpp91
-rw-r--r--qpid/cpp/src/qmf/SchemaCache.h56
-rw-r--r--qpid/cpp/src/qmf/SchemaId.cpp96
-rw-r--r--qpid/cpp/src/qmf/SchemaIdImpl.h83
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.h95
-rw-r--r--qpid/cpp/src/qmf/SchemaMethod.cpp186
-rw-r--r--qpid/cpp/src/qmf/SchemaMethodImpl.h75
-rw-r--r--qpid/cpp/src/qmf/SchemaProperty.cpp434
-rw-r--r--qpid/cpp/src/qmf/SchemaPropertyImpl.h93
-rw-r--r--qpid/cpp/src/qmf/Subscription.cpp88
-rw-r--r--qpid/cpp/src/qmf/SubscriptionImpl.h57
-rw-r--r--qpid/cpp/src/qmf/agentCapability.h39
-rw-r--r--qpid/cpp/src/qmf/constants.cpp77
-rw-r--r--qpid/cpp/src/qmf/constants.h83
-rw-r--r--qpid/cpp/src/qmf/exceptions.cpp37
44 files changed, 6836 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
new file mode 100644
index 0000000000..fa3987e0c9
--- /dev/null
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -0,0 +1,640 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/AgentImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/DataImpl.h"
+#include "qmf/Query.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
+using qpid::types::Variant;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
+using qpid::messaging::Sender;
+using namespace std;
+using namespace qmf;
+
+typedef PrivateImplRef<Agent> PI;
+
+Agent::Agent(AgentImpl* impl) { PI::ctor(*this, impl); }
+Agent::Agent(const Agent& s) : qmf::Handle<AgentImpl>() { PI::copy(*this, s); }
+Agent::~Agent() { PI::dtor(*this); }
+Agent& Agent::operator=(const Agent& s) { return PI::assign(*this, s); }
+string Agent::getName() const { return isValid() ? impl->getName() : ""; }
+uint32_t Agent::getEpoch() const { return isValid() ? impl->getEpoch() : 0; }
+string Agent::getVendor() const { return isValid() ? impl->getVendor() : ""; }
+string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; }
+string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; }
+const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); }
+const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); }
+ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); }
+uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); }
+ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); }
+ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); }
+uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); }
+uint32_t Agent::queryAsync(const string& q) { return impl->queryAsync(q); }
+ConsoleEvent Agent::callMethod(const string& m, const Variant::Map& a, const DataAddr& d, Duration t) { return impl->callMethod(m, a, d, t); }
+uint32_t Agent::callMethodAsync(const string& m, const Variant::Map& a, const DataAddr& d) { return impl->callMethodAsync(m, a, d); }
+uint32_t Agent::getPackageCount() const { return impl->getPackageCount(); }
+const string& Agent::getPackage(uint32_t i) const { return impl->getPackage(i); }
+uint32_t Agent::getSchemaIdCount(const string& p) const { return impl->getSchemaIdCount(p); }
+SchemaId Agent::getSchemaId(const string& p, uint32_t i) const { return impl->getSchemaId(p, i); }
+Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(s, t); }
+
+
+
+AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
+ name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
+ sender(session.directSender), schemaCache(s.schemaCache)
+{
+}
+
+void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v)
+{
+ attributes[k] = v;
+ if (k == "qmf.agent_capability")
+ try {
+ capability = v.asUint32();
+ } catch (std::exception&) {}
+ if (k == "_direct_subject")
+ try {
+ directSubject = v.asString();
+ sender = session.topicSender;
+ } catch (std::exception&) {}
+}
+
+const Variant& AgentImpl::getAttribute(const string& k) const
+{
+ Variant::Map::const_iterator iter = attributes.find(k);
+ if (iter == attributes.end())
+ throw KeyNotFound(k);
+ return iter->second;
+}
+
+
+ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
+{
+ boost::shared_ptr<SyncContext> context(new SyncContext());
+ uint32_t correlator(session.correlator());
+ ConsoleEvent result;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ contextMap[correlator] = context;
+ }
+ try {
+ sendQuery(query, correlator);
+ {
+ uint64_t milliseconds = timeout.getMilliseconds();
+ qpid::sys::Mutex::ScopedLock cl(context->lock);
+ if (!context->response.isValid() || !context->response.isFinal())
+ context->cond.wait(context->lock,
+ qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (context->response.isValid() &&
+ ((context->response.getType() == CONSOLE_QUERY_RESPONSE && context->response.isFinal()) ||
+ (context->response.getType() == CONSOLE_EXCEPTION)))
+ result = context->response;
+ else {
+ auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION));
+ Data exception(new DataImpl());
+ exception.setProperty("error_text", "Timed out waiting for the agent to respond");
+ impl->addData(exception);
+ result = ConsoleEvent(impl.release());
+ }
+ }
+ } catch (qpid::types::Exception&) {
+ }
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ contextMap.erase(correlator);
+ }
+
+ return result;
+}
+
+
+ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
+{
+ return query(stringToQuery(text), timeout);
+}
+
+
+uint32_t AgentImpl::queryAsync(const Query& query)
+{
+ uint32_t correlator(session.correlator());
+
+ sendQuery(query, correlator);
+ return correlator;
+}
+
+
+uint32_t AgentImpl::queryAsync(const string& text)
+{
+ return queryAsync(stringToQuery(text));
+}
+
+
+ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
+{
+ boost::shared_ptr<SyncContext> context(new SyncContext());
+ uint32_t correlator(session.correlator());
+ ConsoleEvent result;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ contextMap[correlator] = context;
+ }
+ try {
+ sendMethod(method, args, addr, correlator);
+ {
+ uint64_t milliseconds = timeout.getMilliseconds();
+ qpid::sys::Mutex::ScopedLock cl(context->lock);
+ if (!context->response.isValid())
+ context->cond.wait(context->lock,
+ qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (context->response.isValid())
+ result = context->response;
+ else {
+ auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION));
+ Data exception(new DataImpl());
+ exception.setProperty("error_text", "Timed out waiting for the agent to respond");
+ impl->addData(exception);
+ result = ConsoleEvent(impl.release());
+ }
+ }
+ } catch (qpid::types::Exception&) {
+ }
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ contextMap.erase(correlator);
+ }
+
+ return result;
+}
+
+
+uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
+{
+ uint32_t correlator(session.correlator());
+
+ sendMethod(method, args, addr, correlator);
+ return correlator;
+}
+
+
+uint32_t AgentImpl::getPackageCount() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ //
+ // Populate the package set.
+ //
+ for (set<SchemaId, SchemaIdCompare>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++)
+ packageSet.insert(iter->getPackageName());
+
+ return packageSet.size();
+}
+
+
+const string& AgentImpl::getPackage(uint32_t idx) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ uint32_t count(0);
+ for (set<string>::const_iterator iter = packageSet.begin(); iter != packageSet.end(); iter++) {
+ if (idx == count)
+ return *iter;
+ count++;
+ }
+ throw IndexOutOfRange();
+}
+
+
+uint32_t AgentImpl::getSchemaIdCount(const string& pname) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ uint32_t count(0);
+ for (set<SchemaId, SchemaIdCompare>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++)
+ if (iter->getPackageName() == pname)
+ count++;
+ return count;
+}
+
+
+SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ uint32_t count(0);
+ for (set<SchemaId, SchemaIdCompare>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) {
+ if (iter->getPackageName() == pname) {
+ if (idx == count)
+ return *iter;
+ count++;
+ }
+ }
+ throw IndexOutOfRange();
+}
+
+
+Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout)
+{
+ if (!schemaCache->haveSchema(id))
+ //
+ // The desired schema is not in the cache. We need to asynchronously query the remote
+ // agent for the information. The call to schemaCache->getSchema will block waiting for
+ // the response to be received.
+ //
+ sendSchemaRequest(id);
+
+ return schemaCache->getSchema(id, timeout);
+}
+
+
+void AgentImpl::handleException(const Variant::Map& content, const Message& msg)
+{
+ const string& cid(msg.getCorrelationId());
+ Variant::Map::const_iterator aIter;
+ uint32_t correlator;
+ boost::shared_ptr<SyncContext> context;
+
+ try { correlator = boost::lexical_cast<uint32_t>(cid); }
+ catch(const boost::bad_lexical_cast&) { correlator = 0; }
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator);
+ if (iter != contextMap.end())
+ context = iter->second;
+ }
+
+ if (context.get() != 0) {
+ //
+ // This exception is associated with a synchronous request.
+ //
+ qpid::sys::Mutex::ScopedLock cl(context->lock);
+ context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_EXCEPTION));
+ ConsoleEventImplAccess::get(context->response).addData(new DataImpl(content, this));
+ ConsoleEventImplAccess::get(context->response).setAgent(this);
+ context->cond.notify();
+ } else {
+ //
+ // This exception is associated with an asynchronous request.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_EXCEPTION));
+ eventImpl->setCorrelator(correlator);
+ eventImpl->setAgent(this);
+ eventImpl->addData(new DataImpl(content, this));
+ session.enqueueEvent(eventImpl.release());
+ }
+}
+
+
+void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message& msg)
+{
+ const string& cid(msg.getCorrelationId());
+ Variant::Map::const_iterator aIter;
+ Variant::Map argMap;
+ uint32_t correlator;
+ boost::shared_ptr<SyncContext> context;
+
+ QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response);
+
+ aIter = response.find("_arguments");
+ if (aIter != response.end())
+ argMap = aIter->second.asMap();
+
+ try { correlator = boost::lexical_cast<uint32_t>(cid); }
+ catch(const boost::bad_lexical_cast&) { correlator = 0; }
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator);
+ if (iter != contextMap.end())
+ context = iter->second;
+ }
+
+ if (context.get() != 0) {
+ //
+ // This response is associated with a synchronous request.
+ //
+ qpid::sys::Mutex::ScopedLock cl(context->lock);
+ context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE));
+ ConsoleEventImplAccess::get(context->response).setArguments(argMap);
+ ConsoleEventImplAccess::get(context->response).setAgent(this);
+ context->cond.notify();
+ } else {
+ //
+ // This response is associated with an asynchronous request.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE));
+ eventImpl->setCorrelator(correlator);
+ eventImpl->setAgent(this);
+ eventImpl->setArguments(argMap);
+ session.enqueueEvent(eventImpl.release());
+ }
+}
+
+
+void AgentImpl::handleDataIndication(const Variant::List& list, const Message& msg)
+{
+ Variant::Map::const_iterator aIter;
+ const Variant::Map& props(msg.getProperties());
+ boost::shared_ptr<SyncContext> context;
+
+ aIter = props.find("qmf.content");
+ if (aIter == props.end())
+ return;
+
+ string content_type(aIter->second.asString());
+ if (content_type != "_event")
+ return;
+
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ const Variant::Map& eventMap(lIter->asMap());
+ Data data(new DataImpl(eventMap, this));
+ int severity(SEV_NOTICE);
+ uint64_t timestamp(0);
+
+ aIter = eventMap.find("_severity");
+ if (aIter != eventMap.end())
+ severity = int(aIter->second.asInt8());
+
+ aIter = eventMap.find("_timestamp");
+ if (aIter != eventMap.end())
+ timestamp = aIter->second.asUint64();
+
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_EVENT));
+ eventImpl->setAgent(this);
+ eventImpl->addData(data);
+ eventImpl->setSeverity(severity);
+ eventImpl->setTimestamp(timestamp);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ session.enqueueEvent(eventImpl.release());
+ }
+}
+
+
+void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& msg)
+{
+ const string& cid(msg.getCorrelationId());
+ Variant::Map::const_iterator aIter;
+ const Variant::Map& props(msg.getProperties());
+ uint32_t correlator;
+ bool final(false);
+ boost::shared_ptr<SyncContext> context;
+
+ aIter = props.find("partial");
+ if (aIter == props.end())
+ final = true;
+
+ aIter = props.find("qmf.content");
+ if (aIter == props.end())
+ return;
+
+ string content_type(aIter->second.asString());
+ if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data")
+ return;
+
+ try { correlator = boost::lexical_cast<uint32_t>(cid); }
+ catch(const boost::bad_lexical_cast&) { correlator = 0; }
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator);
+ if (iter != contextMap.end())
+ context = iter->second;
+ }
+
+ if (context.get() != 0) {
+ //
+ // This response is associated with a synchronous request.
+ //
+ qpid::sys::Mutex::ScopedLock cl(context->lock);
+ if (!context->response.isValid())
+ context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ ConsoleEventImplAccess::get(context->response).addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
+ if (final) {
+ ConsoleEventImplAccess::get(context->response).setFinal();
+ ConsoleEventImplAccess::get(context->response).setAgent(this);
+ context->cond.notify();
+ }
+ } else {
+ //
+ // This response is associated with an asynchronous request.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
+ eventImpl->setCorrelator(correlator);
+ eventImpl->setAgent(this);
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ eventImpl->addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ eventImpl->addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
+ if (final)
+ eventImpl->setFinal();
+ if (content_type != "_schema")
+ session.enqueueEvent(eventImpl.release());
+ }
+}
+
+
+Query AgentImpl::stringToQuery(const std::string& text)
+{
+ qpid::messaging::AddressParser parser(text);
+ Variant::Map map;
+ Variant::Map::const_iterator iter;
+ string className;
+ string packageName;
+
+ parser.parseMap(map);
+
+ iter = map.find("class");
+ if (iter != map.end())
+ className = iter->second.asString();
+
+ iter = map.find("package");
+ if (iter != map.end())
+ packageName = iter->second.asString();
+
+ Query query(QUERY_OBJECT, className, packageName);
+
+ iter = map.find("where");
+ if (iter != map.end())
+ query.setPredicate(iter->second.asList());
+
+ return query;
+}
+
+
+void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ msg.setReplyTo(session.replyAddress);
+ msg.setCorrelationId(boost::lexical_cast<string>(correlator));
+ msg.setSubject(directSubject);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
+ encode(QueryImplAccess::get(query).asMap(), msg);
+ if (sender.isValid()) {
+ sender.send(msg);
+ QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator);
+ }
+}
+
+
+void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const DataAddr& addr, uint32_t correlator)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ map["_method_name"] = method;
+ map["_object_id"] = addr.asMap();
+ map["_arguments"] = args;
+
+ msg.setReplyTo(session.replyAddress);
+ msg.setCorrelationId(boost::lexical_cast<string>(correlator));
+ msg.setSubject(directSubject);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
+ encode(map, msg);
+ if (sender.isValid()) {
+ sender.send(msg);
+ QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator);
+ }
+}
+
+void AgentImpl::sendSchemaRequest(const SchemaId& id)
+{
+ uint32_t correlator(session.correlator());
+
+ if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
+ Query query(QUERY_SCHEMA, id);
+ sendQuery(query, correlator);
+ return;
+ }
+
+#define RAW_BUFFER_SIZE 1024
+ char rawBuffer[RAW_BUFFER_SIZE];
+ qpid::management::Buffer buffer(rawBuffer, RAW_BUFFER_SIZE);
+
+ buffer.putOctet('A');
+ buffer.putOctet('M');
+ buffer.putOctet('2');
+ buffer.putOctet('S');
+ buffer.putLong(correlator);
+ buffer.putShortString(id.getPackageName());
+ buffer.putShortString(id.getName());
+ buffer.putBin128(id.getHash().data());
+
+ string content(rawBuffer, buffer.getPosition());
+
+ Message msg;
+ msg.setReplyTo(session.replyAddress);
+ msg.setContent(content);
+ msg.setSubject(directSubject);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
+ if (sender.isValid()) {
+ sender.send(msg);
+ QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject);
+ }
+}
+
+
+void AgentImpl::learnSchemaId(const SchemaId& id)
+{
+ schemaCache->declareSchemaId(id);
+ schemaIdSet.insert(id);
+}
+
+
+AgentImpl& AgentImplAccess::get(Agent& item)
+{
+ return *item.impl;
+}
+
+
+const AgentImpl& AgentImplAccess::get(const Agent& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/AgentEvent.cpp b/qpid/cpp/src/qmf/AgentEvent.cpp
new file mode 100644
index 0000000000..2dc24ecac1
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentEvent.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/AgentEventImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/SchemaImpl.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<AgentEvent> PI;
+
+AgentEvent::AgentEvent(AgentEventImpl* impl) { PI::ctor(*this, impl); }
+AgentEvent::AgentEvent(const AgentEvent& s) : qmf::Handle<AgentEventImpl>() { PI::copy(*this, s); }
+AgentEvent::~AgentEvent() { PI::dtor(*this); }
+AgentEvent& AgentEvent::operator=(const AgentEvent& s) { return PI::assign(*this, s); }
+
+AgentEventCode AgentEvent::getType() const { return impl->getType(); }
+const string& AgentEvent::getUserId() const { return impl->getUserId(); }
+Query AgentEvent::getQuery() const { return impl->getQuery(); }
+bool AgentEvent::hasDataAddr() const { return impl->hasDataAddr(); }
+DataAddr AgentEvent::getDataAddr() const { return impl->getDataAddr(); }
+const string& AgentEvent::getMethodName() const { return impl->getMethodName(); }
+qpid::types::Variant::Map& AgentEvent::getArguments() { return impl->getArguments(); }
+qpid::types::Variant::Map& AgentEvent::getArgumentSubtypes() { return impl->getArgumentSubtypes(); }
+void AgentEvent::addReturnArgument(const std::string& k, const qpid::types::Variant& v, const std::string& s) { impl->addReturnArgument(k, v, s); }
+
+uint32_t AgentEventImpl::enqueueData(const Data& data)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ dataQueue.push(data);
+ return dataQueue.size();
+}
+
+
+Data AgentEventImpl::dequeueData()
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (dataQueue.empty())
+ return Data();
+ Data data(dataQueue.front());
+ dataQueue.pop();
+ return data;
+}
+
+
+void AgentEventImpl::addReturnArgument(const string& key, const Variant& val, const string& subtype)
+{
+ if (schema.isValid() && !SchemaImplAccess::get(schema).isValidMethodOutArg(methodName, key, val))
+ throw QmfException("Output argument is unknown or the type is incompatible");
+ outArguments[key] = val;
+ if (!subtype.empty())
+ outArgumentSubtypes[key] = subtype;
+}
+
+
+AgentEventImpl& AgentEventImplAccess::get(AgentEvent& item)
+{
+ return *item.impl;
+}
+
+
+const AgentEventImpl& AgentEventImplAccess::get(const AgentEvent& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/AgentEventImpl.h b/qpid/cpp/src/qmf/AgentEventImpl.h
new file mode 100644
index 0000000000..1ecb41775a
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentEventImpl.h
@@ -0,0 +1,96 @@
+#ifndef _QMF_AGENT_EVENT_IMPL_H_
+#define _QMF_AGENT_EVENT_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/messaging/Address.h"
+#include "qmf/AgentEvent.h"
+#include "qmf/Query.h"
+#include "qmf/DataAddr.h"
+#include "qmf/Data.h"
+#include "qmf/Schema.h"
+#include <queue>
+
+namespace qmf {
+ class AgentEventImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Impl-only methods
+ //
+ AgentEventImpl(AgentEventCode e) : eventType(e) {}
+ void setUserId(const std::string& u) { userId = u; }
+ void setQuery(const Query& q) { query = q; }
+ void setDataAddr(const DataAddr& d) { dataAddr = d; }
+ void setMethodName(const std::string& m) { methodName = m; }
+ void setArguments(const qpid::types::Variant::Map& a) { arguments = a; }
+ void setArgumentSubtypes(const qpid::types::Variant::Map& a) { argumentSubtypes = a; }
+ void setReplyTo(const qpid::messaging::Address& r) { replyTo = r; }
+ void setSchema(const Schema& s) { schema = s; }
+ const qpid::messaging::Address& getReplyTo() { return replyTo; }
+ void setCorrelationId(const std::string& c) { correlationId = c; }
+ const std::string& getCorrelationId() { return correlationId; }
+ const qpid::types::Variant::Map& getReturnArguments() const { return outArguments; }
+ const qpid::types::Variant::Map& getReturnArgumentSubtypes() const { return outArgumentSubtypes; }
+ uint32_t enqueueData(const Data&);
+ Data dequeueData();
+
+ //
+ // Methods from API handle
+ //
+ AgentEventCode getType() const { return eventType; }
+ const std::string& getUserId() const { return userId; }
+ Query getQuery() const { return query; }
+ bool hasDataAddr() const { return dataAddr.isValid(); }
+ DataAddr getDataAddr() const { return dataAddr; }
+ const std::string& getMethodName() const { return methodName; }
+ qpid::types::Variant::Map& getArguments() { return arguments; }
+ qpid::types::Variant::Map& getArgumentSubtypes() { return argumentSubtypes; }
+ void addReturnArgument(const std::string&, const qpid::types::Variant&, const std::string&);
+
+ private:
+ const AgentEventCode eventType;
+ std::string userId;
+ qpid::messaging::Address replyTo;
+ std::string correlationId;
+ Query query;
+ DataAddr dataAddr;
+ Schema schema;
+ std::string methodName;
+ qpid::types::Variant::Map arguments;
+ qpid::types::Variant::Map argumentSubtypes;
+ qpid::types::Variant::Map outArguments;
+ qpid::types::Variant::Map outArgumentSubtypes;
+
+ qpid::sys::Mutex lock;
+ std::queue<Data> dataQueue;
+ };
+
+ struct AgentEventImplAccess
+ {
+ static AgentEventImpl& get(AgentEvent&);
+ static const AgentEventImpl& get(const AgentEvent&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h
new file mode 100644
index 0000000000..09754a3a7e
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentImpl.h
@@ -0,0 +1,122 @@
+#ifndef _QMF_AGENT_IMPL_H_
+#define _QMF_AGENT_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Agent.h"
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/ConsoleSessionImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/SchemaCache.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <set>
+
+namespace qmf {
+ class AgentImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Impl-only methods
+ //
+ AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s);
+ void setAttribute(const std::string& k, const qpid::types::Variant& v);
+ void setAttribute(const std::string& k, const std::string& v) { attributes[k] = v; }
+ void touch() { touched = true; }
+ uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; touched = false; return untouchedCount; }
+ uint32_t getCapability() const { return capability; }
+ void handleException(const qpid::types::Variant::Map&, const qpid::messaging::Message&);
+ void handleMethodResponse(const qpid::types::Variant::Map&, const qpid::messaging::Message&);
+ void handleDataIndication(const qpid::types::Variant::List&, const qpid::messaging::Message&);
+ void handleQueryResponse(const qpid::types::Variant::List&, const qpid::messaging::Message&);
+
+ //
+ // Methods from API handle
+ //
+ const std::string& getName() const { return name; }
+ uint32_t getEpoch() const { return epoch; }
+ void setEpoch(uint32_t e) { epoch = e; }
+ std::string getVendor() const { return getAttribute("_vendor").asString(); }
+ std::string getProduct() const { return getAttribute("_product").asString(); }
+ std::string getInstance() const { return getAttribute("_instance").asString(); }
+ const qpid::types::Variant& getAttribute(const std::string& k) const;
+ const qpid::types::Variant::Map& getAttributes() const { return attributes; }
+
+ ConsoleEvent querySchema(qpid::messaging::Duration t) { return query(Query(QUERY_SCHEMA_ID), t); }
+ uint32_t querySchemaAsync() { return queryAsync(Query(QUERY_SCHEMA_ID)); }
+
+ ConsoleEvent query(const Query& q, qpid::messaging::Duration t);
+ ConsoleEvent query(const std::string& q, qpid::messaging::Duration t);
+ uint32_t queryAsync(const Query& q);
+ uint32_t queryAsync(const std::string& q);
+
+ ConsoleEvent callMethod(const std::string& m, const qpid::types::Variant::Map& a, const DataAddr&, qpid::messaging::Duration t);
+ uint32_t callMethodAsync(const std::string& m, const qpid::types::Variant::Map& a, const DataAddr&);
+
+ uint32_t getPackageCount() const;
+ const std::string& getPackage(uint32_t i) const;
+ uint32_t getSchemaIdCount(const std::string& p) const;
+ SchemaId getSchemaId(const std::string& p, uint32_t i) const;
+ Schema getSchema(const SchemaId& s, qpid::messaging::Duration t);
+
+ private:
+ struct SyncContext {
+ qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ ConsoleEvent response;
+ };
+
+ mutable qpid::sys::Mutex lock;
+ std::string name;
+ std::string directSubject;
+ uint32_t epoch;
+ ConsoleSessionImpl& session;
+ bool touched;
+ uint32_t untouchedCount;
+ uint32_t capability;
+ qpid::messaging::Sender sender;
+ qpid::types::Variant::Map attributes;
+ std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
+ boost::shared_ptr<SchemaCache> schemaCache;
+ mutable std::set<std::string> packageSet;
+ std::set<SchemaId, SchemaIdCompare> schemaIdSet;
+
+ Query stringToQuery(const std::string&);
+ void sendQuery(const Query&, uint32_t);
+ void sendSchemaIdQuery(uint32_t);
+ void sendMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, uint32_t);
+ void sendSchemaRequest(const SchemaId&);
+ void learnSchemaId(const SchemaId&);
+ };
+
+ struct AgentImplAccess
+ {
+ static AgentImpl& get(Agent&);
+ static const AgentImpl& get(const Agent&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
new file mode 100644
index 0000000000..4605285448
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentSession.cpp
@@ -0,0 +1,1031 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/AgentSessionImpl.h"
+
+#include <iostream>
+#include <memory>
+
+namespace qmf {
+
+using std::string;
+using std::map;
+
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
+using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
+using qpid::types::Variant;
+
+AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
+AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
+AgentSession::~AgentSession() { PI::dtor(*this); }
+AgentSession& AgentSession::operator=(const AgentSession& s) { return PI::assign(*this, s); }
+
+AgentSession::AgentSession(Connection& c, const string& o) { PI::ctor(*this, new AgentSessionImpl(c, o)); }
+void AgentSession::setDomain(const string& d) { impl->setDomain(d); }
+void AgentSession::setVendor(const string& v) { impl->setVendor(v); }
+void AgentSession::setProduct(const string& p) { impl->setProduct(p); }
+void AgentSession::setInstance(const string& i) { impl->setInstance(i); }
+void AgentSession::setAttribute(const string& k, const qpid::types::Variant& v) { impl->setAttribute(k, v); }
+const string& AgentSession::getName() const { return impl->getName(); }
+void AgentSession::open() { impl->open(); }
+void AgentSession::close() { impl->close(); }
+bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
+void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
+DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
+void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
+void AgentSession::authAccept(AgentEvent& e) { impl->authAccept(e); }
+void AgentSession::authReject(AgentEvent& e, const string& m) { impl->authReject(e, m); }
+void AgentSession::raiseException(AgentEvent& e, const string& s) { impl->raiseException(e, s); }
+void AgentSession::raiseException(AgentEvent& e, const Data& d) { impl->raiseException(e, d); }
+void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d); }
+void AgentSession::complete(AgentEvent& e) { impl->complete(e); }
+void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); }
+void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
+void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
+ connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
+ bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
+ externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
+ maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+ listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ schemaUpdateTime(uint64_t(qpid::sys::Duration::FromEpoch()))
+{
+ //
+ // Set Agent Capability Level
+ //
+ attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8;
+
+ if (!options.empty()) {
+ qpid::messaging::AddressParser parser(options);
+ Variant::Map optMap;
+ Variant::Map::const_iterator iter;
+
+ parser.parseMap(optMap);
+
+ iter = optMap.find("domain");
+ if (iter != optMap.end())
+ domain = iter->second.asString();
+
+ iter = optMap.find("interval");
+ if (iter != optMap.end()) {
+ interval = iter->second.asUint32();
+ if (interval < 1)
+ interval = 1;
+ }
+
+ iter = optMap.find("external");
+ if (iter != optMap.end())
+ externalStorage = iter->second.asBool();
+
+ iter = optMap.find("allow-queries");
+ if (iter != optMap.end())
+ autoAllowQueries = iter->second.asBool();
+
+ iter = optMap.find("allow-methods");
+ if (iter != optMap.end())
+ autoAllowMethods = iter->second.asBool();
+
+ iter = optMap.find("max-subscriptions");
+ if (iter != optMap.end())
+ maxSubscriptions = iter->second.asUint32();
+
+ iter = optMap.find("min-sub-interval");
+ if (iter != optMap.end())
+ minSubInterval = iter->second.asUint32();
+
+ iter = optMap.find("sub-lifetime");
+ if (iter != optMap.end())
+ subLifetime = iter->second.asUint32();
+
+ iter = optMap.find("public-events");
+ if (iter != optMap.end())
+ publicEvents = iter->second.asBool();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
+ }
+
+ if (maxThreadWaitTime > interval)
+ maxThreadWaitTime = interval;
+}
+
+
+AgentSessionImpl::~AgentSessionImpl()
+{
+ if (opened)
+ close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+}
+
+
+void AgentSessionImpl::open()
+{
+ if (opened)
+ throw QmfException("The session is already open");
+
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
+ const string addrArgs(";{create:never,node:{type:topic}}");
+ const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+ attributes["_direct_subject"] = routableAddr;
+
+ // Establish messaging addresses
+ setAgentName();
+ directBase = "qmf." + domain + ".direct";
+ topicBase = "qmf." + domain + ".topic";
+
+ // Create AMQP session, receivers, and senders
+ session = connection.createSession();
+ Receiver directRx;
+ Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs);
+ Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs);
+
+ if (listenOnDirect && !strictSecurity) {
+ directRx = session.createReceiver(directBase + "/" + agentName + addrArgs);
+ directRx.setCapacity(64);
+ }
+
+ routableDirectRx.setCapacity(64);
+ topicRx.setCapacity(64);
+
+ if (!strictSecurity)
+ directSender = session.createSender(directBase + addrArgs);
+ topicSender = session.createSender(topicBase + addrArgs);
+
+ // Start the receiver thread
+ threadCanceled = false;
+ opened = true;
+ thread = new qpid::sys::Thread(*this);
+
+ // Send an initial agent heartbeat message
+ sendHeartbeat();
+}
+
+
+void AgentSessionImpl::closeAsync()
+{
+ if (!opened)
+ return;
+
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ threadCanceled = true;
+ opened = false;
+}
+
+
+void AgentSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
+bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
+{
+ uint64_t milliseconds = timeout.getMilliseconds();
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
+
+ if (!eventQueue.empty()) {
+ event = eventQueue.front();
+ eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
+ return true;
+ }
+
+ return false;
+}
+
+
+int AgentSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
+void AgentSessionImpl::registerSchema(Schema& schema)
+{
+ if (!schema.isFinalized())
+ schema.finalize();
+ const SchemaId& schemaId(schema.getSchemaId());
+
+ qpid::sys::Mutex::ScopedLock l(lock);
+ schemata[schemaId] = schema;
+ schemaIndex[schemaId] = DataIndex();
+
+ //
+ // Get the news out at the next periodic interval that there is new schema information.
+ //
+ schemaUpdateTime = uint64_t(qpid::sys::Duration::FromEpoch());
+ forceHeartbeat = true;
+}
+
+
+DataAddr AgentSessionImpl::addData(Data& data, const string& name, bool persistent)
+{
+ if (externalStorage)
+ throw QmfException("addData() must not be called when the 'external' option is enabled.");
+
+ string dataName;
+ if (name.empty())
+ dataName = qpid::types::Uuid(true).str();
+ else
+ dataName = name;
+
+ DataAddr addr(dataName, agentName, persistent ? 0 : bootSequence);
+ data.setAddr(addr);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ DataIndex::const_iterator iter = globalIndex.find(addr);
+ if (iter != globalIndex.end())
+ throw QmfException("Duplicate Data Address");
+
+ globalIndex[addr] = data;
+ if (data.hasSchema())
+ schemaIndex[data.getSchemaId()][addr] = data;
+ }
+
+ //
+ // TODO: Survey active subscriptions to see if they need to hear about this new data.
+ //
+
+ return addr;
+}
+
+
+void AgentSessionImpl::delData(const DataAddr& addr)
+{
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ DataIndex::iterator iter = globalIndex.find(addr);
+ if (iter == globalIndex.end())
+ return;
+ if (iter->second.hasSchema()) {
+ const SchemaId& schemaId(iter->second.getSchemaId());
+ schemaIndex[schemaId].erase(addr);
+ }
+ globalIndex.erase(iter);
+ }
+
+ //
+ // TODO: Survey active subscriptions to see if they need to hear about this deleted data.
+ //
+}
+
+
+void AgentSessionImpl::authAccept(AgentEvent& authEvent)
+{
+ std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_QUERY));
+ eventImpl->setQuery(authEvent.getQuery());
+ eventImpl->setUserId(authEvent.getUserId());
+ eventImpl->setReplyTo(AgentEventImplAccess::get(authEvent).getReplyTo());
+ eventImpl->setCorrelationId(AgentEventImplAccess::get(authEvent).getCorrelationId());
+ AgentEvent event(eventImpl.release());
+
+ if (externalStorage) {
+ enqueueEvent(event);
+ return;
+ }
+
+ const Query& query(authEvent.getQuery());
+ if (query.getDataAddr().isValid()) {
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ DataIndex::const_iterator iter = globalIndex.find(query.getDataAddr());
+ if (iter != globalIndex.end())
+ response(event, iter->second);
+ }
+ complete(event);
+ return;
+ }
+
+ if (query.getSchemaId().isValid()) {
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash>::const_iterator iter = schemaIndex.find(query.getSchemaId());
+ if (iter != schemaIndex.end())
+ for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++)
+ if (query.matchesPredicate(dIter->second.getProperties()))
+ response(event, dIter->second);
+ }
+ complete(event);
+ return;
+ }
+
+ raiseException(event, "Query is Invalid");
+}
+
+
+void AgentSessionImpl::authReject(AgentEvent& event, const string& error)
+{
+ raiseException(event, "Action Forbidden - " + error);
+}
+
+
+void AgentSessionImpl::raiseException(AgentEvent& event, const string& error)
+{
+ Data exception(new DataImpl());
+ exception.setProperty("error_text", error);
+ raiseException(event, exception);
+}
+
+
+void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+ const DataImpl& dataImpl(DataImplAccess::get(data));
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(dataImpl.asMap(), msg);
+ send(msg, eventImpl.getReplyTo());
+
+ QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo());
+}
+
+
+void AgentSessionImpl::response(AgentEvent& event, const Data& data)
+{
+ AgentEventImpl& impl(AgentEventImplAccess::get(event));
+ uint32_t count = impl.enqueueData(data);
+ if (count >= 8)
+ flushResponses(event, false);
+}
+
+
+void AgentSessionImpl::complete(AgentEvent& event)
+{
+ flushResponses(event, true);
+}
+
+
+void AgentSessionImpl::methodSuccess(AgentEvent& event)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+
+ const Variant::Map& outArgs(eventImpl.getReturnArguments());
+ const Variant::Map& outSubtypes(eventImpl.getReturnArgumentSubtypes());
+
+ map["_arguments"] = outArgs;
+ if (!outSubtypes.empty())
+ map["_subtypes"] = outSubtypes;
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(map, msg);
+ send(msg, eventImpl.getReplyTo());
+
+ QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo());
+}
+
+
+void AgentSessionImpl::raiseEvent(const Data& data)
+{
+ int severity(SEV_NOTICE);
+ if (data.hasSchema()) {
+ const Schema& schema(DataImplAccess::get(data).getSchema());
+ if (schema.isValid())
+ severity = schema.getDefaultSeverity();
+ }
+
+ raiseEvent(data, severity);
+}
+
+
+void AgentSessionImpl::raiseEvent(const Data& data, int severity)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+ string subject("agent.ind.event");
+
+ if (data.hasSchema()) {
+ const SchemaId& schemaId(data.getSchemaId());
+ if (schemaId.getType() != SCHEMA_TYPE_EVENT)
+ throw QmfException("Cannot call raiseEvent on data that is not an Event");
+ subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName();
+ }
+
+ if (severity < SEV_EMERG || severity > SEV_DEBUG)
+ throw QmfException("Invalid severity value");
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+ msg.setSubject(subject);
+
+ Variant::List list;
+ Variant::Map dataAsMap(DataImplAccess::get(data).asMap());
+ dataAsMap["_severity"] = severity;
+ dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration::FromEpoch());
+ list.push_back(dataAsMap);
+ encode(list, msg);
+ topicSender.send(msg);
+
+ QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
+}
+
+
+void AgentSessionImpl::checkOpen()
+{
+ if (opened)
+ throw QmfException("Operation must be performed before calling open()");
+}
+
+
+void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ bool notify = eventQueue.empty();
+ eventQueue.push(event);
+ if (notify) {
+ cond.notify();
+ alertEventNotifierLH(true);
+ }
+}
+
+
+void AgentSessionImpl::setAgentName()
+{
+ Variant::Map::iterator iter;
+ string vendor;
+ string product;
+ string instance;
+
+ iter = attributes.find("_vendor");
+ if (iter == attributes.end())
+ attributes["_vendor"] = vendor;
+ else
+ vendor = iter->second.asString();
+
+ iter = attributes.find("_product");
+ if (iter == attributes.end())
+ attributes["_product"] = product;
+ else
+ product = iter->second.asString();
+
+ iter = attributes.find("_instance");
+ if (iter == attributes.end()) {
+ instance = qpid::types::Uuid(true).str();
+ attributes["_instance"] = instance;
+ } else
+ instance = iter->second.asString();
+
+ agentName = vendor + ":" + product + ":" + instance;
+ attributes["_name"] = agentName;
+}
+
+
+void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());
+
+ if (!predicate.empty()) {
+ Query agentQuery(QUERY_OBJECT);
+ agentQuery.setPredicate(predicate);
+ if (!agentQuery.matchesPredicate(attributes)) {
+ QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring");
+ return;
+ }
+ }
+
+ Message reply;
+ Variant::Map map;
+ Variant::Map& headers(reply.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ map["_values"] = attributes;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
+
+ encode(map, reply);
+ send(reply, msg.getReplyTo());
+ QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo());
+}
+
+
+void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
+{
+ QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
+
+ //
+ // Construct an AgentEvent to be sent to the application.
+ //
+ std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_METHOD));
+ eventImpl->setUserId(msg.getUserId());
+ eventImpl->setReplyTo(msg.getReplyTo());
+ eventImpl->setCorrelationId(msg.getCorrelationId());
+
+ Variant::Map::const_iterator iter;
+
+ iter = content.find("_method_name");
+ if (iter == content.end()) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "Malformed MethodRequest: missing _method_name field");
+ return;
+ }
+ eventImpl->setMethodName(iter->second.asString());
+
+ iter = content.find("_arguments");
+ if (iter != content.end())
+ eventImpl->setArguments(iter->second.asMap());
+
+ iter = content.find("_subtypes");
+ if (iter != content.end())
+ eventImpl->setArgumentSubtypes(iter->second.asMap());
+
+ iter = content.find("_object_id");
+ if (iter != content.end()) {
+ DataAddr addr(new DataAddrImpl(iter->second.asMap()));
+ eventImpl->setDataAddr(addr);
+ if (!externalStorage) {
+ DataIndex::const_iterator iter(globalIndex.find(addr));
+ if (iter == globalIndex.end()) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "No data object found with the specified address");
+ return;
+ }
+
+ const Schema& schema(DataImplAccess::get(iter->second).getSchema());
+ if (schema.isValid()) {
+ eventImpl->setSchema(schema);
+ for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin();
+ aIter != eventImpl->getArguments().end(); aIter++) {
+ const Schema& schema(DataImplAccess::get(iter->second).getSchema());
+ if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) {
+ AgentEvent event(eventImpl.release());
+ raiseException(event, "Invalid argument: " + aIter->first);
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ enqueueEvent(AgentEvent(eventImpl.release()));
+}
+
+
+void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
+{
+ QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
+
+ //
+ // Construct an AgentEvent to be sent to the application or directly handled by the agent.
+ //
+ std::auto_ptr<QueryImpl> queryImpl(new QueryImpl(content));
+ std::auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY));
+ eventImpl->setUserId(msg.getUserId());
+ eventImpl->setReplyTo(msg.getReplyTo());
+ eventImpl->setCorrelationId(msg.getCorrelationId());
+ eventImpl->setQuery(queryImpl.release());
+ AgentEvent ae(eventImpl.release());
+
+ if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) {
+ handleSchemaRequest(ae);
+ return;
+ }
+
+ if (autoAllowQueries)
+ authAccept(ae);
+ else
+ enqueueEvent(ae);
+}
+
+
+void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
+{
+ SchemaMap::const_iterator iter;
+ string error;
+ const Query& query(event.getQuery());
+
+ Message msg;
+ Variant::List content;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (query.getTarget() == QUERY_SCHEMA_ID) {
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema_id";
+ for (iter = schemata.begin(); iter != schemata.end(); iter++)
+ content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
+ } else if (query.getSchemaId().isValid()) {
+ headers[protocol::HEADER_KEY_CONTENT] = "_schema";
+ iter = schemata.find(query.getSchemaId());
+ if (iter != schemata.end())
+ content.push_back(SchemaImplAccess::get(iter->second).asMap());
+ } else {
+ error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID.";
+ }
+ }
+
+ if (!error.empty()) {
+ raiseException(event, error);
+ return;
+ }
+
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(content, msg);
+ send(msg, eventImpl.getReplyTo());
+
+ QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
+}
+
+
+void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, uint32_t seq, const Message& msg)
+{
+ string packageName;
+ string className;
+ uint8_t hashBits[16];
+
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hashBits);
+
+ QPID_LOG(trace, "RCVD QMFv1 SchemaRequest for " << packageName << ":" << className);
+
+ qpid::types::Uuid hash(hashBits);
+ map<SchemaId, Schema, SchemaIdCompare>::const_iterator iter;
+ string replyContent;
+
+ SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className);
+ dataId.setHash(hash);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ iter = schemata.find(dataId);
+ if (iter != schemata.end())
+ replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
+ else {
+ SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
+ eventId.setHash(hash);
+ iter = schemata.find(dataId);
+ if (iter != schemata.end())
+ replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
+ else
+ return;
+ }
+ }
+
+ Message reply;
+ Variant::Map& headers(reply.getProperties());
+
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ reply.setContent(replyContent);
+
+ send(reply, msg.getReplyTo());
+ QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo());
+}
+
+
+void AgentSessionImpl::dispatch(Message msg)
+{
+ const Variant::Map& properties(msg.getProperties());
+ Variant::Map::const_iterator iter;
+
+ //
+ // If strict-security is enabled, make sure that reply-to address complies with the
+ // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.').
+ //
+ if (strictSecurity && msg.getReplyTo()) {
+ if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) {
+ QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str());
+ return;
+ }
+ }
+
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) {
+ //
+ // Dispatch a QMFv2 formatted message
+ //
+ iter = properties.find(protocol::HEADER_KEY_OPCODE);
+ if (iter == properties.end()) {
+ QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
+ return;
+ }
+
+ const string& opcode = iter->second.asString();
+
+ if (msg.getContentType() == "amqp/list") {
+ Variant::List content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
+ }
+
+ } else if (msg.getContentType() == "amqp/map") {
+ Variant::Map content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
+ }
+ } else {
+ QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map");
+ }
+
+ } else {
+ //
+ // Dispatch a QMFv1 formatted message
+ //
+ const string& body(msg.getContent());
+ if (body.size() < 8)
+ return;
+ qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());
+
+ if (buffer.getOctet() != 'A') return;
+ if (buffer.getOctet() != 'M') return;
+ if (buffer.getOctet() != '2') return;
+ char v1Opcode(buffer.getOctet());
+ uint32_t seq(buffer.getLong());
+
+ if (v1Opcode == 'S') handleV1SchemaRequest(buffer, seq, msg);
+ else {
+ QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
+ }
+ }
+}
+
+
+void AgentSessionImpl::sendHeartbeat()
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+ std::stringstream address;
+
+ address << "agent.ind.heartbeat";
+
+ // append .<vendor>.<product> to address key if present.
+ Variant::Map::const_iterator v;
+ if ((v = attributes.find("_vendor")) != attributes.end() && !v->second.getString().empty()) {
+ address << "." << v->second.getString();
+ if ((v = attributes.find("_product")) != attributes.end() && !v->second.getString().empty()) {
+ address << "." << v->second.getString();
+ }
+ }
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+ msg.setSubject(address.str());
+
+ map["_values"] = attributes;
+ map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch());
+ map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval;
+ map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence;
+ map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime;
+
+ encode(map, msg);
+ topicSender.send(msg);
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName);
+}
+
+
+void AgentSessionImpl::send(Message msg, const Address& to)
+{
+ Sender sender;
+
+ if (strictSecurity && to.getName() != topicBase) {
+ QPID_LOG(warning, "Address violates strict-security policy: " << to);
+ return;
+ }
+
+ if (to.getName() == directBase) {
+ msg.setSubject(to.getSubject());
+ sender = directSender;
+ } else if (to.getName() == topicBase) {
+ msg.setSubject(to.getSubject());
+ sender = topicSender;
+ } else
+ sender = session.createSender(to);
+
+ sender.send(msg);
+}
+
+
+void AgentSessionImpl::flushResponses(AgentEvent& event, bool final)
+{
+ Message msg;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE;
+ headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA;
+ headers[protocol::HEADER_KEY_AGENT] = agentName;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+ if (!final)
+ headers[protocol::HEADER_KEY_PARTIAL] = Variant();
+
+ Variant::List body;
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+ Data data(eventImpl.dequeueData());
+ while (data.isValid()) {
+ DataImpl& dataImpl(DataImplAccess::get(data));
+ body.push_back(dataImpl.asMap());
+ data = eventImpl.dequeueData();
+ }
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(body, msg);
+ send(msg, eventImpl.getReplyTo());
+
+ QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo());
+}
+
+
+void AgentSessionImpl::periodicProcessing(uint64_t seconds)
+{
+ //
+ // The granularity of this timer is seconds. Don't waste time looking for work if
+ // it's been less than a second since we last visited.
+ //
+ if (seconds == lastVisit)
+ return;
+ //uint64_t thisInterval(seconds - lastVisit);
+ lastVisit = seconds;
+
+ //
+ // First time through, set lastHeartbeat to the current time.
+ //
+ if (lastHeartbeat == 0)
+ lastHeartbeat = seconds;
+
+ //
+ // If the hearbeat interval has elapsed, send a heartbeat.
+ //
+ if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) {
+ lastHeartbeat = seconds;
+ forceHeartbeat = false;
+ sendHeartbeat();
+ }
+
+ //
+ // TODO: process any active subscriptions on their intervals.
+ //
+}
+
+
+void AgentSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
+void AgentSessionImpl::run()
+{
+ QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
+
+ try {
+ while (!threadCanceled) {
+ periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_SEC);
+
+ Receiver rx;
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ if (threadCanceled)
+ break;
+ if (valid) {
+ try {
+ dispatch(rx.fetch());
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
+ }
+ session.acknowledge();
+ }
+ }
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
+ enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
+ }
+
+ session.close();
+ QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
+}
+
+
+AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
+{
+ return *session.impl;
+}
+
+
+const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
+{
+ return *session.impl;
+}
+
+}
diff --git a/qpid/cpp/src/qmf/AgentSessionImpl.h b/qpid/cpp/src/qmf/AgentSessionImpl.h
new file mode 100644
index 0000000000..64a39ab2e8
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentSessionImpl.h
@@ -0,0 +1,168 @@
+#ifndef __QMF_AGENT_SESSION_IMPL_H
+#define __QMF_AGENT_SESSION_IMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/AgentSession.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/DataImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
+
+#include <queue>
+#include <map>
+
+namespace qmf {
+ typedef qmf::PrivateImplRef<AgentSession> PI;
+
+ class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
+ public:
+ ~AgentSessionImpl();
+
+ //
+ // Methods from API handle
+ //
+ AgentSessionImpl(qpid::messaging::Connection& c, const std::string& o);
+ void setDomain(const std::string& d) { checkOpen(); domain = d; }
+ void setVendor(const std::string& v) { checkOpen(); attributes["_vendor"] = v; }
+ void setProduct(const std::string& p) { checkOpen(); attributes["_product"] = p; }
+ void setInstance(const std::string& i) { checkOpen(); attributes["_instance"] = i; }
+ void setAttribute(const std::string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
+ const std::string& getName() const { return agentName; }
+ void open();
+ void closeAsync();
+ void close();
+ bool nextEvent(AgentEvent& e, qpid::messaging::Duration t);
+ int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* eventNotifier);
+ EventNotifierImpl* getEventNotifier() const;
+
+ void registerSchema(Schema& s);
+ DataAddr addData(Data& d, const std::string& n, bool persist);
+ void delData(const DataAddr&);
+
+ void authAccept(AgentEvent& e);
+ void authReject(AgentEvent& e, const std::string& m);
+ void raiseException(AgentEvent& e, const std::string& s);
+ void raiseException(AgentEvent& e, const Data& d);
+ void response(AgentEvent& e, const Data& d);
+ void complete(AgentEvent& e);
+ void methodSuccess(AgentEvent& e);
+ void raiseEvent(const Data& d);
+ void raiseEvent(const Data& d, int s);
+
+ private:
+ typedef std::map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef std::map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
+
+ mutable qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ qpid::messaging::Connection connection;
+ qpid::messaging::Session session;
+ qpid::messaging::Sender directSender;
+ qpid::messaging::Sender topicSender;
+ std::string domain;
+ qpid::types::Variant::Map attributes;
+ qpid::types::Variant::Map options;
+ std::string agentName;
+ bool opened;
+ std::queue<AgentEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
+ qpid::sys::Thread* thread;
+ bool threadCanceled;
+ uint32_t bootSequence;
+ uint32_t interval;
+ uint64_t lastHeartbeat;
+ uint64_t lastVisit;
+ bool forceHeartbeat;
+ bool externalStorage;
+ bool autoAllowQueries;
+ bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
+ uint32_t maxThreadWaitTime;
+ uint64_t schemaUpdateTime;
+ std::string directBase;
+ std::string topicBase;
+
+ SchemaMap schemata;
+ DataIndex globalIndex;
+ std::map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
+
+ void checkOpen();
+ void setAgentName();
+ void enqueueEvent(const AgentEvent&);
+ void alertEventNotifierLH(bool readable);
+ void handleLocateRequest(const qpid::types::Variant::List& content, const qpid::messaging::Message& msg);
+ void handleMethodRequest(const qpid::types::Variant::Map& content, const qpid::messaging::Message& msg);
+ void handleQueryRequest(const qpid::types::Variant::Map& content, const qpid::messaging::Message& msg);
+ void handleSchemaRequest(AgentEvent&);
+ void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
+ void dispatch(qpid::messaging::Message);
+ void sendHeartbeat();
+ void send(qpid::messaging::Message, const qpid::messaging::Address&);
+ void flushResponses(AgentEvent&, bool);
+ void periodicProcessing(uint64_t);
+ void run();
+ };
+
+ struct AgentSessionImplAccess {
+ static AgentSessionImpl& get(AgentSession& session);
+ static const AgentSessionImpl& get(const AgentSession& session);
+ };
+}
+
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/AgentSubscription.cpp b/qpid/cpp/src/qmf/AgentSubscription.cpp
new file mode 100644
index 0000000000..4dc5cb74a4
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentSubscription.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/AgentSubscription.h"
+
+using namespace qmf;
+
+AgentSubscription::AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life,
+ const std::string& _replyTo, const std::string& _cid, Query _query) :
+ id(_id), interval(_interval), lifetime(_life), timeSincePublish(0), timeSinceKeepalive(0),
+ replyTo(_replyTo), cid(_cid), query(_query)
+{
+}
+
+
+AgentSubscription::~AgentSubscription()
+{
+}
+
+
+bool AgentSubscription::tick(uint64_t seconds)
+{
+ timeSinceKeepalive += seconds;
+ if (timeSinceKeepalive >= lifetime)
+ return false;
+
+ timeSincePublish += seconds;
+ if (timeSincePublish >= interval) {
+ }
+
+ return true;
+}
+
diff --git a/qpid/cpp/src/qmf/AgentSubscription.h b/qpid/cpp/src/qmf/AgentSubscription.h
new file mode 100644
index 0000000000..01e8f43e9f
--- /dev/null
+++ b/qpid/cpp/src/qmf/AgentSubscription.h
@@ -0,0 +1,52 @@
+#ifndef _QMF_AGENT_SUBSCRIPTION_H_
+#define _QMF_AGENT_SUBSCRIPTION_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/types/Variant.h"
+#include "qmf/Query.h"
+#include "qmf/Data.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+ class AgentSubscription {
+ public:
+ AgentSubscription(uint64_t _id, uint64_t _interval, uint64_t _life,
+ const std::string& _replyTo, const std::string& _cid, Query _query);
+ ~AgentSubscription();
+ bool tick(uint64_t seconds);
+ void keepalive() { timeSinceKeepalive = 0; }
+
+ private:
+ uint64_t id;
+ uint64_t interval;
+ uint64_t lifetime;
+ uint64_t timeSincePublish;
+ uint64_t timeSinceKeepalive;
+ const std::string replyTo;
+ const std::string cid;
+ Query query;
+ };
+
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/ConsoleEvent.cpp b/qpid/cpp/src/qmf/ConsoleEvent.cpp
new file mode 100644
index 0000000000..b2a5e321c7
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleEvent.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<ConsoleEvent> PI;
+
+ConsoleEvent::ConsoleEvent(ConsoleEventImpl* impl) { PI::ctor(*this, impl); }
+ConsoleEvent::ConsoleEvent(const ConsoleEvent& s) : qmf::Handle<ConsoleEventImpl>() { PI::copy(*this, s); }
+ConsoleEvent::~ConsoleEvent() { PI::dtor(*this); }
+ConsoleEvent& ConsoleEvent::operator=(const ConsoleEvent& s) { return PI::assign(*this, s); }
+
+ConsoleEventCode ConsoleEvent::getType() const { return impl->getType(); }
+uint32_t ConsoleEvent::getCorrelator() const { return impl->getCorrelator(); }
+Agent ConsoleEvent::getAgent() const { return impl->getAgent(); }
+AgentDelReason ConsoleEvent::getAgentDelReason() const { return impl->getAgentDelReason(); }
+uint32_t ConsoleEvent::getSchemaIdCount() const { return impl->getSchemaIdCount(); }
+SchemaId ConsoleEvent::getSchemaId(uint32_t i) const { return impl->getSchemaId(i); }
+uint32_t ConsoleEvent::getDataCount() const { return impl->getDataCount(); }
+Data ConsoleEvent::getData(uint32_t i) const { return impl->getData(i); }
+bool ConsoleEvent::isFinal() const { return impl->isFinal(); }
+const Variant::Map& ConsoleEvent::getArguments() const { return impl->getArguments(); }
+int ConsoleEvent::getSeverity() const { return impl->getSeverity(); }
+uint64_t ConsoleEvent::getTimestamp() const { return impl->getTimestamp(); }
+
+
+SchemaId ConsoleEventImpl::getSchemaId(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<SchemaId>::const_iterator iter = newSchemaIds.begin(); iter != newSchemaIds.end(); iter++) {
+ if (count++ == i)
+ return *iter;
+ }
+ throw IndexOutOfRange();
+}
+
+
+Data ConsoleEventImpl::getData(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<Data>::const_iterator iter = dataList.begin(); iter != dataList.end(); iter++) {
+ if (count++ == i)
+ return *iter;
+ }
+ throw IndexOutOfRange();
+}
+
+
+ConsoleEventImpl& ConsoleEventImplAccess::get(ConsoleEvent& item)
+{
+ return *item.impl;
+}
+
+
+const ConsoleEventImpl& ConsoleEventImplAccess::get(const ConsoleEvent& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/ConsoleEventImpl.h b/qpid/cpp/src/qmf/ConsoleEventImpl.h
new file mode 100644
index 0000000000..9843971456
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleEventImpl.h
@@ -0,0 +1,84 @@
+#ifndef _QMF_CONSOLE_EVENT_IMPL_H_
+#define _QMF_CONSOLE_EVENT_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/ConsoleEvent.h"
+#include "qmf/Agent.h"
+#include "qmf/Data.h"
+#include "qpid/types/Variant.h"
+#include <list>
+
+namespace qmf {
+ class ConsoleEventImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Impl-only methods
+ //
+ ConsoleEventImpl(ConsoleEventCode e, AgentDelReason r = AGENT_DEL_AGED) :
+ eventType(e), delReason(r), correlator(0), final(false) {}
+ void setCorrelator(uint32_t c) { correlator = c; }
+ void setAgent(const Agent& a) { agent = a; }
+ void addData(const Data& d) { dataList.push_back(Data(d)); }
+ void addSchemaId(const SchemaId& s) { newSchemaIds.push_back(SchemaId(s)); }
+ void setFinal() { final = true; }
+ void setArguments(const qpid::types::Variant::Map& a) { arguments = a; }
+ void setSeverity(int s) { severity = s; }
+ void setTimestamp(uint64_t t) { timestamp = t; }
+
+ //
+ // Methods from API handle
+ //
+ ConsoleEventCode getType() const { return eventType; }
+ uint32_t getCorrelator() const { return correlator; }
+ Agent getAgent() const { return agent; }
+ AgentDelReason getAgentDelReason() const { return delReason; }
+ uint32_t getSchemaIdCount() const { return newSchemaIds.size(); }
+ SchemaId getSchemaId(uint32_t) const;
+ uint32_t getDataCount() const { return dataList.size(); }
+ Data getData(uint32_t i) const;
+ bool isFinal() const { return final; }
+ const qpid::types::Variant::Map& getArguments() const { return arguments; }
+ int getSeverity() const { return severity; }
+ uint64_t getTimestamp() const { return timestamp; }
+
+ private:
+ const ConsoleEventCode eventType;
+ const AgentDelReason delReason;
+ uint32_t correlator;
+ Agent agent;
+ bool final;
+ std::list<Data> dataList;
+ std::list<SchemaId> newSchemaIds;
+ qpid::types::Variant::Map arguments;
+ int severity;
+ uint64_t timestamp;
+ };
+
+ struct ConsoleEventImplAccess
+ {
+ static ConsoleEventImpl& get(ConsoleEvent&);
+ static const ConsoleEventImpl& get(const ConsoleEvent&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
new file mode 100644
index 0000000000..c74d4de8db
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -0,0 +1,683 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/PrivateImplRef.h"
+#include "qmf/ConsoleSessionImpl.h"
+#include "qmf/AgentImpl.h"
+#include "qmf/SchemaId.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/constants.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::messaging::Address;
+using qpid::messaging::Connection;
+using qpid::messaging::Receiver;
+using qpid::messaging::Sender;
+using qpid::messaging::Duration;
+using qpid::messaging::Message;
+using qpid::types::Variant;
+
+typedef qmf::PrivateImplRef<ConsoleSession> PI;
+
+ConsoleSession::ConsoleSession(ConsoleSessionImpl* impl) { PI::ctor(*this, impl); }
+ConsoleSession::ConsoleSession(const ConsoleSession& s) : qmf::Handle<ConsoleSessionImpl>() { PI::copy(*this, s); }
+ConsoleSession::~ConsoleSession() { PI::dtor(*this); }
+ConsoleSession& ConsoleSession::operator=(const ConsoleSession& s) { return PI::assign(*this, s); }
+
+ConsoleSession::ConsoleSession(Connection& c, const string& o) { PI::ctor(*this, new ConsoleSessionImpl(c, o)); }
+void ConsoleSession::setDomain(const string& d) { impl->setDomain(d); }
+void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); }
+void ConsoleSession::open() { impl->open(); }
+void ConsoleSession::close() { impl->close(); }
+bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
+uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
+Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
+Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
+Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
+{
+ if (!options.empty()) {
+ qpid::messaging::AddressParser parser(options);
+ Variant::Map optMap;
+ Variant::Map::const_iterator iter;
+
+ parser.parseMap(optMap);
+
+ iter = optMap.find("domain");
+ if (iter != optMap.end())
+ domain = iter->second.asString();
+
+ iter = optMap.find("max-agent-age");
+ if (iter != optMap.end())
+ maxAgentAgeMinutes = iter->second.asUint32();
+
+ iter = optMap.find("listen-on-direct");
+ if (iter != optMap.end())
+ listenOnDirect = iter->second.asBool();
+
+ iter = optMap.find("strict-security");
+ if (iter != optMap.end())
+ strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
+ }
+
+ if (maxThreadWaitTime > 60)
+ maxThreadWaitTime = 60;
+}
+
+
+ConsoleSessionImpl::~ConsoleSessionImpl()
+{
+ if (opened)
+ close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+}
+
+
+void ConsoleSessionImpl::setAgentFilter(const string& predicate)
+{
+ agentQuery = Query(QUERY_OBJECT, predicate);
+
+ //
+ // Purge the agent list of any agents that don't match the filter.
+ //
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent> toDelete;
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if (!agentQuery.matchesPredicate(iter->second.getAttributes())) {
+ toDelete[iter->first] = iter->second;
+ if (iter->second.getName() == connectedBrokerAgent.getName())
+ connectedBrokerInAgentList = false;
+ }
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+
+ if (!connectedBrokerInAgentList && connectedBrokerAgent.isValid() &&
+ agentQuery.matchesPredicate(connectedBrokerAgent.getAttributes())) {
+ agents[connectedBrokerAgent.getName()] = connectedBrokerAgent;
+ connectedBrokerInAgentList = true;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(connectedBrokerAgent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ }
+
+ //
+ // Broadcast an agent locate request with our new criteria.
+ //
+ if (opened)
+ sendAgentLocate();
+}
+
+
+void ConsoleSessionImpl::open()
+{
+ if (opened)
+ throw QmfException("The session is already open");
+
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
+ // Establish messaging addresses
+ directBase = "qmf." + domain + ".direct";
+ topicBase = "qmf." + domain + ".topic";
+
+ string myKey("direct-console." + qpid::types::Uuid(true).str());
+
+ replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}");
+
+ // Create AMQP session, receivers, and senders
+ session = connection.createSession();
+ Receiver directRx = session.createReceiver(replyAddress);
+ Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating
+ if (!strictSecurity) {
+ Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
+ legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ directSender.setCapacity(128);
+ }
+
+ directRx.setCapacity(64);
+ topicRx.setCapacity(128);
+
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
+ topicSender.setCapacity(128);
+
+ // Start the receiver thread
+ threadCanceled = false;
+ opened = true;
+ thread = new qpid::sys::Thread(*this);
+
+ // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
+ sendBrokerLocate();
+ if (agentQuery)
+ sendAgentLocate();
+}
+
+
+void ConsoleSessionImpl::closeAsync()
+{
+ if (!opened)
+ throw QmfException("The session is already closed");
+
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ threadCanceled = true;
+ opened = false;
+}
+
+
+void ConsoleSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
+bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
+{
+ uint64_t milliseconds = timeout.getMilliseconds();
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
+
+ if (!eventQueue.empty()) {
+ event = eventQueue.front();
+ eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
+ return true;
+ }
+
+ return false;
+}
+
+
+int ConsoleSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
+uint32_t ConsoleSessionImpl::getAgentCount() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return agents.size();
+}
+
+
+Agent ConsoleSessionImpl::getAgent(uint32_t i) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ uint32_t count = 0;
+ for (map<string, Agent>::const_iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if (count++ == i)
+ return iter->second;
+ throw IndexOutOfRange();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&)
+{
+ return Subscription();
+}
+
+
+void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ enqueueEventLH(event);
+}
+
+
+void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
+{
+ bool notify = eventQueue.empty();
+ eventQueue.push(event);
+ if (notify) {
+ cond.notify();
+ alertEventNotifierLH(true);
+ }
+}
+
+
+void ConsoleSessionImpl::dispatch(Message msg)
+{
+ const Variant::Map& properties(msg.getProperties());
+ Variant::Map::const_iterator iter;
+ Variant::Map::const_iterator oiter;
+
+ oiter = properties.find(protocol::HEADER_KEY_OPCODE);
+ iter = properties.find(protocol::HEADER_KEY_APP_ID);
+ if (iter == properties.end())
+ iter = properties.find("app_id");
+ if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) {
+ //
+ // Dispatch a QMFv2 formatted message
+ //
+ const string& opcode = oiter->second.asString();
+
+ iter = properties.find(protocol::HEADER_KEY_AGENT);
+ if (iter == properties.end()) {
+ QPID_LOG(trace, "Message received with no 'qmf.agent' header");
+ return;
+ }
+ const string& agentName = iter->second.asString();
+
+ Agent agent;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent>::iterator aIter = agents.find(agentName);
+ if (aIter != agents.end()) {
+ agent = aIter->second;
+ AgentImplAccess::get(agent).touch();
+ }
+ }
+
+ if (msg.getContentType() == "amqp/map" &&
+ (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) {
+ //
+ // This is the one case where it's ok (necessary actually) to receive a QMFv2
+ // message from an unknown agent (how else are they going to get known?)
+ //
+ Variant::Map content;
+ decode(msg, content);
+ handleAgentUpdate(agentName, content, msg);
+ return;
+ }
+
+ if (!agent.isValid())
+ return;
+
+ AgentImpl& agentImpl(AgentImplAccess::get(agent));
+
+ if (msg.getContentType() == "amqp/map") {
+ Variant::Map content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg);
+ else
+ QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode);
+
+ return;
+ }
+
+ if (msg.getContentType() == "amqp/list") {
+ Variant::List content;
+ decode(msg, content);
+
+ if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg);
+ else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg);
+ else
+ QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode);
+
+ return;
+ }
+ } else {
+ //
+ // Dispatch a QMFv1 formatted message
+ //
+ const string& body(msg.getContent());
+ if (body.size() < 8)
+ return;
+ qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size());
+
+ if (buffer.getOctet() != 'A') return;
+ if (buffer.getOctet() != 'M') return;
+ if (buffer.getOctet() != '2') return;
+ char v1Opcode(buffer.getOctet());
+ uint32_t seq(buffer.getLong());
+
+ if (v1Opcode == 's') handleV1SchemaResponse(buffer, seq, msg);
+ else {
+ QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode);
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::sendBrokerLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("broker-locate");
+ msg.setSubject("broker");
+
+ Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ sender.send(msg);
+ sender.close();
+
+ QPID_LOG(trace, "SENT AgentLocate to broker");
+}
+
+
+void ConsoleSessionImpl::sendAgentLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+ static const string subject("console.request.agent_locate");
+
+ headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
+ headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF;
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("agent-locate");
+ msg.setSubject(subject);
+ encode(agentQuery.getPredicate(), msg);
+
+ topicSender.send(msg);
+
+ QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
+}
+
+
+void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
+{
+ Variant::Map::const_iterator iter;
+ Agent agent;
+ uint32_t epoch(0);
+ string cid(msg.getCorrelationId());
+
+ iter = content.find("_values");
+ if (iter == content.end())
+ return;
+ const Variant::Map& in_attrs(iter->second.asMap());
+ Variant::Map attrs;
+
+ //
+ // Copy the map from the message to "attrs". Translate any old-style
+ // keys to their new key values in the process.
+ //
+ for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
+ if (iter->first == "epoch")
+ attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
+ else if (iter->first == "timestamp")
+ attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
+ else if (iter->first == "heartbeat_interval")
+ attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
+ else
+ attrs[iter->first] = iter->second;
+ }
+
+ iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
+ if (iter != attrs.end())
+ epoch = iter->second.asUint32();
+
+ if (cid == "broker-locate") {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
+ connectedBrokerAgent = agent;
+ if (!agentQuery || agentQuery.matchesPredicate(attrs)) {
+ connectedBrokerInAgentList = true;
+ agents[agentName] = agent;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ return;
+ }
+
+ //
+ // Check this agent against the agent filter. Exit if it doesn't match.
+ // (only if this isn't the connected broker agent)
+ //
+ if (agentQuery && (!agentQuery.matchesPredicate(attrs)))
+ return;
+
+ QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent>::iterator aIter = agents.find(agentName);
+ if (aIter == agents.end()) {
+ //
+ // This is a new agent. We have no current record of its existence.
+ //
+ auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
+ for (iter = attrs.begin(); iter != attrs.end(); iter++)
+ if (iter->first != protocol::AGENT_ATTR_EPOCH)
+ impl->setAttribute(iter->first, iter->second);
+ agent = Agent(impl.release());
+ agents[agentName] = agent;
+
+ //
+ // Enqueue a notification of the new agent.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ } else {
+ //
+ // This is a refresh of an agent we are already tracking.
+ //
+ bool detectedRestart(false);
+ agent = aIter->second;
+ AgentImpl& impl(AgentImplAccess::get(agent));
+ impl.touch();
+ if (impl.getEpoch() != epoch) {
+ //
+ // The agent has restarted since the last time we heard from it.
+ // Enqueue a notification.
+ //
+ impl.setEpoch(epoch);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ detectedRestart = true;
+ }
+
+ iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP);
+ if (iter != attrs.end()) {
+ uint64_t ts(iter->second.asUint64());
+ if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) {
+ //
+ // The agent has added new schema entries since we last heard from it.
+ // Update the attribute and, if this doesn't accompany a restart, enqueue a notification.
+ //
+ if (!detectedRestart) {
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second);
+ }
+ }
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::handleV1SchemaResponse(qpid::management::Buffer& buffer, uint32_t, const Message&)
+{
+ QPID_LOG(trace, "RCVD V1SchemaResponse");
+ Schema schema(new SchemaImpl(buffer));
+ schemaCache->declareSchema(schema);
+}
+
+
+void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
+{
+ //
+ // The granularity of this timer is seconds. Don't waste time looking for work if
+ // it's been less than a second since we last visited.
+ //
+ if (seconds == lastVisit)
+ return;
+ lastVisit = seconds;
+
+ //
+ // Handle the aging of agent records
+ //
+ if (lastAgePass == 0)
+ lastAgePass = seconds;
+ if (seconds - lastAgePass >= 60) {
+ lastAgePass = seconds;
+ map<string, Agent> toDelete;
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes))
+ toDelete[iter->first] = iter->second;
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
+}
+
+
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
+void ConsoleSessionImpl::run()
+{
+ QPID_LOG(debug, "ConsoleSession thread started");
+
+ try {
+ while (!threadCanceled) {
+ periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() /
+ qpid::sys::TIME_SEC);
+
+ Receiver rx;
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ if (threadCanceled)
+ break;
+ if (valid) {
+ try {
+ dispatch(rx.fetch());
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message dispatch: " << e.what());
+ }
+ session.acknowledge();
+ }
+ }
+ } catch (qpid::types::Exception& e) {
+ QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what());
+ enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
+ }
+
+ session.close();
+ QPID_LOG(debug, "ConsoleSession thread exiting");
+}
+
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+ return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+ return *session.impl;
+}
diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
new file mode 100644
index 0000000000..2c06df030c
--- /dev/null
+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
@@ -0,0 +1,127 @@
+#ifndef _QMF_CONSOLE_SESSION_IMPL_H_
+#define _QMF_CONSOLE_SESSION_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/AgentImpl.h"
+#include "qmf/SchemaId.h"
+#include "qmf/Schema.h"
+#include "qmf/ConsoleEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qmf/SchemaCache.h"
+#include "qmf/Query.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/management/Buffer.h"
+#include "qpid/types/Variant.h"
+
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <queue>
+
+namespace qmf {
+ class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
+ public:
+ ~ConsoleSessionImpl();
+
+ //
+ // Methods from API handle
+ //
+ ConsoleSessionImpl(qpid::messaging::Connection& c, const std::string& o);
+ void setDomain(const std::string& d) { domain = d; }
+ void setAgentFilter(const std::string& f);
+ void open();
+ void closeAsync();
+ void close();
+ bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
+ int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* notifier);
+ EventNotifierImpl* getEventNotifier() const;
+
+ uint32_t getAgentCount() const;
+ Agent getAgent(uint32_t i) const;
+ Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
+ Subscription subscribe(const Query&, const std::string& agentFilter, const std::string& options);
+ Subscription subscribe(const std::string&, const std::string& agentFilter, const std::string& options);
+
+ protected:
+ mutable qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ qpid::messaging::Connection connection;
+ qpid::messaging::Session session;
+ qpid::messaging::Sender directSender;
+ qpid::messaging::Sender topicSender;
+ std::string domain;
+ uint32_t maxAgentAgeMinutes;
+ bool listenOnDirect;
+ bool strictSecurity;
+ uint32_t maxThreadWaitTime;
+ Query agentQuery;
+ bool opened;
+ std::queue<ConsoleEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
+ qpid::sys::Thread* thread;
+ bool threadCanceled;
+ uint64_t lastVisit;
+ uint64_t lastAgePass;
+ std::map<std::string, Agent> agents;
+ Agent connectedBrokerAgent;
+ bool connectedBrokerInAgentList;
+ qpid::messaging::Address replyAddress;
+ std::string directBase;
+ std::string topicBase;
+ boost::shared_ptr<SchemaCache> schemaCache;
+ qpid::sys::Mutex corrlock;
+ uint32_t nextCorrelator;
+
+ void enqueueEvent(const ConsoleEvent&);
+ void enqueueEventLH(const ConsoleEvent&);
+ void dispatch(qpid::messaging::Message);
+ void sendBrokerLocate();
+ void sendAgentLocate();
+ void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
+ void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
+ void periodicProcessing(uint64_t);
+ void alertEventNotifierLH(bool readable);
+ void run();
+ uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
+
+ friend class AgentImpl;
+ };
+
+ struct ConsoleSessionImplAccess {
+ static ConsoleSessionImpl& get(ConsoleSession& session);
+ static const ConsoleSessionImpl& get(const ConsoleSession& session);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/Data.cpp b/qpid/cpp/src/qmf/Data.cpp
new file mode 100644
index 0000000000..c503bab445
--- /dev/null
+++ b/qpid/cpp/src/qmf/Data.cpp
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/DataImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/SchemaProperty.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<Data> PI;
+
+Data::Data(DataImpl* impl) { PI::ctor(*this, impl); }
+Data::Data(const Data& s) : qmf::Handle<DataImpl>() { PI::copy(*this, s); }
+Data::~Data() { PI::dtor(*this); }
+Data& Data::operator=(const Data& s) { return PI::assign(*this, s); }
+
+Data::Data(const Schema& s) { PI::ctor(*this, new DataImpl(s)); }
+void Data::setAddr(const DataAddr& a) { impl->setAddr(a); }
+void Data::setProperty(const string& k, const qpid::types::Variant& v) { impl->setProperty(k, v); }
+void Data::overwriteProperties(const qpid::types::Variant::Map& m) { impl->overwriteProperties(m); }
+bool Data::hasSchema() const { return impl->hasSchema(); }
+bool Data::hasAddr() const { return impl->hasAddr(); }
+const SchemaId& Data::getSchemaId() const { return impl->getSchemaId(); }
+const DataAddr& Data::getAddr() const { return impl->getAddr(); }
+const Variant& Data::getProperty(const string& k) const { return impl->getProperty(k); }
+const Variant::Map& Data::getProperties() const { return impl->getProperties(); }
+bool Data::hasAgent() const { return impl->hasAgent(); }
+const Agent& Data::getAgent() const { return impl->getAgent(); }
+
+
+void DataImpl::overwriteProperties(const Variant::Map& m) {
+ for (Variant::Map::const_iterator iter = m.begin(); iter != m.end(); iter++)
+ properties[iter->first] = iter->second;
+}
+
+const Variant& DataImpl::getProperty(const string& k) const {
+ Variant::Map::const_iterator iter = properties.find(k);
+ if (iter == properties.end())
+ throw KeyNotFound(k);
+ return iter->second;
+}
+
+
+DataImpl::DataImpl(const qpid::types::Variant::Map& map, const Agent& a)
+{
+ Variant::Map::const_iterator iter;
+
+ agent = a;
+
+ iter = map.find("_values");
+ if (iter != map.end())
+ properties = iter->second.asMap();
+
+ iter = map.find("_object_id");
+ if (iter != map.end())
+ dataAddr = DataAddr(new DataAddrImpl(iter->second.asMap()));
+
+ iter = map.find("_schema_id");
+ if (iter != map.end())
+ schemaId = SchemaId(new SchemaIdImpl(iter->second.asMap()));
+}
+
+
+Variant::Map DataImpl::asMap() const
+{
+ Variant::Map result;
+
+ result["_values"] = properties;
+
+ if (hasAddr()) {
+ const DataAddrImpl& aImpl(DataAddrImplAccess::get(getAddr()));
+ result["_object_id"] = aImpl.asMap();
+ }
+
+ if (hasSchema()) {
+ const SchemaIdImpl& sImpl(SchemaIdImplAccess::get(getSchemaId()));
+ result["_schema_id"] = sImpl.asMap();
+ }
+
+ return result;
+}
+
+
+void DataImpl::setProperty(const std::string& k, const qpid::types::Variant& v)
+{
+ if (schema.isValid()) {
+ //
+ // If we have a valid schema, make sure that the property is included in the
+ // schema and that the variant type is compatible with the schema type.
+ //
+ if (!SchemaImplAccess::get(schema).isValidProperty(k, v))
+ throw QmfException("Property '" + k + "' either not in the schema or value is of incompatible type");
+ }
+ properties[k] = v;
+}
+
+
+DataImpl& DataImplAccess::get(Data& item)
+{
+ return *item.impl;
+}
+
+
+const DataImpl& DataImplAccess::get(const Data& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/DataAddr.cpp b/qpid/cpp/src/qmf/DataAddr.cpp
new file mode 100644
index 0000000000..08b64d5b5d
--- /dev/null
+++ b/qpid/cpp/src/qmf/DataAddr.cpp
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/DataAddrImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/DataAddr.h"
+#include <iostream>
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<DataAddr> PI;
+
+DataAddr::DataAddr(DataAddrImpl* impl) { PI::ctor(*this, impl); }
+DataAddr::DataAddr(const DataAddr& s) : qmf::Handle<DataAddrImpl>() { PI::copy(*this, s); }
+DataAddr::~DataAddr() { PI::dtor(*this); }
+DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
+
+bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
+bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
+bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
+bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
+
+DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
+DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
+const string& DataAddr::getName() const { return impl->getName(); }
+const string& DataAddr::getAgentName() const { return impl->getAgentName(); }
+uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
+Variant::Map DataAddr::asMap() const { return impl->asMap(); }
+
+bool DataAddrImpl::operator==(const DataAddrImpl& other) const
+{
+ return
+ agentName == other.agentName &&
+ name == other.name &&
+ agentEpoch == other.agentEpoch;
+}
+
+
+bool DataAddrImpl::operator<(const DataAddrImpl& other) const
+{
+ if (agentName < other.agentName) return true;
+ if (agentName > other.agentName) return false;
+ if (name < other.name) return true;
+ if (name > other.name) return false;
+ return agentEpoch < other.agentEpoch;
+}
+
+
+DataAddrImpl::DataAddrImpl(const Variant::Map& map) : agentEpoch(0)
+{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_agent_name");
+ if (iter != map.end())
+ agentName = iter->second.asString();
+
+ iter = map.find("_object_name");
+ if (iter != map.end())
+ name = iter->second.asString();
+
+ iter = map.find("_agent_epoch");
+ if (iter != map.end())
+ agentEpoch = (uint32_t) iter->second.asUint64();
+}
+
+
+Variant::Map DataAddrImpl::asMap() const
+{
+ Variant::Map result;
+
+ result["_agent_name"] = agentName;
+ result["_object_name"] = name;
+ if (agentEpoch > 0)
+ result["_agent_epoch"] = agentEpoch;
+ return result;
+}
+
+
+DataAddrImpl& DataAddrImplAccess::get(DataAddr& item)
+{
+ return *item.impl;
+}
+
+
+const DataAddrImpl& DataAddrImplAccess::get(const DataAddr& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/DataAddrImpl.h b/qpid/cpp/src/qmf/DataAddrImpl.h
new file mode 100644
index 0000000000..11d512f0c4
--- /dev/null
+++ b/qpid/cpp/src/qmf/DataAddrImpl.h
@@ -0,0 +1,73 @@
+#ifndef _QMF_DATA_ADDR_IMPL_H_
+#define _QMF_DATA_ADDR_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/types/Variant.h"
+#include "qmf/DataAddr.h"
+
+namespace qmf {
+ class DataAddrImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Impl-only methods
+ //
+ void setName(const std::string& n) { name = n; }
+ void setAgent(const std::string& n, uint32_t e=0) { agentName = n; agentEpoch = e; }
+
+ //
+ // Methods from API handle
+ //
+ bool operator==(const DataAddrImpl&) const;
+ bool operator<(const DataAddrImpl&) const;
+ DataAddrImpl(const qpid::types::Variant::Map&);
+ DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
+ agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
+ const std::string& getName() const { return name; }
+ const std::string& getAgentName() const { return agentName; }
+ uint32_t getAgentEpoch() const { return agentEpoch; }
+ qpid::types::Variant::Map asMap() const;
+
+ private:
+ std::string agentName;
+ std::string name;
+ uint32_t agentEpoch;
+ };
+
+ struct DataAddrImplAccess
+ {
+ static DataAddrImpl& get(DataAddr&);
+ static const DataAddrImpl& get(const DataAddr&);
+ };
+
+ struct DataAddrCompare {
+ bool operator() (const DataAddr& lhs, const DataAddr& rhs) const
+ {
+ if (lhs.getName() != rhs.getName())
+ return lhs.getName() < rhs.getName();
+ return lhs.getAgentName() < rhs.getAgentName();
+ }
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/DataImpl.h b/qpid/cpp/src/qmf/DataImpl.h
new file mode 100644
index 0000000000..4ac3197da0
--- /dev/null
+++ b/qpid/cpp/src/qmf/DataImpl.h
@@ -0,0 +1,84 @@
+#ifndef _QMF_DATA_IMPL_H_
+#define _QMF_DATA_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Data.h"
+#include "qmf/SchemaId.h"
+#include "qmf/Schema.h"
+#include "qmf/DataAddr.h"
+#include "qmf/Agent.h"
+#include "qmf/AgentSubscription.h"
+#include "qpid/types/Variant.h"
+
+namespace qmf {
+ class DataImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ DataImpl(const qpid::types::Variant::Map&, const Agent&);
+ qpid::types::Variant::Map asMap() const;
+ DataImpl() {}
+ void addSubscription(boost::shared_ptr<AgentSubscription>);
+ void delSubscription(uint64_t);
+ qpid::types::Variant::Map publishSubscription(uint64_t);
+ const Schema& getSchema() const { return schema; }
+
+ //
+ // Methods from API handle
+ //
+ DataImpl(const Schema& s) : schema(s) {}
+ void setAddr(const DataAddr& a) { dataAddr = a; }
+ void setProperty(const std::string& k, const qpid::types::Variant& v);
+ void overwriteProperties(const qpid::types::Variant::Map& m);
+ bool hasSchema() const { return schemaId.isValid() || schema.isValid(); }
+ bool hasAddr() const { return dataAddr.isValid(); }
+ const SchemaId& getSchemaId() const { if (schema.isValid()) return schema.getSchemaId(); else return schemaId; }
+ const DataAddr& getAddr() const { return dataAddr; }
+ const qpid::types::Variant& getProperty(const std::string& k) const;
+ const qpid::types::Variant::Map& getProperties() const { return properties; }
+ bool hasAgent() const { return agent.isValid(); }
+ const Agent& getAgent() const { return agent; }
+
+ private:
+ struct Subscr {
+ boost::shared_ptr<AgentSubscription> subscription;
+ qpid::types::Variant::Map deltas;
+ };
+ std::map<uint64_t, boost::shared_ptr<Subscr> > subscriptions;
+
+ SchemaId schemaId;
+ Schema schema;
+ DataAddr dataAddr;
+ qpid::types::Variant::Map properties;
+ Agent agent;
+ };
+
+ struct DataImplAccess
+ {
+ static DataImpl& get(Data&);
+ static const DataImpl& get(const Data&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/EventNotifierImpl.cpp b/qpid/cpp/src/qmf/EventNotifierImpl.cpp
new file mode 100644
index 0000000000..81b6d637a3
--- /dev/null
+++ b/qpid/cpp/src/qmf/EventNotifierImpl.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/EventNotifierImpl.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSessionImpl.h"
+
+namespace qmf {
+
+EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession)
+ : readable(false), agent(agentSession)
+{
+ AgentSessionImplAccess::get(agent).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession)
+ : readable(false), console(consoleSession)
+{
+ ConsoleSessionImplAccess::get(console).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::~EventNotifierImpl()
+{
+ if (agent.isValid())
+ AgentSessionImplAccess::get(agent).setEventNotifier(NULL);
+ if (console.isValid())
+ ConsoleSessionImplAccess::get(console).setEventNotifier(NULL);
+}
+
+void EventNotifierImpl::setReadable(bool readable)
+{
+ update(readable);
+ this->readable = readable;
+}
+
+
+bool EventNotifierImpl::isReadable() const
+{
+ return this->readable;
+}
+
+}
diff --git a/qpid/cpp/src/qmf/EventNotifierImpl.h b/qpid/cpp/src/qmf/EventNotifierImpl.h
new file mode 100644
index 0000000000..d85f9979d2
--- /dev/null
+++ b/qpid/cpp/src/qmf/EventNotifierImpl.h
@@ -0,0 +1,48 @@
+#ifndef __QMF_EVENT_NOTIFIER_IMPL_H
+#define __QMF_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/AgentSession.h"
+#include "qmf/ConsoleSession.h"
+
+namespace qmf
+{
+ class EventNotifierImpl {
+ private:
+ bool readable;
+ AgentSession agent;
+ ConsoleSession console;
+
+ public:
+ EventNotifierImpl(AgentSession& agentSession);
+ EventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~EventNotifierImpl();
+
+ void setReadable(bool readable);
+ bool isReadable() const;
+
+ protected:
+ virtual void update(bool readable) = 0;
+ };
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/Expression.cpp b/qpid/cpp/src/qmf/Expression.cpp
new file mode 100644
index 0000000000..7d48678c15
--- /dev/null
+++ b/qpid/cpp/src/qmf/Expression.cpp
@@ -0,0 +1,441 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/exceptions.h"
+#include "qmf/Expression.h"
+#include <iostream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::types;
+
+Expression::Expression(const Variant::List& expr)
+{
+ static int level(0);
+ level++;
+ Variant::List::const_iterator iter(expr.begin());
+ string op(iter->asString());
+ iter++;
+
+ if (op == "not") logicalOp = LOGICAL_NOT;
+ else if (op == "and") logicalOp = LOGICAL_AND;
+ else if (op == "or") logicalOp = LOGICAL_OR;
+ else {
+ logicalOp = LOGICAL_ID;
+ if (op == "eq") boolOp = BOOL_EQ;
+ else if (op == "ne") boolOp = BOOL_NE;
+ else if (op == "lt") boolOp = BOOL_LT;
+ else if (op == "le") boolOp = BOOL_LE;
+ else if (op == "gt") boolOp = BOOL_GT;
+ else if (op == "ge") boolOp = BOOL_GE;
+ else if (op == "re_match") boolOp = BOOL_RE_MATCH;
+ else if (op == "exists") boolOp = BOOL_EXISTS;
+ else if (op == "true") boolOp = BOOL_TRUE;
+ else if (op == "false") boolOp = BOOL_FALSE;
+ else
+ throw QmfException("Invalid operator in predicate expression");
+ }
+
+ if (logicalOp == LOGICAL_ID) {
+ switch (boolOp) {
+ case BOOL_EQ:
+ case BOOL_NE:
+ case BOOL_LT:
+ case BOOL_LE:
+ case BOOL_GT:
+ case BOOL_GE:
+ case BOOL_RE_MATCH:
+ //
+ // Binary operator: get two operands.
+ //
+ operandCount = 2;
+ break;
+
+ case BOOL_EXISTS:
+ //
+ // Unary operator: get one operand.
+ //
+ operandCount = 1;
+ break;
+
+ case BOOL_TRUE:
+ case BOOL_FALSE:
+ //
+ // Literal operator: no operands.
+ //
+ operandCount = 0;
+ break;
+ }
+
+ for (int idx = 0; idx < operandCount; idx++) {
+ if (iter == expr.end())
+ throw QmfException("Too few operands for operation: " + op);
+ if (iter->getType() == VAR_STRING) {
+ quoted[idx] = false;
+ operands[idx] = *iter;
+ } else if (iter->getType() == VAR_LIST) {
+ const Variant::List& sublist(iter->asList());
+ Variant::List::const_iterator subIter(sublist.begin());
+ if (subIter != sublist.end() && subIter->asString() == "quote") {
+ quoted[idx] = true;
+ subIter++;
+ if (subIter != sublist.end()) {
+ operands[idx] = *subIter;
+ subIter++;
+ if (subIter != sublist.end())
+ throw QmfException("Extra tokens at end of 'quote'");
+ }
+ } else
+ throw QmfException("Expected '[quote, <token>]'");
+ } else
+ throw QmfException("Expected string or list as operand for: " + op);
+ iter++;
+ }
+
+ if (iter != expr.end())
+ throw QmfException("Too many operands for operation: " + op);
+
+ } else {
+ //
+ // This is a logical expression, collect sub-expressions
+ //
+ while (iter != expr.end()) {
+ if (iter->getType() != VAR_LIST)
+ throw QmfException("Operands of " + op + " must be lists");
+ expressionList.push_back(boost::shared_ptr<Expression>(new Expression(iter->asList())));
+ iter++;
+ }
+ }
+ level--;
+}
+
+
+bool Expression::evaluate(const Variant::Map& data) const
+{
+ list<boost::shared_ptr<Expression> >::const_iterator iter;
+
+ switch (logicalOp) {
+ case LOGICAL_ID:
+ return boolEval(data);
+
+ case LOGICAL_NOT:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if ((*iter)->evaluate(data))
+ return false;
+ return true;
+
+ case LOGICAL_AND:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if (!(*iter)->evaluate(data))
+ return false;
+ return true;
+
+ case LOGICAL_OR:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if ((*iter)->evaluate(data))
+ return true;
+ return false;
+ }
+
+ return false;
+}
+
+
+bool Expression::boolEval(const Variant::Map& data) const
+{
+ Variant val[2];
+ bool exists[2];
+
+ for (int idx = 0; idx < operandCount; idx++) {
+ if (quoted[idx]) {
+ exists[idx] = true;
+ val[idx] = operands[idx];
+ } else {
+ Variant::Map::const_iterator mIter(data.find(operands[idx].asString()));
+ if (mIter == data.end()) {
+ exists[idx] = false;
+ } else {
+ exists[idx] = true;
+ val[idx] = mIter->second;
+ }
+ }
+ }
+
+ switch (boolOp) {
+ case BOOL_EQ: return (exists[0] && exists[1] && (val[0].asString() == val[1].asString()));
+ case BOOL_NE: return (exists[0] && exists[1] && (val[0].asString() != val[1].asString()));
+ case BOOL_LT: return (exists[0] && exists[1] && lessThan(val[0], val[1]));
+ case BOOL_LE: return (exists[0] && exists[1] && lessEqual(val[0], val[1]));
+ case BOOL_GT: return (exists[0] && exists[1] && greaterThan(val[0], val[1]));
+ case BOOL_GE: return (exists[0] && exists[1] && greaterEqual(val[0], val[1]));
+ case BOOL_RE_MATCH: return false; // TODO
+ case BOOL_EXISTS: return exists[0];
+ case BOOL_TRUE: return true;
+ case BOOL_FALSE: return false;
+ }
+
+ return false;
+}
+
+bool Expression::lessThan(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() < right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() < right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() < right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() < right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() < right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() < right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() < right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::lessEqual(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() <= right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() <= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() <= right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() <= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() <= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() <= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() <= right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::greaterThan(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() > right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() > right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() > right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() > right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() > right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() > right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() > right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::greaterEqual(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() >= right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() >= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() >= right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() >= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() >= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() >= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() >= right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
diff --git a/qpid/cpp/src/qmf/Expression.h b/qpid/cpp/src/qmf/Expression.h
new file mode 100644
index 0000000000..6fbfdbc4ba
--- /dev/null
+++ b/qpid/cpp/src/qmf/Expression.h
@@ -0,0 +1,73 @@
+#ifndef _QMF_EXPRESSION_H_
+#define _QMF_EXPRESSION_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+#include <string>
+#include <list>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ enum LogicalOp {
+ LOGICAL_ID = 1,
+ LOGICAL_NOT = 2,
+ LOGICAL_AND = 3,
+ LOGICAL_OR = 4
+ };
+
+ enum BooleanOp {
+ BOOL_EQ = 1,
+ BOOL_NE = 2,
+ BOOL_LT = 3,
+ BOOL_LE = 4,
+ BOOL_GT = 5,
+ BOOL_GE = 6,
+ BOOL_RE_MATCH = 7,
+ BOOL_EXISTS = 8,
+ BOOL_TRUE = 9,
+ BOOL_FALSE = 10
+ };
+
+ class Expression {
+ public:
+ Expression(const qpid::types::Variant::List& expr);
+ bool evaluate(const qpid::types::Variant::Map& data) const;
+ private:
+ LogicalOp logicalOp;
+ BooleanOp boolOp;
+ int operandCount;
+ qpid::types::Variant operands[2];
+ bool quoted[2];
+ std::list<boost::shared_ptr<Expression> > expressionList;
+
+ bool boolEval(const qpid::types::Variant::Map& data) const;
+ bool lessThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool lessEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool greaterThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool greaterEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ };
+
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/Hash.cpp b/qpid/cpp/src/qmf/Hash.cpp
new file mode 100644
index 0000000000..86738dda2f
--- /dev/null
+++ b/qpid/cpp/src/qmf/Hash.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/Hash.h"
+
+using namespace qmf;
+
+Hash::Hash()
+{
+ data[0] = 0x5A5A5A5A5A5A5A5ALL;
+ data[1] = 0x5A5A5A5A5A5A5A5ALL;
+}
+
+void Hash::update(const char* s, uint32_t len)
+{
+ uint64_t* first = &data[0];
+ uint64_t* second = &data[1];
+
+ for (uint32_t idx = 0; idx < len; idx++) {
+ uint64_t recycle = ((*second & 0xff00000000000000LL) >> 56);
+ *second = *second << 8;
+ *second |= ((*first & 0xFF00000000000000LL) >> 56);
+ *first = *first << 8;
+ *first = *first + (uint64_t) s[idx] + recycle;
+ }
+}
+
diff --git a/qpid/cpp/src/qmf/Hash.h b/qpid/cpp/src/qmf/Hash.h
new file mode 100644
index 0000000000..4bd76832aa
--- /dev/null
+++ b/qpid/cpp/src/qmf/Hash.h
@@ -0,0 +1,44 @@
+#ifndef QMF_HASH_H
+#define QMF_HASH_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/types/Uuid.h"
+#include <string>
+
+namespace qmf {
+ class Hash {
+ public:
+ Hash();
+ qpid::types::Uuid asUuid() const { return qpid::types::Uuid((const unsigned char*) data); }
+ void update(const char* s, uint32_t len);
+ void update(uint8_t v) { update((char*) &v, sizeof(v)); }
+ void update(uint32_t v) { update((char*) &v, sizeof(v)); }
+ void update(const std::string& v) { update(const_cast<char*>(v.c_str()), v.size()); }
+ void update(bool v) { update(uint8_t(v ? 1 : 0)); }
+
+ private:
+ uint64_t data[2];
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/PosixEventNotifier.cpp b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
new file mode 100644
index 0000000000..a364cc155d
--- /dev/null
+++ b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/PrivateImplRef.h"
+
+using namespace qmf;
+using namespace std;
+
+typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
+
+posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); }
+
+posix::EventNotifier::EventNotifier(AgentSession& agentSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
+}
+
+
+posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(consoleSession));
+}
+
+
+posix::EventNotifier::EventNotifier(const posix::EventNotifier& that)
+ : Handle<PosixEventNotifierImpl>()
+{
+ PI::copy(*this, that);
+}
+
+
+posix::EventNotifier::~EventNotifier()
+{
+ PI::dtor(*this);
+}
+
+posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that)
+{
+ return PI::assign(*this, that);
+}
+
+
+int posix::EventNotifier::getHandle() const
+{
+ return impl->getHandle();
+}
+
diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
new file mode 100644
index 0000000000..011dbcc214
--- /dev/null
+++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "PosixEventNotifierImpl.h"
+#include "qpid/log/Statement.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+#define BUFFER_SIZE 10
+
+using namespace qmf;
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
+ : EventNotifierImpl(agentSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
+ : EventNotifierImpl(consoleSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::~PosixEventNotifierImpl()
+{
+ closeHandle();
+}
+
+
+void PosixEventNotifierImpl::update(bool readable)
+{
+ char buffer[BUFFER_SIZE];
+
+ if(readable && !this->isReadable()) {
+ if (::write(myHandle, "1", 1) == -1)
+ QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno);
+ }
+ else if(!readable && this->isReadable()) {
+ if (::read(yourHandle, buffer, BUFFER_SIZE) == -1)
+ QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno);
+ }
+}
+
+
+void PosixEventNotifierImpl::openHandle()
+{
+ int pair[2];
+
+ if(::pipe(pair) == -1)
+ throw QmfException("Unable to open event notifier handle.");
+
+ yourHandle = pair[0];
+ myHandle = pair[1];
+
+ int flags;
+
+ flags = ::fcntl(yourHandle, F_GETFL);
+ if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make remote handle non-blocking.");
+
+ flags = ::fcntl(myHandle, F_GETFL);
+ if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make local handle non-blocking.");
+}
+
+
+void PosixEventNotifierImpl::closeHandle()
+{
+ if(myHandle > 0) {
+ ::close(myHandle);
+ myHandle = -1;
+ }
+
+ if(yourHandle > 0) {
+ ::close(yourHandle);
+ yourHandle = -1;
+ }
+}
+
+
+PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
+
+const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.h b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
new file mode 100644
index 0000000000..c8a7446bd5
--- /dev/null
+++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
@@ -0,0 +1,61 @@
+#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/RefCounted.h"
+
+namespace qmf
+{
+ class AgentSession;
+ class ConsoleSession;
+
+ class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
+ {
+ public:
+ PosixEventNotifierImpl(AgentSession& agentSession);
+ PosixEventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~PosixEventNotifierImpl();
+
+ int getHandle() const { return yourHandle; }
+
+ private:
+ int myHandle;
+ int yourHandle;
+
+ void openHandle();
+ void closeHandle();
+
+ protected:
+ void update(bool readable);
+ };
+
+ struct PosixEventNotifierImplAccess
+ {
+ static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
+ static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
+ };
+
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/PrivateImplRef.h b/qpid/cpp/src/qmf/PrivateImplRef.h
new file mode 100644
index 0000000000..c0c07d7e1b
--- /dev/null
+++ b/qpid/cpp/src/qmf/PrivateImplRef.h
@@ -0,0 +1,93 @@
+#ifndef QMF_PRIVATEIMPL_H
+#define QMF_PRIVATEIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/ImportExport.h"
+#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qmf {
+
+/**
+ * Helper class to implement a class with a private, reference counted
+ * implementation and reference semantics.
+ *
+ * Such classes are used in the public API to hide implementation, they
+ * should. Example of use:
+ *
+ * === Foo.h
+ *
+ * template <class T> PrivateImplRef;
+ * class FooImpl;
+ *
+ * Foo : public Handle<FooImpl> {
+ * public:
+ * Foo(FooImpl* = 0);
+ * Foo(const Foo&);
+ * ~Foo();
+ * Foo& operator=(const Foo&);
+ *
+ * int fooDo(); // and other Foo functions...
+ *
+ * private:
+ * typedef FooImpl Impl;
+ * Impl* impl;
+ * friend class PrivateImplRef<Foo>;
+ *
+ * === Foo.cpp
+ *
+ * typedef PrivateImplRef<Foo> PI;
+ * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); }
+ * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); }
+ * Foo::~Foo() { PI::dtor(*this); }
+ * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); }
+ *
+ * int foo::fooDo() { return impl->fooDo(); }
+ *
+ */
+template <class T> class PrivateImplRef {
+ public:
+ typedef typename T::Impl Impl;
+ typedef boost::intrusive_ptr<Impl> intrusive_ptr;
+
+ /** Get the implementation pointer from a handle */
+ static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
+
+ /** Set the implementation pointer in a handle */
+ static void set(T& t, const intrusive_ptr& p) {
+ if (t.impl == p) return;
+ if (t.impl) intrusive_ptr_release(t.impl);
+ t.impl = p.get();
+ if (t.impl) intrusive_ptr_add_ref(t.impl);
+ }
+
+ // Helper functions to implement the ctor, dtor, copy, assign
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
+ static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
+ static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
+ static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
+};
+
+} // namespace qmf
+
+#endif /*!QMF_PRIVATEIMPL_H*/
diff --git a/qpid/cpp/src/qmf/Query.cpp b/qpid/cpp/src/qmf/Query.cpp
new file mode 100644
index 0000000000..ee8ca38e59
--- /dev/null
+++ b/qpid/cpp/src/qmf/Query.cpp
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qpid/messaging/AddressParser.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<Query> PI;
+
+Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); }
+Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); }
+Query::~Query() { PI::dtor(*this); }
+Query& Query::operator=(const Query& s) { return PI::assign(*this, s); }
+
+Query::Query(QueryTarget t, const string& pr) { PI::ctor(*this, new QueryImpl(t, pr)); }
+Query::Query(QueryTarget t, const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(t, c, p, pr)); }
+Query::Query(QueryTarget t, const SchemaId& s, const string& pr) { PI::ctor(*this, new QueryImpl(t, s, pr)); }
+Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); }
+
+QueryTarget Query::getTarget() const { return impl->getTarget(); }
+const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); }
+const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); }
+void Query::setPredicate(const Variant::List& pr) { impl->setPredicate(pr); }
+const Variant::List& Query::getPredicate() const { return impl->getPredicate(); }
+bool Query::matchesPredicate(const qpid::types::Variant::Map& map) const { return impl->matchesPredicate(map); }
+
+
+QueryImpl::QueryImpl(const Variant::Map& map) : predicateCompiled(false)
+{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_what");
+ if (iter == map.end())
+ throw QmfException("Query missing _what element");
+
+ const string& targetString(iter->second.asString());
+ if (targetString == "OBJECT") target = QUERY_OBJECT;
+ else if (targetString == "OBJECT_ID") target = QUERY_OBJECT_ID;
+ else if (targetString == "SCHEMA") target = QUERY_SCHEMA;
+ else if (targetString == "SCHEMA_ID") target = QUERY_SCHEMA_ID;
+ else
+ throw QmfException("Query with invalid _what value: " + targetString);
+
+ iter = map.find("_object_id");
+ if (iter != map.end()) {
+ auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap()));
+ dataAddr = DataAddr(addrImpl.release());
+ }
+
+ iter = map.find("_schema_id");
+ if (iter != map.end()) {
+ auto_ptr<SchemaIdImpl> sidImpl(new SchemaIdImpl(iter->second.asMap()));
+ schemaId = SchemaId(sidImpl.release());
+ }
+
+ iter = map.find("_where");
+ if (iter != map.end())
+ predicate = iter->second.asList();
+}
+
+
+Variant::Map QueryImpl::asMap() const
+{
+ Variant::Map map;
+ string targetString;
+
+ switch (target) {
+ case QUERY_OBJECT : targetString = "OBJECT"; break;
+ case QUERY_OBJECT_ID : targetString = "OBJECT_ID"; break;
+ case QUERY_SCHEMA : targetString = "SCHEMA"; break;
+ case QUERY_SCHEMA_ID : targetString = "SCHEMA_ID"; break;
+ }
+
+ map["_what"] = targetString;
+
+ if (dataAddr.isValid())
+ map["_object_id"] = DataAddrImplAccess::get(dataAddr).asMap();
+
+ if (schemaId.isValid())
+ map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap();
+
+ if (!predicate.empty())
+ map["_where"] = predicate;
+
+ return map;
+}
+
+
+bool QueryImpl::matchesPredicate(const qpid::types::Variant::Map& data) const
+{
+ if (predicate.empty())
+ return true;
+
+ if (!predicateCompiled) {
+ expression.reset(new Expression(predicate));
+ predicateCompiled = true;
+ }
+
+ return expression->evaluate(data);
+}
+
+
+void QueryImpl::parsePredicate(const string& pred)
+{
+ if (pred.empty())
+ return;
+
+ if (pred[0] == '[') {
+ //
+ // Parse this as an AddressParser list.
+ //
+ qpid::messaging::AddressParser parser(pred);
+ parser.parseList(predicate);
+ } else
+ throw QmfException("Invalid predicate format");
+}
+
+
+QueryImpl& QueryImplAccess::get(Query& item)
+{
+ return *item.impl;
+}
+
+
+const QueryImpl& QueryImplAccess::get(const Query& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/QueryImpl.h
new file mode 100644
index 0000000000..27ec427684
--- /dev/null
+++ b/qpid/cpp/src/qmf/QueryImpl.h
@@ -0,0 +1,77 @@
+#ifndef _QMF_QUERY_IMPL_H_
+#define _QMF_QUERY_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Query.h"
+#include "qmf/DataAddr.h"
+#include "qmf/SchemaId.h"
+#include "qmf/Expression.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+ class QueryImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ QueryImpl(const qpid::types::Variant::Map&);
+ qpid::types::Variant::Map asMap() const;
+
+ //
+ // Methods from API handle
+ //
+ QueryImpl(QueryTarget t, const std::string& pr) : target(t), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(QueryTarget t, const std::string& c, const std::string& p, const std::string& pr) :
+ target(t), schemaId(SCHEMA_TYPE_DATA, p, c), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(QueryTarget t, const SchemaId& s, const std::string& pr) :
+ target(t), schemaId(s), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(const DataAddr& a) : target(QUERY_OBJECT), dataAddr(a), predicateCompiled(false) {}
+
+ QueryTarget getTarget() const { return target; }
+ const DataAddr& getDataAddr() const { return dataAddr; }
+ const SchemaId& getSchemaId() const { return schemaId; }
+ void setPredicate(const qpid::types::Variant::List& pr) { predicate = pr; }
+ const qpid::types::Variant::List& getPredicate() const { return predicate; }
+ bool matchesPredicate(const qpid::types::Variant::Map& map) const;
+
+ private:
+ QueryTarget target;
+ SchemaId schemaId;
+ DataAddr dataAddr;
+ qpid::types::Variant::List predicate;
+ mutable bool predicateCompiled;
+ mutable boost::shared_ptr<Expression> expression;
+
+ void parsePredicate(const std::string& s);
+ };
+
+ struct QueryImplAccess
+ {
+ static QueryImpl& get(Query&);
+ static const QueryImpl& get(const Query&);
+ };
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/Schema.cpp b/qpid/cpp/src/qmf/Schema.cpp
new file mode 100644
index 0000000000..872aad724c
--- /dev/null
+++ b/qpid/cpp/src/qmf/Schema.cpp
@@ -0,0 +1,358 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/SchemaTypes.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/SchemaPropertyImpl.h"
+#include "qmf/SchemaMethodImpl.h"
+#include "qmf/Hash.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/Buffer.h"
+#include <list>
+
+using namespace std;
+using qpid::types::Variant;
+using namespace qmf;
+
+typedef PrivateImplRef<Schema> PI;
+
+Schema::Schema(SchemaImpl* impl) { PI::ctor(*this, impl); }
+Schema::Schema(const Schema& s) : qmf::Handle<SchemaImpl>() { PI::copy(*this, s); }
+Schema::~Schema() { PI::dtor(*this); }
+Schema& Schema::operator=(const Schema& s) { return PI::assign(*this, s); }
+
+Schema::Schema(int t, const string& p, const string& c) { PI::ctor(*this, new SchemaImpl(t, p, c)); }
+const SchemaId& Schema::getSchemaId() const { return impl->getSchemaId(); }
+void Schema::finalize() { impl->finalize(); }
+bool Schema::isFinalized() const { return impl->isFinalized(); }
+void Schema::addProperty(const SchemaProperty& p) { impl->addProperty(p); }
+void Schema::addMethod(const SchemaMethod& m) { impl->addMethod(m); }
+void Schema::setDesc(const string& d) { impl->setDesc(d); }
+const string& Schema::getDesc() const { return impl->getDesc(); }
+void Schema::setDefaultSeverity(int s) { impl->setDefaultSeverity(s); }
+int Schema::getDefaultSeverity() const { return impl->getDefaultSeverity(); }
+uint32_t Schema::getPropertyCount() const { return impl->getPropertyCount(); }
+SchemaProperty Schema::getProperty(uint32_t i) const { return impl->getProperty(i); }
+uint32_t Schema::getMethodCount() const { return impl->getMethodCount(); }
+SchemaMethod Schema::getMethod(uint32_t i) const { return impl->getMethod(i); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+SchemaImpl::SchemaImpl(const Variant::Map& map) : finalized(false)
+{
+ Variant::Map::const_iterator iter;
+ Variant::List::const_iterator lIter;
+
+ iter = map.find("_schema_id");
+ if (iter == map.end())
+ throw QmfException("Schema map missing _schema_id element");
+ schemaId = SchemaId(new SchemaIdImpl(iter->second.asMap()));
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ description = iter->second.asString();
+
+ iter = map.find("_default_severity");
+ if (iter != map.end())
+ defaultSeverity = int(iter->second.asUint32());
+
+ iter = map.find("_properties");
+ if (iter != map.end()) {
+ const Variant::List& props(iter->second.asList());
+ for (lIter = props.begin(); lIter != props.end(); lIter++)
+ addProperty(SchemaProperty(new SchemaPropertyImpl(lIter->asMap())));
+ }
+
+ iter = map.find("_methods");
+ if (iter != map.end()) {
+ const Variant::List& meths(iter->second.asList());
+ for (lIter = meths.begin(); lIter != meths.end(); lIter++)
+ addMethod(SchemaMethod(new SchemaMethodImpl(lIter->asMap())));
+ }
+
+ finalized = true;
+}
+
+
+Variant::Map SchemaImpl::asMap() const
+{
+ Variant::Map map;
+ Variant::List propList;
+ Variant::List methList;
+
+ checkNotFinal();
+
+ map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap();
+ if (!description.empty())
+ map["_desc"] = description;
+ if (schemaId.getType() == SCHEMA_TYPE_EVENT)
+ map["_default_severity"] = uint32_t(defaultSeverity);
+
+ for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++)
+ propList.push_back(SchemaPropertyImplAccess::get(*pIter).asMap());
+
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++)
+ methList.push_back(SchemaMethodImplAccess::get(*mIter).asMap());
+
+ map["_properties"] = propList;
+ map["_methods"] = methList;
+ return map;
+}
+
+
+SchemaImpl::SchemaImpl(qpid::management::Buffer& buffer) : finalized(false)
+{
+ int schemaType;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+
+ schemaType = int(buffer.getOctet());
+ buffer.getShortString(packageName);
+ buffer.getShortString(className);
+ buffer.getBin128(hash);
+ schemaId = SchemaId(schemaType, packageName, className);
+ schemaId.setHash(qpid::types::Uuid(hash));
+
+ if (schemaType == SCHEMA_TYPE_DATA) {
+ uint16_t propCount(buffer.getShort());
+ uint16_t statCount(buffer.getShort());
+ uint16_t methCount(buffer.getShort());
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++)
+ addProperty(new SchemaPropertyImpl(buffer));
+ for (uint16_t idx = 0; idx < methCount; idx++)
+ addMethod(new SchemaMethodImpl(buffer));
+ }
+
+ finalized = true;
+}
+
+
+string SchemaImpl::asV1Content(uint32_t sequence) const
+{
+#define RAW_BUF_SIZE 65536
+ char rawBuf[RAW_BUF_SIZE];
+ qpid::management::Buffer buffer(rawBuf, RAW_BUF_SIZE);
+
+ //
+ // Encode the QMFv1 Header
+ //
+ buffer.putOctet('A');
+ buffer.putOctet('M');
+ buffer.putOctet('2');
+ buffer.putOctet('s');
+ buffer.putLong(sequence);
+
+ //
+ // Encode the common schema information
+ //
+ buffer.putOctet(uint8_t(schemaId.getType()));
+ buffer.putShortString(schemaId.getPackageName());
+ buffer.putShortString(schemaId.getName());
+ buffer.putBin128(schemaId.getHash().data());
+
+ if (schemaId.getType() == SCHEMA_TYPE_DATA) {
+ buffer.putShort(properties.size());
+ buffer.putShort(0);
+ buffer.putShort(methods.size());
+ for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++)
+ SchemaPropertyImplAccess::get(*pIter).encodeV1(buffer, false, false);
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++)
+ SchemaMethodImplAccess::get(*mIter).encodeV1(buffer);
+ } else {
+ buffer.putShort(properties.size());
+ for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++)
+ SchemaPropertyImplAccess::get(*pIter).encodeV1(buffer, true, false);
+ }
+
+ return string(rawBuf, buffer.getPosition());
+}
+
+
+bool SchemaImpl::isValidProperty(const std::string& k, const Variant& v) const
+{
+ for (list<SchemaProperty>::const_iterator iter = properties.begin(); iter != properties.end(); iter++)
+ if (iter->getName() == k)
+ return (isCompatibleType(iter->getType(), v.getType()));
+ return false;
+}
+
+
+bool SchemaImpl::isValidMethodInArg(const std::string& m, const std::string& k, const Variant& v) const
+{
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) {
+ if (mIter->getName() == m) {
+ uint32_t count(mIter->getArgumentCount());
+ for (uint32_t i = 0; i < count; i++) {
+ const SchemaProperty prop(mIter->getArgument(i));
+ if (prop.getName() == k) {
+ if (prop.getDirection() == DIR_IN || prop.getDirection() == DIR_IN_OUT)
+ return (isCompatibleType(prop.getType(), v.getType()));
+ else
+ return false;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+
+bool SchemaImpl::isValidMethodOutArg(const std::string& m, const std::string& k, const Variant& v) const
+{
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) {
+ if (mIter->getName() == m) {
+ uint32_t count(mIter->getArgumentCount());
+ for (uint32_t i = 0; i < count; i++) {
+ const SchemaProperty prop(mIter->getArgument(i));
+ if (prop.getName() == k) {
+ if (prop.getDirection() == DIR_OUT || prop.getDirection() == DIR_IN_OUT)
+ return (isCompatibleType(prop.getType(), v.getType()));
+ else
+ return false;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+
+void SchemaImpl::finalize()
+{
+ Hash hash;
+
+ hash.update((uint8_t) schemaId.getType());
+ hash.update(schemaId.getPackageName());
+ hash.update(schemaId.getName());
+
+ for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++)
+ SchemaPropertyImplAccess::get(*pIter).updateHash(hash);
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++)
+ SchemaMethodImplAccess::get(*mIter).updateHash(hash);
+
+ schemaId.setHash(hash.asUuid());
+ QPID_LOG(debug, "Schema Finalized: " << schemaId.getPackageName() << ":" << schemaId.getName() << ":" <<
+ schemaId.getHash());
+
+ finalized = true;
+}
+
+
+SchemaProperty SchemaImpl::getProperty(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<SchemaProperty>::const_iterator iter = properties.begin(); iter != properties.end(); iter++)
+ if (count++ == i)
+ return *iter;
+ throw IndexOutOfRange();
+}
+
+
+SchemaMethod SchemaImpl::getMethod(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<SchemaMethod>::const_iterator iter = methods.begin(); iter != methods.end(); iter++)
+ if (count++ == i)
+ return *iter;
+ throw IndexOutOfRange();
+}
+
+void SchemaImpl::checkFinal() const
+{
+ if (finalized)
+ throw QmfException("Modification of a finalized schema is forbidden");
+}
+
+
+void SchemaImpl::checkNotFinal() const
+{
+ if (!finalized)
+ throw QmfException("Schema is not yet finalized/registered");
+}
+
+
+bool SchemaImpl::isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const
+{
+ bool typeValid(false);
+
+ switch (qpidType) {
+ case qpid::types::VAR_VOID:
+ if (qmfType == SCHEMA_DATA_VOID)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_BOOL:
+ if (qmfType == SCHEMA_DATA_BOOL)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_UINT8:
+ case qpid::types::VAR_UINT16:
+ case qpid::types::VAR_UINT32:
+ case qpid::types::VAR_UINT64:
+ case qpid::types::VAR_INT8:
+ case qpid::types::VAR_INT16:
+ case qpid::types::VAR_INT32:
+ case qpid::types::VAR_INT64:
+ if (qmfType == SCHEMA_DATA_INT)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_FLOAT:
+ case qpid::types::VAR_DOUBLE:
+ if (qmfType == SCHEMA_DATA_FLOAT)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_STRING:
+ if (qmfType == SCHEMA_DATA_STRING)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_MAP:
+ if (qmfType == SCHEMA_DATA_MAP)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_LIST:
+ if (qmfType == SCHEMA_DATA_LIST)
+ typeValid = true;
+ break;
+ case qpid::types::VAR_UUID:
+ if (qmfType == SCHEMA_DATA_UUID)
+ typeValid = true;
+ break;
+ }
+
+ return typeValid;
+}
+
+
+SchemaImpl& SchemaImplAccess::get(Schema& item)
+{
+ return *item.impl;
+}
+
+
+const SchemaImpl& SchemaImplAccess::get(const Schema& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/SchemaCache.cpp b/qpid/cpp/src/qmf/SchemaCache.cpp
new file mode 100644
index 0000000000..74ca4044fd
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaCache.cpp
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaCache.h"
+#include "qmf/exceptions.h"
+
+using namespace std;
+using namespace qmf;
+
+bool SchemaCache::declareSchemaId(const SchemaId& id)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ SchemaMap::const_iterator iter = schemata.find(id);
+ if (iter == schemata.end()) {
+ schemata[id] = Schema();
+ return false;
+ }
+ return true;
+}
+
+
+void SchemaCache::declareSchema(const Schema& schema)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ SchemaMap::const_iterator iter = schemata.find(schema.getSchemaId());
+ if (iter == schemata.end() || !iter->second.isValid()) {
+ schemata[schema.getSchemaId()] = schema;
+
+ //
+ // If there are any threads blocking in SchemaCache::getSchema waiting for
+ // this schema, unblock them all now.
+ //
+ CondMap::iterator cIter = conditions.find(schema.getSchemaId());
+ if (cIter != conditions.end())
+ cIter->second->notifyAll();
+ }
+}
+
+
+bool SchemaCache::haveSchema(const SchemaId& id) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ SchemaMap::const_iterator iter = schemata.find(id);
+ return iter != schemata.end() && iter->second.isValid();
+}
+
+
+const Schema& SchemaCache::getSchema(const SchemaId& id, qpid::messaging::Duration timeout) const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ SchemaMap::const_iterator iter = schemata.find(id);
+ if (iter != schemata.end() && iter->second.isValid())
+ return iter->second;
+
+ //
+ // The desired schema is not in the cache. Assume that the caller knows this and has
+ // sent a schema request to the remote agent and now wishes to wait until the schema
+ // information arrives.
+ //
+ CondMap::iterator cIter = conditions.find(id);
+ if (cIter == conditions.end())
+ conditions[id] = boost::shared_ptr<qpid::sys::Condition>(new qpid::sys::Condition());
+
+ uint64_t milliseconds = timeout.getMilliseconds();
+ conditions[id]->wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ iter = schemata.find(id);
+ if (iter != schemata.end() && iter->second.isValid())
+ return iter->second;
+
+ throw QmfException("Schema lookup timed out");
+}
+
diff --git a/qpid/cpp/src/qmf/SchemaCache.h b/qpid/cpp/src/qmf/SchemaCache.h
new file mode 100644
index 0000000000..a1f104233f
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaCache.h
@@ -0,0 +1,56 @@
+#ifndef QMF_SCHEMA_CACHE_H
+#define QMF_SCHEMA_CACHE_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/Schema.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/messaging/Duration.h"
+#include <string>
+#include <map>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ class SchemaCache {
+ public:
+ SchemaCache() {}
+ ~SchemaCache() {}
+
+ bool declareSchemaId(const SchemaId&);
+ void declareSchema(const Schema&);
+ bool haveSchema(const SchemaId&) const;
+ const Schema& getSchema(const SchemaId&, qpid::messaging::Duration) const;
+
+ private:
+ mutable qpid::sys::Mutex lock;
+ typedef std::map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
+ typedef std::map<SchemaId, boost::shared_ptr<qpid::sys::Condition>, SchemaIdCompare> CondMap;
+ SchemaMap schemata;
+ mutable CondMap conditions;
+ };
+
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/SchemaId.cpp b/qpid/cpp/src/qmf/SchemaId.cpp
new file mode 100644
index 0000000000..25fa9915ae
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaId.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/PrivateImplRef.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<SchemaId> PI;
+
+SchemaId::SchemaId(SchemaIdImpl* impl) { PI::ctor(*this, impl); }
+SchemaId::SchemaId(const SchemaId& s) : qmf::Handle<SchemaIdImpl>() { PI::copy(*this, s); }
+SchemaId::~SchemaId() { PI::dtor(*this); }
+SchemaId& SchemaId::operator=(const SchemaId& s) { return PI::assign(*this, s); }
+
+SchemaId::SchemaId(int t, const string& p, const string& n) { PI::ctor(*this, new SchemaIdImpl(t, p, n)); }
+void SchemaId::setHash(const qpid::types::Uuid& h) { impl->setHash(h); }
+int SchemaId::getType() const { return impl->getType(); }
+const string& SchemaId::getPackageName() const { return impl->getPackageName(); }
+const string& SchemaId::getName() const { return impl->getName(); }
+const qpid::types::Uuid& SchemaId::getHash() const { return impl->getHash(); }
+
+
+SchemaIdImpl::SchemaIdImpl(const Variant::Map& map)
+{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_package_name");
+ if (iter != map.end())
+ package = iter->second.asString();
+
+ iter = map.find("_class_name");
+ if (iter != map.end())
+ name = iter->second.asString();
+
+ iter = map.find("_type");
+ if (iter != map.end()) {
+ const string& stype = iter->second.asString();
+ if (stype == "_data")
+ sType = SCHEMA_TYPE_DATA;
+ else if (stype == "_event")
+ sType = SCHEMA_TYPE_EVENT;
+ }
+
+ iter = map.find("_hash");
+ if (iter != map.end())
+ hash = iter->second.asUuid();
+}
+
+
+Variant::Map SchemaIdImpl::asMap() const
+{
+ Variant::Map result;
+
+ result["_package_name"] = package;
+ result["_class_name"] = name;
+ if (sType == SCHEMA_TYPE_DATA)
+ result["_type"] = "_data";
+ else
+ result["_type"] = "_event";
+ if (!hash.isNull())
+ result["_hash"] = hash;
+ return result;
+}
+
+
+SchemaIdImpl& SchemaIdImplAccess::get(SchemaId& item)
+{
+ return *item.impl;
+}
+
+
+const SchemaIdImpl& SchemaIdImplAccess::get(const SchemaId& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/SchemaIdImpl.h b/qpid/cpp/src/qmf/SchemaIdImpl.h
new file mode 100644
index 0000000000..ae1a3d8d3b
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaIdImpl.h
@@ -0,0 +1,83 @@
+#ifndef _QMF_SCHEMA_ID_IMPL_H_
+#define _QMF_SCHEMA_ID_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/SchemaId.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
+#include <string>
+
+namespace qmf {
+ class SchemaIdImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ SchemaIdImpl(const qpid::types::Variant::Map&);
+ qpid::types::Variant::Map asMap() const;
+
+ //
+ // Methods from API handle
+ //
+ SchemaIdImpl(int t, const std::string& p, const std::string& n) : sType(t), package(p), name(n) {}
+ void setHash(const qpid::types::Uuid& h) { hash = h; }
+ int getType() const { return sType; }
+ const std::string& getPackageName() const { return package; }
+ const std::string& getName() const { return name; }
+ const qpid::types::Uuid& getHash() const { return hash; }
+
+ private:
+ int sType;
+ std::string package;
+ std::string name;
+ qpid::types::Uuid hash;
+ };
+
+ struct SchemaIdImplAccess
+ {
+ static SchemaIdImpl& get(SchemaId&);
+ static const SchemaIdImpl& get(const SchemaId&);
+ };
+
+ struct SchemaIdCompare {
+ bool operator() (const SchemaId& lhs, const SchemaId& rhs) const
+ {
+ if (lhs.getName() != rhs.getName())
+ return lhs.getName() < rhs.getName();
+ if (lhs.getPackageName() != rhs.getPackageName())
+ return lhs.getPackageName() < rhs.getPackageName();
+ return lhs.getHash() < rhs.getHash();
+ }
+ };
+
+ struct SchemaIdCompareNoHash {
+ bool operator() (const SchemaId& lhs, const SchemaId& rhs) const
+ {
+ if (lhs.getName() != rhs.getName())
+ return lhs.getName() < rhs.getName();
+ return lhs.getPackageName() < rhs.getPackageName();
+ }
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h
new file mode 100644
index 0000000000..1c88f87808
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaImpl.h
@@ -0,0 +1,95 @@
+#ifndef _QMF_SCHEMAIMPL_H_
+#define _QMF_SCHEMAIMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/SchemaTypes.h"
+#include "qmf/SchemaId.h"
+#include "qmf/Schema.h"
+#include "qmf/SchemaProperty.h"
+#include "qmf/SchemaMethod.h"
+#include <list>
+
+namespace qpid {
+namespace management {
+ class Buffer;
+}}
+
+namespace qmf {
+ class SchemaImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Impl-only public methods
+ //
+ SchemaImpl(const qpid::types::Variant::Map& m);
+ qpid::types::Variant::Map asMap() const;
+ SchemaImpl(qpid::management::Buffer& v1Buffer);
+ std::string asV1Content(uint32_t sequence) const;
+ bool isValidProperty(const std::string& k, const qpid::types::Variant& v) const;
+ bool isValidMethodInArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const;
+ bool isValidMethodOutArg(const std::string& m, const std::string& k, const qpid::types::Variant& v) const;
+
+ //
+ // Methods from API handle
+ //
+ SchemaImpl(int t, const std::string& p, const std::string& c) : schemaId(t, p, c), finalized(false) {}
+ const SchemaId& getSchemaId() const { checkNotFinal(); return schemaId; }
+
+ void finalize();
+ bool isFinalized() const { return finalized; }
+ void addProperty(const SchemaProperty& p) { checkFinal(); properties.push_back(p); }
+ void addMethod(const SchemaMethod& m) { checkFinal(); methods.push_back(m); }
+
+ void setDesc(const std::string& d) { description = d; }
+ const std::string& getDesc() const { return description; }
+
+ void setDefaultSeverity(int s) { checkFinal(); defaultSeverity = s; }
+ int getDefaultSeverity() const { return defaultSeverity; }
+
+ uint32_t getPropertyCount() const { return properties.size(); }
+ SchemaProperty getProperty(uint32_t i) const;
+
+ uint32_t getMethodCount() const { return methods.size(); }
+ SchemaMethod getMethod(uint32_t i) const;
+ private:
+ SchemaId schemaId;
+ int defaultSeverity;
+ std::string description;
+ bool finalized;
+ std::list<SchemaProperty> properties;
+ std::list<SchemaMethod> methods;
+
+ void checkFinal() const;
+ void checkNotFinal() const;
+ bool isCompatibleType(int qmfType, qpid::types::VariantType qpidType) const;
+ };
+
+ struct SchemaImplAccess
+ {
+ static SchemaImpl& get(Schema&);
+ static const SchemaImpl& get(const Schema&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/SchemaMethod.cpp b/qpid/cpp/src/qmf/SchemaMethod.cpp
new file mode 100644
index 0000000000..e267878238
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaMethod.cpp
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaMethodImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/Hash.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+
+using namespace std;
+using qpid::types::Variant;
+using namespace qmf;
+
+typedef PrivateImplRef<SchemaMethod> PI;
+
+SchemaMethod::SchemaMethod(SchemaMethodImpl* impl) { PI::ctor(*this, impl); }
+SchemaMethod::SchemaMethod(const SchemaMethod& s) : qmf::Handle<SchemaMethodImpl>() { PI::copy(*this, s); }
+SchemaMethod::~SchemaMethod() { PI::dtor(*this); }
+SchemaMethod& SchemaMethod::operator=(const SchemaMethod& s) { return PI::assign(*this, s); }
+
+SchemaMethod::SchemaMethod(const string& n, const string& o) { PI::ctor(*this, new SchemaMethodImpl(n, o)); }
+void SchemaMethod::setDesc(const string& d) { impl->setDesc(d); }
+void SchemaMethod::addArgument(const SchemaProperty& p) { impl->addArgument(p); }
+const string& SchemaMethod::getName() const { return impl->getName(); }
+const string& SchemaMethod::getDesc() const { return impl->getDesc(); }
+uint32_t SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); }
+SchemaProperty SchemaMethod::getArgument(uint32_t i) const { return impl->getArgument(i); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+SchemaMethodImpl::SchemaMethodImpl(const string& n, const string& options) : name(n)
+{
+ if (!options.empty()) {
+ qpid::messaging::AddressParser parser = qpid::messaging::AddressParser(options);
+ Variant::Map optMap;
+ Variant::Map::iterator iter;
+
+ parser.parseMap(optMap);
+ iter = optMap.find("desc");
+ if (iter != optMap.end()) {
+ desc = iter->second.asString();
+ optMap.erase(iter);
+ }
+
+ if (!optMap.empty())
+ throw QmfException("Unrecognized option: " + optMap.begin()->first);
+ }
+}
+
+
+SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map& map)
+{
+ Variant::Map::const_iterator iter;
+ Variant::List::const_iterator lIter;
+
+ iter = map.find("_name");
+ if (iter == map.end())
+ throw QmfException("SchemaMethod without a _name element");
+ name = iter->second.asString();
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ desc = iter->second.asString();
+
+ iter = map.find("_arguments");
+ if (iter != map.end()) {
+ const Variant::List& argList(iter->second.asList());
+ for (lIter = argList.begin(); lIter != argList.end(); lIter++)
+ addArgument(SchemaProperty(new SchemaPropertyImpl(lIter->asMap())));
+ }
+}
+
+
+Variant::Map SchemaMethodImpl::asMap() const
+{
+ Variant::Map map;
+ Variant::List argList;
+
+ map["_name"] = name;
+
+ if (!desc.empty())
+ map["_desc"] = desc;
+
+ for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++)
+ argList.push_back(SchemaPropertyImplAccess::get(*iter).asMap());
+ map["_arguments"] = argList;
+
+ return map;
+}
+
+
+SchemaMethodImpl::SchemaMethodImpl(qpid::management::Buffer& buffer)
+{
+ Variant::Map::const_iterator iter;
+ Variant::Map argMap;
+
+ buffer.getMap(argMap);
+
+ iter = argMap.find("name");
+ if (iter == argMap.end())
+ throw QmfException("Received V1 Method without a name");
+ name = iter->second.asString();
+
+ iter = argMap.find("desc");
+ if (iter != argMap.end())
+ desc = iter->second.asString();
+
+ iter = argMap.find("argCount");
+ if (iter == argMap.end())
+ throw QmfException("Received V1 Method without argCount");
+
+ int64_t count = iter->second.asInt64();
+ for (int idx = 0; idx < count; idx++) {
+ SchemaProperty arg(new SchemaPropertyImpl(buffer));
+ addArgument(arg);
+ }
+}
+
+
+SchemaProperty SchemaMethodImpl::getArgument(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++)
+ if (count++ == i)
+ return *iter;
+
+ throw IndexOutOfRange();
+}
+
+
+void SchemaMethodImpl::updateHash(Hash& hash) const
+{
+ hash.update(name);
+ hash.update(desc);
+ for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++)
+ SchemaPropertyImplAccess::get(*iter).updateHash(hash);
+}
+
+
+void SchemaMethodImpl::encodeV1(qpid::management::Buffer& buffer) const
+{
+ Variant::Map map;
+
+ map["name"] = name;
+ map["argCount"] = (uint64_t) arguments.size();
+ if (!desc.empty())
+ map["desc"] = desc;
+
+ buffer.putMap(map);
+
+ for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++)
+ SchemaPropertyImplAccess::get(*iter).encodeV1(buffer, true, true);
+}
+
+
+SchemaMethodImpl& SchemaMethodImplAccess::get(SchemaMethod& item)
+{
+ return *item.impl;
+}
+
+
+const SchemaMethodImpl& SchemaMethodImplAccess::get(const SchemaMethod& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/SchemaMethodImpl.h b/qpid/cpp/src/qmf/SchemaMethodImpl.h
new file mode 100644
index 0000000000..930d48509c
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaMethodImpl.h
@@ -0,0 +1,75 @@
+#ifndef _QMF_SCHEMA_METHOD_IMPL_H_
+#define _QMF_SCHEMA_METHOD_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/SchemaTypes.h"
+#include "qmf/SchemaMethod.h"
+#include "qmf/SchemaPropertyImpl.h"
+#include "qpid/management/Buffer.h"
+#include <list>
+#include <string>
+
+namespace qpid {
+namespace management {
+ class Buffer;
+}}
+
+namespace qmf {
+ class Hash;
+ class SchemaMethodImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ SchemaMethodImpl(const qpid::types::Variant::Map& m);
+ SchemaMethodImpl(qpid::management::Buffer& v1Buffer);
+ qpid::types::Variant::Map asMap() const;
+ void updateHash(Hash&) const;
+ void encodeV1(qpid::management::Buffer&) const;
+
+ //
+ // Methods from API handle
+ //
+ SchemaMethodImpl(const std::string& n, const std::string& options);
+
+ void setDesc(const std::string& d) { desc = d; }
+ void addArgument(const SchemaProperty& p) { arguments.push_back(p); }
+ const std::string& getName() const { return name; }
+ const std::string& getDesc() const { return desc; }
+ uint32_t getArgumentCount() const { return arguments.size(); }
+ SchemaProperty getArgument(uint32_t i) const;
+
+ private:
+ std::string name;
+ std::string desc;
+ std::list<SchemaProperty> arguments;
+ };
+
+ struct SchemaMethodImplAccess
+ {
+ static SchemaMethodImpl& get(SchemaMethod&);
+ static const SchemaMethodImpl& get(const SchemaMethod&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/SchemaProperty.cpp b/qpid/cpp/src/qmf/SchemaProperty.cpp
new file mode 100644
index 0000000000..106127261b
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaProperty.cpp
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/SchemaPropertyImpl.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/SchemaTypes.h"
+#include "qmf/SchemaProperty.h"
+#include "qmf/Hash.h"
+#include "qpid/messaging/AddressParser.h"
+#include <list>
+#include <iostream>
+
+using namespace std;
+using qpid::types::Variant;
+using namespace qmf;
+
+typedef PrivateImplRef<SchemaProperty> PI;
+
+SchemaProperty::SchemaProperty(SchemaPropertyImpl* impl) { PI::ctor(*this, impl); }
+SchemaProperty::SchemaProperty(const SchemaProperty& s) : qmf::Handle<SchemaPropertyImpl>() { PI::copy(*this, s); }
+SchemaProperty::~SchemaProperty() { PI::dtor(*this); }
+SchemaProperty& SchemaProperty::operator=(const SchemaProperty& s) { return PI::assign(*this, s); }
+
+SchemaProperty::SchemaProperty(const string& n, int t, const string& o) { PI::ctor(*this, new SchemaPropertyImpl(n, t, o)); }
+
+void SchemaProperty::setAccess(int a) { impl->setAccess(a); }
+void SchemaProperty::setIndex(bool i) { impl->setIndex(i); }
+void SchemaProperty::setOptional(bool o) { impl->setOptional(o); }
+void SchemaProperty::setUnit(const string& u) { impl->setUnit(u); }
+void SchemaProperty::setDesc(const string& d) { impl->setDesc(d); }
+void SchemaProperty::setSubtype(const string& s) { impl->setSubtype(s); }
+void SchemaProperty::setDirection(int d) { impl->setDirection(d); }
+
+const string& SchemaProperty::getName() const { return impl->getName(); }
+int SchemaProperty::getType() const { return impl->getType(); }
+int SchemaProperty::getAccess() const { return impl->getAccess(); }
+bool SchemaProperty::isIndex() const { return impl->isIndex(); }
+bool SchemaProperty::isOptional() const { return impl->isOptional(); }
+const string& SchemaProperty::getUnit() const { return impl->getUnit(); }
+const string& SchemaProperty::getDesc() const { return impl->getDesc(); }
+const string& SchemaProperty::getSubtype() const { return impl->getSubtype(); }
+int SchemaProperty::getDirection() const { return impl->getDirection(); }
+
+//========================================================================================
+// Impl Method Bodies
+//========================================================================================
+
+SchemaPropertyImpl::SchemaPropertyImpl(const string& n, int t, const string options) :
+ name(n), dataType(t), access(ACCESS_READ_ONLY), index(false),
+ optional(false), direction(DIR_IN)
+{
+ if (!options.empty()) {
+ qpid::messaging::AddressParser parser = qpid::messaging::AddressParser(options);
+ Variant::Map optMap;
+ Variant::Map::iterator iter;
+
+ parser.parseMap(optMap);
+
+ iter = optMap.find("access");
+ if (iter != optMap.end()) {
+ const string& v(iter->second.asString());
+ if (v == "RC") access = ACCESS_READ_CREATE;
+ else if (v == "RO") access = ACCESS_READ_ONLY;
+ else if (v == "RW") access = ACCESS_READ_WRITE;
+ else
+ throw QmfException("Invalid value for 'access' option. Expected RC, RO, or RW");
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("index");
+ if (iter != optMap.end()) {
+ index = iter->second.asBool();
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("optional");
+ if (iter != optMap.end()) {
+ optional = iter->second.asBool();
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("unit");
+ if (iter != optMap.end()) {
+ unit = iter->second.asString();
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("desc");
+ if (iter != optMap.end()) {
+ desc = iter->second.asString();
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("subtype");
+ if (iter != optMap.end()) {
+ subtype = iter->second.asString();
+ optMap.erase(iter);
+ }
+
+ iter = optMap.find("dir");
+ if (iter != optMap.end()) {
+ const string& v(iter->second.asString());
+ if (v == "IN") direction = DIR_IN;
+ else if (v == "OUT") direction = DIR_OUT;
+ else if (v == "INOUT") direction = DIR_IN_OUT;
+ else
+ throw QmfException("Invalid value for 'dir' option. Expected IN, OUT, or INOUT");
+ optMap.erase(iter);
+ }
+
+ if (!optMap.empty())
+ throw QmfException("Unexpected option: " + optMap.begin()->first);
+ }
+}
+
+
+SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map& map) :
+ access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN)
+{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_name");
+ if (iter == map.end())
+ throw QmfException("SchemaProperty without a _name element");
+ name = iter->second.asString();
+
+ iter = map.find("_type");
+ if (iter == map.end())
+ throw QmfException("SchemaProperty without a _type element");
+ const string& ts(iter->second.asString());
+ if (ts == "TYPE_VOID") dataType = SCHEMA_DATA_VOID;
+ else if (ts == "TYPE_BOOL") dataType = SCHEMA_DATA_BOOL;
+ else if (ts == "TYPE_INT") dataType = SCHEMA_DATA_INT;
+ else if (ts == "TYPE_FLOAT") dataType = SCHEMA_DATA_FLOAT;
+ else if (ts == "TYPE_STRING") dataType = SCHEMA_DATA_STRING;
+ else if (ts == "TYPE_MAP") dataType = SCHEMA_DATA_MAP;
+ else if (ts == "TYPE_LIST") dataType = SCHEMA_DATA_LIST;
+ else if (ts == "TYPE_UUID") dataType = SCHEMA_DATA_UUID;
+ else
+ throw QmfException("SchemaProperty with an invalid type code: " + ts);
+
+ iter = map.find("_access");
+ if (iter != map.end()) {
+ const string& as(iter->second.asString());
+ if (as == "RO") access = ACCESS_READ_ONLY;
+ else if (as == "RC") access = ACCESS_READ_CREATE;
+ else if (as == "RW") access = ACCESS_READ_WRITE;
+ else
+ throw QmfException("SchemaProperty with an invalid access code: " + as);
+ }
+
+ iter = map.find("_unit");
+ if (iter != map.end())
+ unit = iter->second.asString();
+
+ iter = map.find("_dir");
+ if (iter != map.end()) {
+ const string& ds(iter->second.asString());
+ if (ds == "I") direction = DIR_IN;
+ else if (ds == "O") direction = DIR_OUT;
+ else if (ds == "IO") direction = DIR_IN_OUT;
+ else
+ throw QmfException("SchemaProperty with an invalid direction code: " + ds);
+ }
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ desc = iter->second.asString();
+
+ iter = map.find("_index");
+ if (iter != map.end())
+ index = iter->second.asBool();
+
+ iter = map.find("_subtype");
+ if (iter != map.end())
+ subtype = iter->second.asString();
+}
+
+
+Variant::Map SchemaPropertyImpl::asMap() const
+{
+ Variant::Map map;
+ string ts;
+
+ map["_name"] = name;
+
+ switch (dataType) {
+ case SCHEMA_DATA_VOID: ts = "TYPE_VOID"; break;
+ case SCHEMA_DATA_BOOL: ts = "TYPE_BOOL"; break;
+ case SCHEMA_DATA_INT: ts = "TYPE_INT"; break;
+ case SCHEMA_DATA_FLOAT: ts = "TYPE_FLOAT"; break;
+ case SCHEMA_DATA_STRING: ts = "TYPE_STRING"; break;
+ case SCHEMA_DATA_MAP: ts = "TYPE_MAP"; break;
+ case SCHEMA_DATA_LIST: ts = "TYPE_LIST"; break;
+ case SCHEMA_DATA_UUID: ts = "TYPE_UUID"; break;
+ }
+ map["_type"] = ts;
+
+ switch (access) {
+ case ACCESS_READ_ONLY: ts = "RO"; break;
+ case ACCESS_READ_CREATE: ts = "RC"; break;
+ case ACCESS_READ_WRITE: ts = "RW"; break;
+ }
+ map["_access"] = ts;
+
+ if (!unit.empty())
+ map["_unit"] = unit;
+
+ switch (direction) {
+ case DIR_IN: ts = "I"; break;
+ case DIR_OUT: ts = "O"; break;
+ case DIR_IN_OUT: ts = "IO"; break;
+ }
+ map["_dir"] = ts;
+
+ if (!desc.empty())
+ map["_desc"] = desc;
+
+ if (index)
+ map["_index"] = true;
+
+ if (!subtype.empty())
+ map["_subtype"] = subtype;
+
+ return map;
+}
+
+
+SchemaPropertyImpl::SchemaPropertyImpl(qpid::management::Buffer& buffer) :
+ access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN)
+{
+ Variant::Map::const_iterator iter;
+ Variant::Map pmap;
+
+ buffer.getMap(pmap);
+ iter = pmap.find("name");
+ if (iter == pmap.end())
+ throw QmfException("Received V1 Schema property without a name");
+ name = iter->second.asString();
+
+ iter = pmap.find("type");
+ if (iter == pmap.end())
+ throw QmfException("Received V1 Schema property without a type");
+ fromV1TypeCode(iter->second.asInt8());
+
+ iter = pmap.find("unit");
+ if (iter != pmap.end())
+ unit = iter->second.asString();
+
+ iter = pmap.find("desc");
+ if (iter != pmap.end())
+ desc = iter->second.asString();
+
+ iter = pmap.find("access");
+ if (iter != pmap.end()) {
+ int8_t val = iter->second.asInt8();
+ if (val < 1 || val > 3)
+ throw QmfException("Received V1 Schema property with invalid 'access' code");
+ access = val;
+ }
+
+ iter = pmap.find("index");
+ if (iter != pmap.end())
+ index = iter->second.asInt64() != 0;
+
+ iter = pmap.find("optional");
+ if (iter != pmap.end())
+ optional = iter->second.asInt64() != 0;
+
+ iter = pmap.find("dir");
+ if (iter != pmap.end()) {
+ string dirStr(iter->second.asString());
+ if (dirStr == "I") direction = DIR_IN;
+ else if (dirStr == "O") direction = DIR_OUT;
+ else if (dirStr == "IO") direction = DIR_IN_OUT;
+ else
+ throw QmfException("Received V1 Schema property with invalid 'dir' code");
+ }
+}
+
+
+void SchemaPropertyImpl::updateHash(Hash& hash) const
+{
+ hash.update(name);
+ hash.update((uint8_t) dataType);
+ hash.update(subtype);
+ hash.update((uint8_t) access);
+ hash.update(index);
+ hash.update(optional);
+ hash.update(unit);
+ hash.update(desc);
+ hash.update((uint8_t) direction);
+}
+
+
+void SchemaPropertyImpl::encodeV1(qpid::management::Buffer& buffer, bool isArg, bool isMethodArg) const
+{
+ Variant::Map pmap;
+
+ pmap["name"] = name;
+ pmap["type"] = v1TypeCode();
+ if (!unit.empty())
+ pmap["unit"] = unit;
+ if (!desc.empty())
+ pmap["desc"] = desc;
+ if (!isArg) {
+ pmap["access"] = access;
+ pmap["index"] = index ? 1 : 0;
+ pmap["optional"] = optional ? 1 : 0;
+ } else {
+ if (isMethodArg) {
+ string dirStr;
+ switch (direction) {
+ case DIR_IN : dirStr = "I"; break;
+ case DIR_OUT : dirStr = "O"; break;
+ case DIR_IN_OUT : dirStr = "IO"; break;
+ }
+ pmap["dir"] = dirStr;
+ }
+ }
+
+ buffer.putMap(pmap);
+}
+
+
+uint8_t SchemaPropertyImpl::v1TypeCode() const
+{
+ switch (dataType) {
+ case SCHEMA_DATA_VOID: return 1;
+ case SCHEMA_DATA_BOOL: return 11;
+ case SCHEMA_DATA_INT:
+ if (subtype == "timestamp") return 8;
+ if (subtype == "duration") return 9;
+ return 19;
+ case SCHEMA_DATA_FLOAT: return 13;
+ case SCHEMA_DATA_STRING: return 7;
+ case SCHEMA_DATA_LIST: return 21;
+ case SCHEMA_DATA_UUID: return 14;
+ case SCHEMA_DATA_MAP:
+ if (subtype == "reference") return 10;
+ if (subtype == "data") return 20;
+ return 15;
+ }
+
+ return 1;
+}
+
+void SchemaPropertyImpl::fromV1TypeCode(int8_t code)
+{
+ switch (code) {
+ case 1: // U8
+ case 2: // U16
+ case 3: // U32
+ case 4: // U64
+ dataType = SCHEMA_DATA_INT;
+ break;
+ case 6: // SSTR
+ case 7: // LSTR
+ dataType = SCHEMA_DATA_STRING;
+ break;
+ case 8: // ABSTIME
+ dataType = SCHEMA_DATA_INT;
+ subtype = "timestamp";
+ break;
+ case 9: // DELTATIME
+ dataType = SCHEMA_DATA_INT;
+ subtype = "duration";
+ break;
+ case 10: // REF
+ dataType = SCHEMA_DATA_MAP;
+ subtype = "reference";
+ break;
+ case 11: // BOOL
+ dataType = SCHEMA_DATA_BOOL;
+ break;
+ case 12: // FLOAT
+ case 13: // DOUBLE
+ dataType = SCHEMA_DATA_FLOAT;
+ break;
+ case 14: // UUID
+ dataType = SCHEMA_DATA_UUID;
+ break;
+ case 15: // FTABLE
+ dataType = SCHEMA_DATA_MAP;
+ break;
+ case 16: // S8
+ case 17: // S16
+ case 18: // S32
+ case 19: // S64
+ dataType = SCHEMA_DATA_INT;
+ break;
+ case 20: // OBJECT
+ dataType = SCHEMA_DATA_MAP;
+ subtype = "data";
+ break;
+ case 21: // LIST
+ case 22: // ARRAY
+ dataType = SCHEMA_DATA_LIST;
+ break;
+ default:
+ throw QmfException("Received V1 schema with an unknown data type");
+ }
+}
+
+
+SchemaPropertyImpl& SchemaPropertyImplAccess::get(SchemaProperty& item)
+{
+ return *item.impl;
+}
+
+
+const SchemaPropertyImpl& SchemaPropertyImplAccess::get(const SchemaProperty& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/SchemaPropertyImpl.h b/qpid/cpp/src/qmf/SchemaPropertyImpl.h
new file mode 100644
index 0000000000..cdfc29066f
--- /dev/null
+++ b/qpid/cpp/src/qmf/SchemaPropertyImpl.h
@@ -0,0 +1,93 @@
+#ifndef _QMF_SCHEMA_PROPERTY_IMPL_H_
+#define _QMF_SCHEMA_PROPERTY_IMPL_H_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/SchemaTypes.h"
+#include "qmf/SchemaProperty.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/Buffer.h"
+
+namespace qpid {
+namespace management {
+ class Buffer;
+}}
+
+namespace qmf {
+ class Hash;
+ class SchemaPropertyImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ SchemaPropertyImpl(const qpid::types::Variant::Map& m);
+ SchemaPropertyImpl(qpid::management::Buffer& v1Buffer);
+ qpid::types::Variant::Map asMap() const;
+ void updateHash(Hash&) const;
+ void encodeV1(qpid::management::Buffer&, bool isArg, bool isMethodArg) const;
+
+ //
+ // Methods from API handle
+ //
+ SchemaPropertyImpl(const std::string& n, int t, const std::string o);
+ void setAccess(int a) { access = a; }
+ void setIndex(bool i) { index = i; }
+ void setOptional(bool o) { optional = o; }
+ void setUnit(const std::string& u) { unit = u; }
+ void setDesc(const std::string& d) { desc = d; }
+ void setSubtype(const std::string& s) { subtype = s; }
+ void setDirection(int d) { direction = d; }
+
+ const std::string& getName() const { return name; }
+ int getType() const { return dataType; }
+ int getAccess() const { return access; }
+ bool isIndex() const { return index; }
+ bool isOptional() const { return optional; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return desc; }
+ const std::string& getSubtype() const { return subtype; }
+ int getDirection() const { return direction; }
+
+ private:
+ std::string name;
+ int dataType;
+ std::string subtype;
+ int access;
+ bool index;
+ bool optional;
+ std::string unit;
+ std::string desc;
+ int direction;
+
+ uint8_t v1TypeCode() const;
+ void fromV1TypeCode(int8_t);
+ };
+
+ struct SchemaPropertyImplAccess
+ {
+ static SchemaPropertyImpl& get(SchemaProperty&);
+ static const SchemaPropertyImpl& get(const SchemaProperty&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/Subscription.cpp b/qpid/cpp/src/qmf/Subscription.cpp
new file mode 100644
index 0000000000..73afc8c79d
--- /dev/null
+++ b/qpid/cpp/src/qmf/Subscription.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/SubscriptionImpl.h"
+#include "qmf/DataImpl.h"
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef PrivateImplRef<Subscription> PI;
+
+Subscription::Subscription(SubscriptionImpl* impl) { PI::ctor(*this, impl); }
+Subscription::Subscription(const Subscription& s) : qmf::Handle<SubscriptionImpl>() { PI::copy(*this, s); }
+Subscription::~Subscription() { PI::dtor(*this); }
+Subscription& Subscription::operator=(const Subscription& s) { return PI::assign(*this, s); }
+
+void Subscription::cancel() { impl->cancel(); }
+bool Subscription::isActive() const { return impl->isActive(); }
+void Subscription::lock() { impl->lock(); }
+void Subscription::unlock() { impl->unlock(); }
+uint32_t Subscription::getDataCount() const { return impl->getDataCount(); }
+Data Subscription::getData(uint32_t i) const { return impl->getData(i); }
+
+
+void SubscriptionImpl::cancel()
+{
+}
+
+
+bool SubscriptionImpl::isActive() const
+{
+ return false;
+}
+
+
+void SubscriptionImpl::lock()
+{
+}
+
+
+void SubscriptionImpl::unlock()
+{
+}
+
+
+uint32_t SubscriptionImpl::getDataCount() const
+{
+ return 0;
+}
+
+
+Data SubscriptionImpl::getData(uint32_t) const
+{
+ return Data();
+}
+
+
+SubscriptionImpl& SubscriptionImplAccess::get(Subscription& item)
+{
+ return *item.impl;
+}
+
+
+const SubscriptionImpl& SubscriptionImplAccess::get(const Subscription& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/SubscriptionImpl.h b/qpid/cpp/src/qmf/SubscriptionImpl.h
new file mode 100644
index 0000000000..053e3cd00e
--- /dev/null
+++ b/qpid/cpp/src/qmf/SubscriptionImpl.h
@@ -0,0 +1,57 @@
+#ifndef _QMF_SUBSCRIPTION_IMPL_H_
+#define _QMF_SUBSCRIPTION_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Subscription.h"
+
+namespace qmf {
+ class SubscriptionImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ SubscriptionImpl(int p) : placeholder(p) {}
+ ~SubscriptionImpl();
+
+ //
+ // Methods from API handle
+ //
+ void cancel();
+ bool isActive() const;
+ void lock();
+ void unlock();
+ uint32_t getDataCount() const;
+ Data getData(uint32_t) const;
+
+ private:
+ int placeholder;
+ };
+
+ struct SubscriptionImplAccess
+ {
+ static SubscriptionImpl& get(Subscription&);
+ static const SubscriptionImpl& get(const Subscription&);
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/agentCapability.h b/qpid/cpp/src/qmf/agentCapability.h
new file mode 100644
index 0000000000..6a3f6f8534
--- /dev/null
+++ b/qpid/cpp/src/qmf/agentCapability.h
@@ -0,0 +1,39 @@
+#ifndef QMF_AGENT_CAPABILITY_H
+#define QMF_AGENT_CAPABILITY_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qmf {
+
+ /**
+ * Legacy (Qpid 0.7 C++ Agent, 0.7 Broker Agent) capabilities
+ */
+ const uint32_t AGENT_CAPABILITY_LEGACY = 0;
+
+ /**
+ * Qpid 0.8 QMFv2 capabilities
+ */
+ const uint32_t AGENT_CAPABILITY_0_8 = 1;
+ const uint32_t AGENT_CAPABILITY_V2_SCHEMA = 1;
+ const uint32_t AGENT_CAPABILITY_AGENT_PREDICATE = 1;
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/constants.cpp b/qpid/cpp/src/qmf/constants.cpp
new file mode 100644
index 0000000000..6e2fd935a9
--- /dev/null
+++ b/qpid/cpp/src/qmf/constants.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "constants.h"
+
+using namespace std;
+using namespace qmf;
+
+/**
+ * Header key strings
+ */
+const string protocol::HEADER_KEY_APP_ID = "x-amqp-0-10.app-id";
+const string protocol::HEADER_KEY_METHOD = "method";
+const string protocol::HEADER_KEY_OPCODE = "qmf.opcode";
+const string protocol::HEADER_KEY_AGENT = "qmf.agent";
+const string protocol::HEADER_KEY_CONTENT = "qmf.content";
+const string protocol::HEADER_KEY_PARTIAL = "partial";
+
+/**
+ * Header values per-key
+ */
+const string protocol::HEADER_APP_ID_QMF = "qmf2";
+
+const string protocol::HEADER_METHOD_REQUEST = "request";
+const string protocol::HEADER_METHOD_RESPONSE = "response";
+const string protocol::HEADER_METHOD_INDICATION = "indication";
+
+const string protocol::HEADER_OPCODE_EXCEPTION = "_exception";
+const string protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST = "_agent_locate_request";
+const string protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE = "_agent_locate_response";
+const string protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION = "_agent_heartbeat_indication";
+const string protocol::HEADER_OPCODE_QUERY_REQUEST = "_query_request";
+const string protocol::HEADER_OPCODE_QUERY_RESPONSE = "_query_response";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_REQUEST = "_subscribe_request";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_RESPONSE = "_subscribe_response";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION = "_subscribe_cancel_indication";
+const string protocol::HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION = "_subscribe_refresh_indication";
+const string protocol::HEADER_OPCODE_DATA_INDICATION = "_data_indication";
+const string protocol::HEADER_OPCODE_METHOD_REQUEST = "_method_request";
+const string protocol::HEADER_OPCODE_METHOD_RESPONSE = "_method_response";
+
+const string protocol::HEADER_CONTENT_SCHEMA_ID = "_schema_id";
+const string protocol::HEADER_CONTENT_SCHEMA_CLASS = "_schema_class";
+const string protocol::HEADER_CONTENT_OBJECT_ID = "_object_id";
+const string protocol::HEADER_CONTENT_DATA = "_data";
+const string protocol::HEADER_CONTENT_EVENT = "_event";
+const string protocol::HEADER_CONTENT_QUERY = "_query";
+
+/**
+ * Keywords for Agent attributes
+ */
+const string protocol::AGENT_ATTR_VENDOR = "_vendor";
+const string protocol::AGENT_ATTR_PRODUCT = "_product";
+const string protocol::AGENT_ATTR_INSTANCE = "_instance";
+const string protocol::AGENT_ATTR_NAME = "_name";
+const string protocol::AGENT_ATTR_TIMESTAMP = "_timestamp";
+const string protocol::AGENT_ATTR_HEARTBEAT_INTERVAL = "_heartbeat_interval";
+const string protocol::AGENT_ATTR_EPOCH = "_epoch";
+const string protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP = "_schema_updated";
diff --git a/qpid/cpp/src/qmf/constants.h b/qpid/cpp/src/qmf/constants.h
new file mode 100644
index 0000000000..79beaaf1ca
--- /dev/null
+++ b/qpid/cpp/src/qmf/constants.h
@@ -0,0 +1,83 @@
+#ifndef QMF_CONSTANTS_H
+#define QMF_CONSTANTS_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qmf {
+
+ struct protocol {
+ /**
+ * Header key strings
+ */
+ static const std::string HEADER_KEY_APP_ID;
+ static const std::string HEADER_KEY_METHOD;
+ static const std::string HEADER_KEY_OPCODE;
+ static const std::string HEADER_KEY_AGENT;
+ static const std::string HEADER_KEY_CONTENT;
+ static const std::string HEADER_KEY_PARTIAL;
+
+ /**
+ * Header values per-key
+ */
+ static const std::string HEADER_APP_ID_QMF;
+
+ static const std::string HEADER_METHOD_REQUEST;
+ static const std::string HEADER_METHOD_RESPONSE;
+ static const std::string HEADER_METHOD_INDICATION;
+
+ static const std::string HEADER_OPCODE_EXCEPTION;
+ static const std::string HEADER_OPCODE_AGENT_LOCATE_REQUEST;
+ static const std::string HEADER_OPCODE_AGENT_LOCATE_RESPONSE;
+ static const std::string HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION;
+ static const std::string HEADER_OPCODE_QUERY_REQUEST;
+ static const std::string HEADER_OPCODE_QUERY_RESPONSE;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_REQUEST;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_RESPONSE;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_CANCEL_INDICATION;
+ static const std::string HEADER_OPCODE_SUBSCRIBE_REFRESH_INDICATION;
+ static const std::string HEADER_OPCODE_DATA_INDICATION;
+ static const std::string HEADER_OPCODE_METHOD_REQUEST;
+ static const std::string HEADER_OPCODE_METHOD_RESPONSE;
+
+ static const std::string HEADER_CONTENT_SCHEMA_ID;
+ static const std::string HEADER_CONTENT_SCHEMA_CLASS;
+ static const std::string HEADER_CONTENT_OBJECT_ID;
+ static const std::string HEADER_CONTENT_DATA;
+ static const std::string HEADER_CONTENT_EVENT;
+ static const std::string HEADER_CONTENT_QUERY;
+
+ /**
+ * Keywords for Agent attributes
+ */
+ static const std::string AGENT_ATTR_VENDOR;
+ static const std::string AGENT_ATTR_PRODUCT;
+ static const std::string AGENT_ATTR_INSTANCE;
+ static const std::string AGENT_ATTR_NAME;
+ static const std::string AGENT_ATTR_TIMESTAMP;
+ static const std::string AGENT_ATTR_HEARTBEAT_INTERVAL;
+ static const std::string AGENT_ATTR_EPOCH;
+ static const std::string AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP;
+ };
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/exceptions.cpp b/qpid/cpp/src/qmf/exceptions.cpp
new file mode 100644
index 0000000000..be212f62f7
--- /dev/null
+++ b/qpid/cpp/src/qmf/exceptions.cpp
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qmf/exceptions.h"
+
+namespace qmf {
+
+ QmfException::QmfException(const std::string& msg) : qpid::types::Exception(msg) {}
+ QmfException::~QmfException() throw() {}
+
+ KeyNotFound::KeyNotFound(const std::string& msg) : QmfException("Key Not Found: " + msg) {}
+ KeyNotFound::~KeyNotFound() throw() {}
+
+ IndexOutOfRange::IndexOutOfRange() : QmfException("Index out-of-range") {}
+ IndexOutOfRange::~IndexOutOfRange() throw() {}
+
+ OperationTimedOut::OperationTimedOut() : QmfException("Timeout Expired") {}
+ OperationTimedOut::~OperationTimedOut() throw() {}
+}
+