summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
committerTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
commitd47927b3e150057f6d615a0d00c8eff6c83320ac (patch)
tree6cf1da8bd7a46fd3cef8251af94f88bbad0e627d
parent81414cc0fb52efbd77e3e3bc83ed0c5dcb7fe83a (diff)
downloadqpid-python-d47927b3e150057f6d615a0d00c8eff6c83320ac.tar.gz
QMFv2 Additions:
- QMFv2 schema encoding completed - Schema queries handled by the agent and initiated by the console by user request - Full query support with predicates evaluated on the agent (regex not yet implemented) - Agent filtering in the console - Agent aging in the console - Unit tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999662 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/bindings/qmf2/python/python.i11
-rw-r--r--qpid/cpp/bindings/qmf2/python/qmf2.py15
-rw-r--r--qpid/cpp/include/qmf/Agent.h42
-rw-r--r--qpid/cpp/include/qmf/ConsoleEvent.h32
-rw-r--r--qpid/cpp/include/qmf/ConsoleSession.h13
-rw-r--r--qpid/cpp/include/qmf/Query.h21
-rw-r--r--qpid/cpp/include/qmf/SchemaProperty.h1
-rw-r--r--qpid/cpp/src/qmf.mk4
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp151
-rw-r--r--qpid/cpp/src/qmf/AgentImpl.h13
-rw-r--r--qpid/cpp/src/qmf/AgentSession.cpp250
-rw-r--r--qpid/cpp/src/qmf/ConsoleEvent.cpp18
-rw-r--r--qpid/cpp/src/qmf/ConsoleEventImpl.h9
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp172
-rw-r--r--qpid/cpp/src/qmf/ConsoleSessionImpl.h9
-rw-r--r--qpid/cpp/src/qmf/Expression.cpp441
-rw-r--r--qpid/cpp/src/qmf/Expression.h73
-rw-r--r--qpid/cpp/src/qmf/Query.cpp167
-rw-r--r--qpid/cpp/src/qmf/QueryImpl.h77
-rw-r--r--qpid/cpp/src/qmf/Schema.cpp59
-rw-r--r--qpid/cpp/src/qmf/SchemaIdImpl.h9
-rw-r--r--qpid/cpp/src/qmf/SchemaImpl.h1
-rw-r--r--qpid/cpp/src/qmf/SchemaMethod.cpp40
-rw-r--r--qpid/cpp/src/qmf/SchemaMethodImpl.h1
-rw-r--r--qpid/cpp/src/qmf/SchemaProperty.cpp109
-rw-r--r--qpid/cpp/src/qmf/SchemaPropertyImpl.h3
-rw-r--r--qpid/cpp/src/qmf/agentCapability.h39
-rw-r--r--qpid/cpp/src/tests/Makefile.am6
-rw-r--r--qpid/cpp/src/tests/Qmf2.cpp320
29 files changed, 1806 insertions, 300 deletions
diff --git a/qpid/cpp/bindings/qmf2/python/python.i b/qpid/cpp/bindings/qmf2/python/python.i
index 7d83465bb3..02dd1632b0 100644
--- a/qpid/cpp/bindings/qmf2/python/python.i
+++ b/qpid/cpp/bindings/qmf2/python/python.i
@@ -23,13 +23,16 @@
/* Define the general-purpose exception handling */
%exception {
+ std::string error;
+ Py_BEGIN_ALLOW_THREADS;
try {
- Py_BEGIN_ALLOW_THREADS
$action
- Py_END_ALLOW_THREADS
+ } catch (qpid::types::Exception& ex) {
+ error = ex.what();
}
- catch (qpid::types::Exception& ex) {
- PyErr_SetString(PyExc_RuntimeError, ex.what());
+ Py_END_ALLOW_THREADS;
+ if (!error.empty()) {
+ PyErr_SetString(PyExc_RuntimeError, error.c_str());
return NULL;
}
}
diff --git a/qpid/cpp/bindings/qmf2/python/qmf2.py b/qpid/cpp/bindings/qmf2/python/qmf2.py
index f3ece32866..285b47ebbe 100644
--- a/qpid/cpp/bindings/qmf2/python/qmf2.py
+++ b/qpid/cpp/bindings/qmf2/python/qmf2.py
@@ -54,6 +54,11 @@ SEV_NOTICE = cqmf2.SEV_NOTICE
SEV_INFORM = cqmf2.SEV_INFORM
SEV_DEBUG = cqmf2.SEV_DEBUG
+QUERY_OBJECT = cqmf2.QUERY_OBJECT
+QUERY_OBJECT_ID = cqmf2.QUERY_OBJECT_ID
+QUERY_SCHEMA = cqmf2.QUERY_SCHEMA
+QUERY_SCHEMA_ID = cqmf2.QUERY_SCHEMA_ID
+
#===================================================================================================
# EXCEPTIONS
@@ -153,7 +158,7 @@ class ConsoleSession(object):
def setAgentFilter(self, filt):
"""
"""
- self.setAgentFilter(filt)
+ self._impl.setAgentFilter(filt)
def open(self):
"""
@@ -321,6 +326,12 @@ class Agent(object):
dataList.append(Data(result.getData(i)))
return dataList
+ def loadSchemaInfo(self, timeout=30):
+ """
+ """
+ dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ self._impl.querySchema(dur)
+
def getPackages(self):
"""
"""
@@ -457,7 +468,7 @@ class Data(object):
result = agent.callMethod(name, argmap, addr, dur)
##
- ## If the agent sent an exception, raise it in a QmfAgentExeption.
+ ## If the agent sent an exception, raise it in a QmfAgentException.
##
if result.getType() == cqmf2.CONSOLE_EXCEPTION:
exdata = result.getData(0)
diff --git a/qpid/cpp/include/qmf/Agent.h b/qpid/cpp/include/qmf/Agent.h
index 8ddea0fae9..8d427ab2fb 100644
--- a/qpid/cpp/include/qmf/Agent.h
+++ b/qpid/cpp/include/qmf/Agent.h
@@ -65,10 +65,52 @@ namespace qmf {
qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE);
QMF_EXTERN uint32_t callMethodAsync(const std::string&, const qpid::types::Variant::Map&, const DataAddr&);
+ /**
+ * Query the agent for a list of schema classes that it exposes. This operation comes in both
+ * synchronous (blocking) and asynchronous flavors.
+ *
+ * This method will typically be used after receiving an AGENT_SCHEMA_UPDATE event from the console session.
+ * It may also be used on a newly discovered agent to learn what schemata are exposed.
+ *
+ * querySchema returns a ConsoleEvent that contains a list of SchemaId objects exposed by the agent.
+ * This list is cached locally and can be locally queried using getPackage[Count] and getSchemaId[Count].
+ */
+ QMF_EXTERN ConsoleEvent querySchema(qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE);
+ QMF_EXTERN uint32_t querySchemaAsync();
+
+ /**
+ * Get the list of schema packages exposed by the agent.
+ *
+ * getPackageCount returns the number of packages exposed.
+ * getPackage returns the name of the package by index (0..package-count)
+ *
+ * Note that both of these calls are synchronous and non-blocking. They only return locally cached data
+ * and will not send any messages to the remote agent. Use querySchema[Async] to get the latest schema
+ * information from the remote agent.
+ */
QMF_EXTERN uint32_t getPackageCount() const;
QMF_EXTERN const std::string& getPackage(uint32_t) const;
+
+ /**
+ * Get the list of schema identifiers for a particular package.
+ *
+ * getSchemaIdCount returns the number of IDs in the indicates package.
+ * getSchemaId returns the SchemaId by index (0..schema-id-count)
+ *
+ * Note that both of these calls are synchronous and non-blocking. They only return locally cached data
+ * and will not send any messages to the remote agent. Use querySchema[Async] to get the latest schema
+ * information from the remote agent.
+ */
QMF_EXTERN uint32_t getSchemaIdCount(const std::string&) const;
QMF_EXTERN SchemaId getSchemaId(const std::string&, uint32_t) const;
+
+ /**
+ * Get detailed schema information for a specified schema ID.
+ *
+ * This call will return cached information if it is available. If not, it will send a query message to the
+ * remote agent and block waiting for a response. The timeout argument specifies the maximum time to wait
+ * for a response from the agent.
+ */
QMF_EXTERN Schema getSchema(const SchemaId&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE);
diff --git a/qpid/cpp/include/qmf/ConsoleEvent.h b/qpid/cpp/include/qmf/ConsoleEvent.h
index 61f625b137..3e75631a61 100644
--- a/qpid/cpp/include/qmf/ConsoleEvent.h
+++ b/qpid/cpp/include/qmf/ConsoleEvent.h
@@ -23,6 +23,9 @@
#include <qmf/ImportExport.h>
#include "qmf/Handle.h"
+#include "qmf/Agent.h"
+#include "qmf/Data.h"
+#include "qmf/SchemaId.h"
#include "qpid/types/Variant.h"
namespace qmf {
@@ -32,18 +35,24 @@ namespace qmf {
#endif
class ConsoleEventImpl;
- class Agent;
- class Data;
enum ConsoleEventCode {
- CONSOLE_AGENT_ADD = 1,
- CONSOLE_AGENT_AGE = 2,
- CONSOLE_EVENT = 3,
- CONSOLE_QUERY_RESPONSE = 4,
- CONSOLE_METHOD_RESPONSE = 5,
- CONSOLE_EXCEPTION = 6,
- CONSOLE_SUBSCRIBE_UPDATE = 7,
- CONSOLE_THREAD_FAILED = 8
+ CONSOLE_AGENT_ADD = 1,
+ CONSOLE_AGENT_DEL = 2,
+ CONSOLE_AGENT_RESTART = 3,
+ CONSOLE_AGENT_SCHEMA_UPDATE = 4,
+ CONSOLE_AGENT_SCHEMA_RESPONSE = 5,
+ CONSOLE_EVENT = 6,
+ CONSOLE_QUERY_RESPONSE = 7,
+ CONSOLE_METHOD_RESPONSE = 8,
+ CONSOLE_EXCEPTION = 9,
+ CONSOLE_SUBSCRIBE_UPDATE = 10,
+ CONSOLE_THREAD_FAILED = 11
+ };
+
+ enum AgentDelReason {
+ AGENT_DEL_AGED = 1,
+ AGENT_DEL_FILTER = 2
};
class ConsoleEvent : public qmf::Handle<ConsoleEventImpl> {
@@ -56,6 +65,9 @@ namespace qmf {
QMF_EXTERN ConsoleEventCode getType() const;
QMF_EXTERN uint32_t getCorrelator() const;
QMF_EXTERN Agent getAgent() const;
+ QMF_EXTERN AgentDelReason getAgentDelReason() const;
+ QMF_EXTERN uint32_t getSchemaIdCount() const;
+ QMF_EXTERN SchemaId getSchemaId(uint32_t) const;
QMF_EXTERN uint32_t getDataCount() const;
QMF_EXTERN Data getData(uint32_t) const;
QMF_EXTERN bool isFinal() const;
diff --git a/qpid/cpp/include/qmf/ConsoleSession.h b/qpid/cpp/include/qmf/ConsoleSession.h
index 0e58a647d6..c17f4510f1 100644
--- a/qpid/cpp/include/qmf/ConsoleSession.h
+++ b/qpid/cpp/include/qmf/ConsoleSession.h
@@ -44,6 +44,19 @@ namespace qmf {
QMF_EXTERN ConsoleSession& operator=(const ConsoleSession&);
QMF_EXTERN ~ConsoleSession();
+ /**
+ * ConsoleSession
+ * A session that runs over an AMQP connection for QMF console operation.
+ *
+ * @param connection - An opened qpid::messaging::Connection
+ * @param options - An optional string containing options
+ *
+ * The options string is of the form "{key:value,key:value}". The following keys are supported:
+ *
+ * domain:NAME - QMF Domain to join [default: "default"]
+ * max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
+ * an agent before deleting it [default: 5]
+ */
QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options="");
QMF_EXTERN void setDomain(const std::string&);
QMF_EXTERN void setAgentFilter(const std::string&);
diff --git a/qpid/cpp/include/qmf/Query.h b/qpid/cpp/include/qmf/Query.h
index 71ef31b104..fec4660bd7 100644
--- a/qpid/cpp/include/qmf/Query.h
+++ b/qpid/cpp/include/qmf/Query.h
@@ -36,6 +36,13 @@ namespace qmf {
class SchemaId;
class DataAddr;
+ enum QueryTarget {
+ QUERY_OBJECT = 1,
+ QUERY_OBJECT_ID = 2,
+ QUERY_SCHEMA = 3,
+ QUERY_SCHEMA_ID = 4
+ };
+
class Query : public qmf::Handle<QueryImpl> {
public:
QMF_EXTERN Query(QueryImpl* impl = 0);
@@ -43,20 +50,22 @@ namespace qmf {
QMF_EXTERN Query& operator=(const Query&);
QMF_EXTERN ~Query();
- QMF_EXTERN Query(const std::string& className, const std::string& package="", const std::string& predicate="");
- QMF_EXTERN Query(const SchemaId&);
+ QMF_EXTERN Query(QueryTarget, const std::string& predicate="");
+ QMF_EXTERN Query(QueryTarget, const std::string& className, const std::string& package, const std::string& predicate="");
+ QMF_EXTERN Query(QueryTarget, const SchemaId&, const std::string& predicate="");
QMF_EXTERN Query(const DataAddr&);
+ QMF_EXTERN QueryTarget getTarget() const;
QMF_EXTERN const DataAddr& getDataAddr() const;
QMF_EXTERN const SchemaId& getSchemaId() const;
- QMF_EXTERN const std::string& getClassName() const;
- QMF_EXTERN const std::string& getPackageName() const;
- QMF_EXTERN void addPredicate(const std::string&, const qpid::types::Variant&);
- QMF_EXTERN const qpid::types::Variant::Map& getPredicate() const;
+ QMF_EXTERN void setPredicate(const qpid::types::Variant::List&);
+ QMF_EXTERN const qpid::types::Variant::List& getPredicate() const;
+ QMF_EXTERN bool matchesPredicate(const qpid::types::Variant::Map& map) const;
#ifndef SWIG
private:
friend class qmf::PrivateImplRef<Query>;
+ friend class QueryImplAccess;
#endif
};
diff --git a/qpid/cpp/include/qmf/SchemaProperty.h b/qpid/cpp/include/qmf/SchemaProperty.h
index 2e770c2ef1..a3a328b60b 100644
--- a/qpid/cpp/include/qmf/SchemaProperty.h
+++ b/qpid/cpp/include/qmf/SchemaProperty.h
@@ -54,6 +54,7 @@ namespace qmf {
QMF_EXTERN void setDirection(int);
QMF_EXTERN const std::string& getName() const;
+ QMF_EXTERN int getType() const;
QMF_EXTERN int getAccess() const;
QMF_EXTERN bool isIndex() const;
QMF_EXTERN bool isOptional() const;
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index ef02cf7562..d0a186e89f 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -85,6 +85,7 @@ libqmf_la_SOURCES = \
libqmf2_la_SOURCES = \
$(QMF2_API) \
+ qmf/agentCapability.h \
qmf/Agent.cpp \
qmf/AgentEvent.cpp \
qmf/AgentEventImpl.h \
@@ -99,10 +100,13 @@ libqmf2_la_SOURCES = \
qmf/Data.cpp \
qmf/DataImpl.h \
qmf/exceptions.cpp \
+ qmf/Expression.cpp \
+ qmf/Expression.h \
qmf/Hash.cpp \
qmf/Hash.h \
qmf/PrivateImplRef.h \
qmf/Query.cpp \
+ qmf/QueryImpl.h \
qmf/Schema.cpp \
qmf/SchemaCache.cpp \
qmf/SchemaCache.h \
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
index c7ccea35d5..05bf1a38aa 100644
--- a/qpid/cpp/src/qmf/Agent.cpp
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -25,6 +25,8 @@
#include "qmf/ConsoleSession.h"
#include "qmf/DataImpl.h"
#include "qmf/Query.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/agentCapability.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/management/Buffer.h"
@@ -51,6 +53,8 @@ string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; }
string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; }
const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); }
const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); }
+ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); }
+uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); }
ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); }
ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); }
uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); }
@@ -66,11 +70,20 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
- name(n), epoch(e), session(s), touched(true), untouchedCount(0),
+ name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
nextCorrelator(1), schemaCache(s.schemaCache)
{
}
+void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v)
+{
+ attributes[k] = v;
+ if (k == "qmf.agent_capability")
+ try {
+ capability = v.asUint32();
+ } catch (std::exception&) {}
+}
+
const Variant& AgentImpl::getAttribute(const string& k) const
{
Variant::Map::const_iterator iter = attributes.find(k);
@@ -79,6 +92,7 @@ const Variant& AgentImpl::getAttribute(const string& k) const
return iter->second;
}
+
ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
@@ -258,7 +272,6 @@ SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const
Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (!schemaCache->haveSchema(id))
//
// The desired schema is not in the cache. We need to asynchronously query the remote
@@ -375,6 +388,14 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
if (aIter == props.end())
final = true;
+ aIter = props.find("qmf.content");
+ if (aIter == props.end())
+ return;
+
+ string content_type(aIter->second.asString());
+ if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data")
+ return;
+
try { correlator = boost::lexical_cast<uint32_t>(cid); }
catch(const boost::bad_lexical_cast&) { correlator = 0; }
@@ -392,12 +413,26 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
qpid::sys::Mutex::ScopedLock cl(context->lock);
if (!context->response.isValid())
context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
- for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
- Data data(new DataImpl(lIter->asMap(), this));
- ConsoleEventImplAccess::get(context->response).addData(data);
- if (data.hasSchema())
- learnSchemaId(data.getSchemaId());
- }
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ ConsoleEventImplAccess::get(context->response).addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
if (final) {
ConsoleEventImplAccess::get(context->response).setFinal();
ConsoleEventImplAccess::get(context->response).setAgent(this);
@@ -410,15 +445,30 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
eventImpl->setCorrelator(correlator);
eventImpl->setAgent(this);
- for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
- Data data(new DataImpl(lIter->asMap(), this));
- eventImpl->addData(data);
- if (data.hasSchema())
- learnSchemaId(data.getSchemaId());
- }
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ eventImpl->addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ eventImpl->addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
if (final)
eventImpl->setFinal();
- session.enqueueEvent(eventImpl.release());
+ if (content_type != "_schema")
+ session.enqueueEvent(eventImpl.release());
}
}
@@ -441,14 +491,11 @@ Query AgentImpl::stringToQuery(const std::string& text)
if (iter != map.end())
packageName = iter->second.asString();
- Query query(className, packageName);
+ Query query(QUERY_OBJECT, className, packageName);
iter = map.find("where");
- if (iter != map.end()) {
- const Variant::Map& pred(iter->second.asMap());
- for (iter = pred.begin(); iter != pred.end(); iter++)
- query.addPredicate(iter->first, iter->second);
- }
+ if (iter != map.end())
+ query.setPredicate(iter->second.asList());
return query;
}
@@ -464,42 +511,11 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
headers["qmf.opcode"] = "_query_request";
headers["x-amqp-0-10.app-id"] = "qmf2";
- map["_what"] = "OBJECT";
-
- const DataAddr& dataAddr(query.getDataAddr());
- const SchemaId& schemaId(query.getSchemaId());
-
- if (dataAddr.isValid())
- map["_object_id"] = dataAddr.asMap();
- else {
- string className;
- string packageName;
- if (schemaId.isValid()) {
- className = schemaId.getName();
- packageName = schemaId.getPackageName();
- } else {
- className = query.getClassName();
- if (className.empty())
- throw QmfException("Invalid Query");
- packageName = query.getPackageName();
- }
- Variant::Map idMap;
- idMap["_class_name"] = className;
- if (!packageName.empty())
- idMap["_package_name"] = packageName;
- map["_schema_id"] = idMap;
- }
-
- //
- // TODO: Encode a simple-predicate if present.
- //
-
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- encode(map, msg);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ msg.setSubject(name);
+ encode(QueryImplAccess::get(query).asMap(), msg);
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT QueryRequest to=" << name);
}
@@ -521,17 +537,27 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
+ msg.setSubject(name);
encode(map, msg);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
}
void AgentImpl::sendSchemaRequest(const SchemaId& id)
{
- // TODO: Check agent's capability value to determine which kind of schema request to make
+ uint32_t correlator;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
+ }
+
+ if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
+ Query query(QUERY_SCHEMA, id);
+ sendQuery(query, correlator);
+ return;
+ }
#define RAW_BUFFER_SIZE 1024
char rawBuffer[RAW_BUFFER_SIZE];
@@ -541,7 +567,7 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
buffer.putOctet('M');
buffer.putOctet('2');
buffer.putOctet('S');
- buffer.putLong(nextCorrelator++);
+ buffer.putLong(correlator);
buffer.putShortString(id.getPackageName());
buffer.putShortString(id.getName());
buffer.putBin128(id.getHash().data());
@@ -551,9 +577,8 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
Message msg;
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ msg.setSubject(name);
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
}
diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h
index d5d2a2fdb2..aadc81b5bd 100644
--- a/qpid/cpp/src/qmf/AgentImpl.h
+++ b/qpid/cpp/src/qmf/AgentImpl.h
@@ -25,6 +25,7 @@
#include "qmf/Agent.h"
#include "qmf/ConsoleEventImpl.h"
#include "qmf/ConsoleSessionImpl.h"
+#include "qmf/QueryImpl.h"
#include "qmf/SchemaCache.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/Message.h"
@@ -41,10 +42,11 @@ namespace qmf {
// Impl-only methods
//
AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s);
- void setAttribute(const std::string& k, const qpid::types::Variant& v) { attributes[k] = v; }
+ void setAttribute(const std::string& k, const qpid::types::Variant& v);
void setAttribute(const std::string& k, const std::string& v) { attributes[k] = v; }
void touch() { touched = true; }
- uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; return untouchedCount; }
+ uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; touched = false; return untouchedCount; }
+ uint32_t getCapability() const { return capability; }
void handleException(const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleMethodResponse(const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleDataIndication(const qpid::types::Variant::List&, const qpid::messaging::Message&);
@@ -61,6 +63,9 @@ namespace qmf {
const qpid::types::Variant& getAttribute(const std::string& k) const;
const qpid::types::Variant::Map& getAttributes() const { return attributes; }
+ ConsoleEvent querySchema(qpid::messaging::Duration t) { return query(Query(QUERY_SCHEMA_ID), t); }
+ uint32_t querySchemaAsync() { return queryAsync(Query(QUERY_SCHEMA_ID)); }
+
ConsoleEvent query(const Query& q, qpid::messaging::Duration t);
ConsoleEvent query(const std::string& q, qpid::messaging::Duration t);
uint32_t queryAsync(const Query& q);
@@ -88,15 +93,17 @@ namespace qmf {
ConsoleSessionImpl& session;
bool touched;
uint32_t untouchedCount;
+ uint32_t capability;
qpid::types::Variant::Map attributes;
uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
boost::shared_ptr<SchemaCache> schemaCache;
mutable std::set<std::string> packageSet;
- std::set<SchemaId> schemaIdSet;
+ std::set<SchemaId, SchemaIdCompare> schemaIdSet;
Query stringToQuery(const std::string&);
void sendQuery(const Query&, uint32_t);
+ void sendSchemaIdQuery(uint32_t);
void sendMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, uint32_t);
void sendSchemaRequest(const SchemaId&);
void learnSchemaId(const SchemaId&);
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
index 28c324cc02..eca48d6b83 100644
--- a/qpid/cpp/src/qmf/AgentSession.cpp
+++ b/qpid/cpp/src/qmf/AgentSession.cpp
@@ -28,7 +28,8 @@
#include "qmf/SchemaImpl.h"
#include "qmf/DataAddrImpl.h"
#include "qmf/DataImpl.h"
-#include "qmf/Query.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Thread.h"
@@ -86,11 +87,14 @@ namespace qmf {
private:
typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
mutable qpid::sys::Mutex lock;
qpid::sys::Condition cond;
Connection connection;
Session session;
+ Sender directSender;
+ Sender topicSender;
string domain;
Variant::Map attributes;
Variant::Map options;
@@ -103,6 +107,7 @@ namespace qmf {
uint32_t interval;
uint64_t lastHeartbeat;
uint64_t lastVisit;
+ bool forceHeartbeat;
bool externalStorage;
bool autoAllowQueries;
bool autoAllowMethods;
@@ -110,21 +115,20 @@ namespace qmf {
string directBase;
string topicBase;
- set<string> packages;
- map<SchemaId, Schema, SchemaIdCompare> schemata;
+ SchemaMap schemata;
DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompare> schemaIndex;
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
void checkOpen();
void setAgentName();
void enqueueEvent(const AgentEvent&);
- void handleLocateRequest(const Variant::Map& content, const Message& msg);
+ void handleLocateRequest(const Variant::List& content, const Message& msg);
void handleMethodRequest(const Variant::Map& content, const Message& msg);
void handleQueryRequest(const Variant::Map& content, const Message& msg);
+ void handleSchemaRequest(AgentEvent&);
void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
void dispatch(Message);
void sendHeartbeat();
- bool predicateMatch(const Query&, const Data&);
void flushResponses(AgentEvent&, bool);
void periodicProcessing(uint64_t);
void run();
@@ -166,14 +170,14 @@ void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
- bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), externalStorage(false),
- autoAllowQueries(true), autoAllowMethods(true),
+ bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
+ externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
// Set Capability Level to 1
//
- attributes["_capability_level"] = 1;
+ attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8;
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -234,6 +238,9 @@ void AgentSessionImpl::open()
directRx.setCapacity(64);
topicRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
// Start the receiver thread
threadCanceled = false;
thread = new qpid::sys::Thread(*this);
@@ -280,18 +287,19 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
void AgentSessionImpl::registerSchema(Schema& schema)
{
- qpid::sys::Mutex::ScopedLock l(lock);
- schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
-
if (!schema.isFinalized())
schema.finalize();
-
const SchemaId& schemaId(schema.getSchemaId());
- const string& packageName(schemaId.getPackageName());
- packages.insert(packageName);
+ qpid::sys::Mutex::ScopedLock l(lock);
schemata[schemaId] = schema;
schemaIndex[schemaId] = DataIndex();
+
+ //
+ // Get the news out at the next periodic interval that there is new schema information.
+ //
+ schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ forceHeartbeat = true;
}
@@ -380,34 +388,14 @@ void AgentSessionImpl::authAccept(AgentEvent& authEvent)
map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId());
if (iter != schemaIndex.end())
for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++)
- if (predicateMatch(query, dIter->second))
+ if (query.matchesPredicate(dIter->second.getProperties()))
response(event, dIter->second);
}
complete(event);
return;
}
- const string& className(query.getClassName());
- const string& packageName(query.getPackageName());
-
- if (className.empty()) {
- raiseException(event, "Query is Invalid");
- return;
- }
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- map<SchemaId, DataIndex>::const_iterator sIter;
- for (sIter = schemaIndex.begin(); sIter != schemaIndex.end(); sIter++) {
- const SchemaId& schemaId(sIter->first);
- if (schemaId.getName() == className &&
- (packageName.empty() || schemaId.getPackageName() == packageName))
- for (DataIndex::const_iterator dIter = sIter->second.begin(); dIter != sIter->second.end(); dIter++)
- if (predicateMatch(query, dIter->second))
- response(event, dIter->second);
- }
- }
- complete(event);
+ raiseException(event, "Query is Invalid");
}
@@ -501,10 +489,6 @@ void AgentSessionImpl::raiseEvent(const Data& data)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- std::stringstream address;
-
- address << topicBase << "/agent.ind.event";
-
// TODO: add severity.package.class to key
// or modify to send only to subscriptions with matching queries
@@ -513,13 +497,12 @@ void AgentSessionImpl::raiseEvent(const Data& data)
headers["qmf.content"] = "_event";
headers["qmf.agent"] = agentName;
headers["x-amqp-0-10.app-id"] = "qmf2";
+ msg.setSubject("agent.ind.event");
encode(DataImplAccess::get(data).asMap(), msg);
- Sender sender(session.createSender(address.str()));
- sender.send(msg);
- sender.close();
+ topicSender.send(msg);
- QPID_LOG(trace, "SENT EventIndication to=" << address.str());
+ QPID_LOG(trace, "SENT EventIndication to=agent.ind.event");
}
@@ -571,10 +554,19 @@ void AgentSessionImpl::setAgentName()
}
-void AgentSessionImpl::handleLocateRequest(const Variant::Map&, const Message& msg)
+void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
+ if (!predicate.empty()) {
+ Query agentQuery(QUERY_OBJECT);
+ agentQuery.setPredicate(predicate);
+ if (!agentQuery.matchesPredicate(attributes)) {
+ QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring");
+ return;
+ }
+ }
+
Message reply;
Variant::Map map;
Variant::Map& headers(reply.getProperties());
@@ -643,64 +635,72 @@ void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Mes
//
// Construct an AgentEvent to be sent to the application or directly handled by the agent.
//
+ auto_ptr<QueryImpl> queryImpl(new QueryImpl(content));
auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY));
eventImpl->setUserId(msg.getUserId());
eventImpl->setReplyTo(msg.getReplyTo());
eventImpl->setCorrelationId(msg.getCorrelationId());
+ eventImpl->setQuery(queryImpl.release());
+ AgentEvent ae(eventImpl.release());
- Query query;
- Variant::Map::const_iterator iter;
-
- iter = content.find("_what");
- if (iter == content.end()) {
- QPID_LOG(error, "Received QueryRequest with no _what element");
+ if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) {
+ handleSchemaRequest(ae);
return;
}
- if (iter->second.asString() == "OBJECT") {
- //
- // This is an object query, handle the various flavors of query.
- //
- iter = content.find("_object_id");
- if (iter != content.end()) {
- auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap()));
- query = Query(DataAddr(addrImpl.release()));
- } else {
- iter = content.find("_schema_id");
- if (iter != content.end()) {
- const Variant::Map& map(iter->second.asMap());
- string className;
- string packageName;
-
- iter = map.find("_class_name");
- if (iter == map.end()) {
- QPID_LOG(error, "Received QueryRequest with no invalid schemaId");
- return;
- }
+ if (autoAllowQueries)
+ authAccept(ae);
+ else
+ enqueueEvent(ae);
+}
- className = iter->second.asString();
- iter = map.find("_package_name");
- if (iter != map.end())
- packageName = iter->second.asString();
- query = Query(className, packageName);
- } else {
- QPID_LOG(error, "Received QueryRequest with no valid elements");
- return;
- }
- }
+void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
+{
+ SchemaMap::const_iterator iter;
+ string error;
+ const Query& query(event.getQuery());
+
+ Message msg;
+ Variant::List content;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
- eventImpl->setQuery(query);
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.agent"] = agentName;
+ headers["x-amqp-0-10.app-id"] = "qmf2";
- if (autoAllowQueries) {
- AgentEvent ae(eventImpl.release());
- authAccept(ae);
- } else
- enqueueEvent(AgentEvent(eventImpl.release()));
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (query.getTarget() == QUERY_SCHEMA_ID) {
+ headers["qmf.content"] = "_schema_id";
+ for (iter = schemata.begin(); iter != schemata.end(); iter++)
+ content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
+ } else if (query.getSchemaId().isValid()) {
+ headers["qmf.content"] = "_schema";
+ iter = schemata.find(query.getSchemaId());
+ if (iter != schemata.end())
+ content.push_back(SchemaImplAccess::get(iter->second).asMap());
+ } else {
+ error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID.";
+ }
+ }
- } else if (iter->second.asString() == "SCHEMA") {
- // TODO: process a v2 schema request
+ if (!error.empty()) {
+ raiseException(event, error);
+ return;
}
+
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(content, msg);
+ Sender sender(session.createSender(eventImpl.getReplyTo()));
+ sender.send(msg);
+ sender.close();
+
+ QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}
@@ -723,17 +723,20 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u
SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className);
dataId.setHash(hash);
- iter = schemata.find(dataId);
- if (iter != schemata.end())
- replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
- else {
- SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
- eventId.setHash(hash);
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
iter = schemata.find(dataId);
if (iter != schemata.end())
replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
- else
- return;
+ else {
+ SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
+ eventId.setHash(hash);
+ iter = schemata.find(dataId);
+ if (iter != schemata.end())
+ replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
+ else
+ return;
+ }
}
Message reply;
@@ -765,23 +768,30 @@ void AgentSessionImpl::dispatch(Message msg)
return;
}
- if (msg.getContentType() != "amqp/map") {
- QPID_LOG(trace, "Message received with content type '" << msg.getContentType() <<
- "'. Expected 'amqp/map'");
- return;
- }
+ const string& opcode = iter->second.asString();
- Variant::Map content;
- decode(msg, content);
+ if (msg.getContentType() == "amqp/list") {
+ Variant::List content;
+ decode(msg, content);
- const string& opcode = iter->second.asString();
+ if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
+ }
- if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
- else if (opcode == "_method_request") handleMethodRequest(content, msg);
- else if (opcode == "_query_request") handleQueryRequest(content, msg);
- else {
- QPID_LOG(trace, "Unknown QMFv2 opcode: " << opcode);
+ } else if (msg.getContentType() == "amqp/map") {
+ Variant::Map content;
+ decode(msg, content);
+
+ if (opcode == "_method_request") handleMethodRequest(content, msg);
+ else if (opcode == "_query_request") handleQueryRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
+ }
+ } else {
+ QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map");
}
+
} else {
//
// Dispatch a QMFv1 formatted message
@@ -812,7 +822,7 @@ void AgentSessionImpl::sendHeartbeat()
Variant::Map& headers(msg.getProperties());
std::stringstream address;
- address << topicBase << "/agent.ind.heartbeat";
+ address << "agent.ind.heartbeat";
// append .<vendor>.<product> to address key if present.
Variant::Map::const_iterator v;
@@ -827,6 +837,7 @@ void AgentSessionImpl::sendHeartbeat()
headers["qmf.opcode"] = "_agent_heartbeat_indication";
headers["qmf.agent"] = agentName;
headers["x-amqp-0-10.app-id"] = "qmf2";
+ msg.setSubject(address.str());
map["_values"] = attributes;
map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
@@ -835,17 +846,8 @@ void AgentSessionImpl::sendHeartbeat()
map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
encode(map, msg);
- Sender sender = session.createSender(address.str());
- sender.send(msg);
+ topicSender.send(msg);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName);
- sender.close();
-}
-
-
-bool AgentSessionImpl::predicateMatch(const Query&, const Data&)
-{
- // TODO: Implement a proper predicate match
- return true;
}
@@ -901,8 +903,9 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
//
// If the hearbeat interval has elapsed, send a heartbeat.
//
- if (seconds - lastHeartbeat >= interval) {
+ if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) {
lastHeartbeat = seconds;
+ forceHeartbeat = false;
sendHeartbeat();
}
@@ -941,4 +944,3 @@ void AgentSessionImpl::run()
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
-
diff --git a/qpid/cpp/src/qmf/ConsoleEvent.cpp b/qpid/cpp/src/qmf/ConsoleEvent.cpp
index d5775a86b4..b76abc83c6 100644
--- a/qpid/cpp/src/qmf/ConsoleEvent.cpp
+++ b/qpid/cpp/src/qmf/ConsoleEvent.cpp
@@ -37,12 +37,28 @@ ConsoleEvent& ConsoleEvent::operator=(const ConsoleEvent& s) { return PI::assign
ConsoleEventCode ConsoleEvent::getType() const { return impl->getType(); }
uint32_t ConsoleEvent::getCorrelator() const { return impl->getCorrelator(); }
Agent ConsoleEvent::getAgent() const { return impl->getAgent(); }
+AgentDelReason ConsoleEvent::getAgentDelReason() const { return impl->getAgentDelReason(); }
+uint32_t ConsoleEvent::getSchemaIdCount() const { return impl->getSchemaIdCount(); }
+SchemaId ConsoleEvent::getSchemaId(uint32_t i) const { return impl->getSchemaId(i); }
uint32_t ConsoleEvent::getDataCount() const { return impl->getDataCount(); }
Data ConsoleEvent::getData(uint32_t i) const { return impl->getData(i); }
bool ConsoleEvent::isFinal() const { return impl->isFinal(); }
const Variant::Map& ConsoleEvent::getArguments() const { return impl->getArguments(); }
-Data ConsoleEventImpl::getData(uint32_t i) const {
+
+SchemaId ConsoleEventImpl::getSchemaId(uint32_t i) const
+{
+ uint32_t count = 0;
+ for (list<SchemaId>::const_iterator iter = newSchemaIds.begin(); iter != newSchemaIds.end(); iter++) {
+ if (count++ == i)
+ return *iter;
+ }
+ throw IndexOutOfRange();
+}
+
+
+Data ConsoleEventImpl::getData(uint32_t i) const
+{
uint32_t count = 0;
for (list<Data>::const_iterator iter = dataList.begin(); iter != dataList.end(); iter++) {
if (count++ == i)
diff --git a/qpid/cpp/src/qmf/ConsoleEventImpl.h b/qpid/cpp/src/qmf/ConsoleEventImpl.h
index fe7405bb06..e7acb54152 100644
--- a/qpid/cpp/src/qmf/ConsoleEventImpl.h
+++ b/qpid/cpp/src/qmf/ConsoleEventImpl.h
@@ -34,10 +34,12 @@ namespace qmf {
//
// Impl-only methods
//
- ConsoleEventImpl(ConsoleEventCode e) : eventType(e), correlator(0), final(false) {}
+ ConsoleEventImpl(ConsoleEventCode e, AgentDelReason r = AGENT_DEL_AGED) :
+ eventType(e), delReason(r), correlator(0), final(false) {}
void setCorrelator(uint32_t c) { correlator = c; }
void setAgent(const Agent& a) { agent = a; }
void addData(const Data& d) { dataList.push_back(Data(d)); }
+ void addSchemaId(const SchemaId& s) { newSchemaIds.push_back(SchemaId(s)); }
void setFinal() { final = true; }
void setArguments(const qpid::types::Variant::Map& a) { arguments = a; }
@@ -47,6 +49,9 @@ namespace qmf {
ConsoleEventCode getType() const { return eventType; }
uint32_t getCorrelator() const { return correlator; }
Agent getAgent() const { return agent; }
+ AgentDelReason getAgentDelReason() const { return delReason; }
+ uint32_t getSchemaIdCount() const { return newSchemaIds.size(); }
+ SchemaId getSchemaId(uint32_t) const;
uint32_t getDataCount() const { return dataList.size(); }
Data getData(uint32_t i) const;
bool isFinal() const { return final; }
@@ -54,10 +59,12 @@ namespace qmf {
private:
const ConsoleEventCode eventType;
+ const AgentDelReason delReason;
uint32_t correlator;
Agent agent;
bool final;
std::list<Data> dataList;
+ std::list<SchemaId> newSchemaIds;
qpid::types::Variant::Map arguments;
};
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
index 18986222c1..868df302ce 100644
--- a/qpid/cpp/src/qmf/ConsoleSession.cpp
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -57,8 +57,9 @@ Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnecte
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
- lastVisit(0), schemaCache(new SchemaCache())
+ connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false),
+ thread(0), threadCanceled(false),
+ lastVisit(0), lastAgePass(0), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -70,6 +71,10 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("domain");
if (iter != optMap.end())
domain = iter->second.asString();
+
+ iter = optMap.find("max-agent-age");
+ if (iter != optMap.end())
+ maxAgentAgeMinutes = iter->second.asUint32();
}
}
@@ -81,13 +86,35 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
}
-void ConsoleSessionImpl::setAgentFilter(const string&)
+void ConsoleSessionImpl::setAgentFilter(const string& predicate)
{
+ agentQuery = Query(QUERY_OBJECT, predicate);
+
//
- // TODO: Setup the new agent filter
- // TODO: Purge the agent list of any agents that don't match the filter
- // TODO: Send an agent locate with the new filter
+ // Purge the agent list of any agents that don't match the filter.
//
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ map<string, Agent> toDelete;
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (!agentQuery.matchesPredicate(iter->second.getAttributes()))) {
+ toDelete[iter->first] = iter->second;
+ }
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
+
+ //
+ // Broadcast an agent locate request with our new criteria.
+ //
+ if (opened)
+ sendAgentLocate();
}
@@ -111,15 +138,23 @@ void ConsoleSessionImpl::open()
Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}");
directRx.setCapacity(64);
- topicRx.setCapacity(64);
+ topicRx.setCapacity(128);
legacyRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
+ directSender.setCapacity(64);
+ topicSender.setCapacity(128);
+
// Start the receiver thread
threadCanceled = false;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
+ if (agentQuery)
+ sendAgentLocate();
opened = true;
}
@@ -198,18 +233,17 @@ void ConsoleSessionImpl::dispatch(Message msg)
{
const Variant::Map& properties(msg.getProperties());
Variant::Map::const_iterator iter;
+ Variant::Map::const_iterator oiter;
+ oiter = properties.find("qmf.opcode");
iter = properties.find("x-amqp-0-10.app-id");
- if (iter != properties.end() && iter->second.asString() == "qmf2") {
+ if (iter == properties.end())
+ iter = properties.find("app_id");
+ if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) {
//
// Dispatch a QMFv2 formatted message
//
- iter = properties.find("qmf.opcode");
- if (iter == properties.end()) {
- QPID_LOG(trace, "Message received with no 'qmf.opcode' header");
- return;
- }
- const string& opcode = iter->second.asString();
+ const string& opcode = oiter->second.asString();
iter = properties.find("qmf.agent");
if (iter == properties.end()) {
@@ -240,11 +274,8 @@ void ConsoleSessionImpl::dispatch(Message msg)
return;
}
- if (!agent.isValid()) {
- QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode <<
- " from an unknown agent " << agentName);
+ if (!agent.isValid())
return;
- }
AgentImpl& agentImpl(AgentImplAccess::get(agent));
@@ -305,14 +336,34 @@ void ConsoleSessionImpl::sendBrokerLocate()
msg.setReplyTo(replyAddress);
msg.setCorrelationId("broker-locate");
- Sender sender(session.createSender(directBase + "/broker"));
- sender.send(msg);
- sender.close();
+ msg.setSubject("broker");
+
+ directSender.send(msg);
QPID_LOG(trace, "SENT AgentLocate to broker");
}
+void ConsoleSessionImpl::sendAgentLocate()
+{
+ Message msg;
+ Variant::Map& headers(msg.getProperties());
+
+ headers["method"] = "request";
+ headers["qmf.opcode"] = "_agent_locate_request";
+ headers["x-amqp-0-10.app-id"] = "qmf2";
+
+ msg.setReplyTo(replyAddress);
+ msg.setCorrelationId("agent-locate");
+ msg.setSubject("console.request.agent_locate");
+ encode(agentQuery.getPredicate(), msg);
+
+ topicSender.send(msg);
+
+ QPID_LOG(trace, "SENT AgentLocate to topic");
+}
+
+
void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg)
{
Variant::Map::const_iterator iter;
@@ -326,18 +377,25 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
Variant::Map attrs(iter->second.asMap());
//
- // TODO: Check this agent against the agent filter. Exit if it doesn't match.
- // (only if this isn't the connected broker agent)
+ // Check this agent against the agent filter. Exit if it doesn't match.
+ // (only if this isn't the connected broker agent)
//
+ if ((cid != "broker-locate") && agentQuery && (!agentQuery.matchesPredicate(attrs)))
+ return;
+
+ QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName);
- iter = content.find("epoch");
- if (iter != content.end())
+ iter = attrs.find("epoch");
+ if (iter != attrs.end())
epoch = iter->second.asUint32();
{
qpid::sys::Mutex::ScopedLock l(lock);
map<string, Agent>::iterator aIter = agents.find(agentName);
if (aIter == agents.end()) {
+ //
+ // This is a new agent. We have no current record of its existence.
+ //
auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this));
for (iter = attrs.begin(); iter != attrs.end(); iter++)
if (iter->first != "epoch")
@@ -345,24 +403,47 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
agent = Agent(impl.release());
agents[agentName] = agent;
+ //
+ // Enqueue a notification of the new agent.
+ //
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD));
eventImpl->setAgent(agent);
enqueueEventLH(ConsoleEvent(eventImpl.release()));
- } else
+ } else {
+ //
+ // This is a refresh of an agent we are already tracking.
+ //
agent = aIter->second;
+ AgentImpl& impl(AgentImplAccess::get(agent));
+ impl.touch();
+ if (impl.getEpoch() != epoch) {
+ //
+ // The agent has restarted since the last time we heard from it.
+ // Enqueue a notification.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+
+ iter = attrs.find("schemaUpdated");
+ if (iter != attrs.end()) {
+ uint64_t ts(iter->second.asUint64());
+ if (ts > impl.getAttribute("schemaUpdated").asUint64()) {
+ //
+ // The agent has added new schema entries since we last heard from it.
+ // Enqueue a notification.
+ //
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE));
+ eventImpl->setAgent(agent);
+ enqueueEventLH(ConsoleEvent(eventImpl.release()));
+ }
+ }
+ }
if (cid == "broker-locate")
connectedBrokerAgent = agent;
}
-
- AgentImplAccess::get(agent).touch();
-
- //
- // Changes we are interested in:
- //
- // agentEpoch - indicates that the agent restarted since we last heard from it
- // schemaUpdated - indicates that the agent has registered new schemata
- //
}
@@ -385,8 +466,27 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
lastVisit = seconds;
//
- // TODO: Handle the aging of agent records
+ // Handle the aging of agent records
//
+ if (lastAgePass == 0)
+ lastAgePass = seconds;
+ if (seconds - lastAgePass >= 60) {
+ lastAgePass = seconds;
+ map<string, Agent> toDelete;
+ qpid::sys::Mutex::ScopedLock l(lock);
+
+ for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++)
+ if ((iter->second.getName() != connectedBrokerAgent.getName()) &&
+ (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes))
+ toDelete[iter->first] = iter->second;
+
+ for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) {
+ agents.erase(iter->first);
+ auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED));
+ eventImpl->setAgent(iter->second);
+ enqueueEventLH(eventImpl.release());
+ }
+ }
}
diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
index 9a077b0390..705d9de4d8 100644
--- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
@@ -28,6 +28,7 @@
#include "qmf/Schema.h"
#include "qmf/ConsoleEventImpl.h"
#include "qmf/SchemaCache.h"
+#include "qmf/Query.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Thread.h"
@@ -36,6 +37,7 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Address.h"
#include "qpid/management/Buffer.h"
#include "qpid/types/Variant.h"
@@ -65,13 +67,17 @@ namespace qmf {
qpid::sys::Condition cond;
qpid::messaging::Connection connection;
qpid::messaging::Session session;
+ qpid::messaging::Sender directSender;
+ qpid::messaging::Sender topicSender;
std::string domain;
- qpid::types::Variant::Map agentFilter;
+ uint32_t maxAgentAgeMinutes;
+ Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
qpid::sys::Thread* thread;
bool threadCanceled;
uint64_t lastVisit;
+ uint64_t lastAgePass;
std::map<std::string, Agent> agents;
Agent connectedBrokerAgent;
qpid::messaging::Address replyAddress;
@@ -83,6 +89,7 @@ namespace qmf {
void enqueueEventLH(const ConsoleEvent&);
void dispatch(qpid::messaging::Message);
void sendBrokerLocate();
+ void sendAgentLocate();
void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
diff --git a/qpid/cpp/src/qmf/Expression.cpp b/qpid/cpp/src/qmf/Expression.cpp
new file mode 100644
index 0000000000..7d48678c15
--- /dev/null
+++ b/qpid/cpp/src/qmf/Expression.cpp
@@ -0,0 +1,441 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qmf/exceptions.h"
+#include "qmf/Expression.h"
+#include <iostream>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::types;
+
+Expression::Expression(const Variant::List& expr)
+{
+ static int level(0);
+ level++;
+ Variant::List::const_iterator iter(expr.begin());
+ string op(iter->asString());
+ iter++;
+
+ if (op == "not") logicalOp = LOGICAL_NOT;
+ else if (op == "and") logicalOp = LOGICAL_AND;
+ else if (op == "or") logicalOp = LOGICAL_OR;
+ else {
+ logicalOp = LOGICAL_ID;
+ if (op == "eq") boolOp = BOOL_EQ;
+ else if (op == "ne") boolOp = BOOL_NE;
+ else if (op == "lt") boolOp = BOOL_LT;
+ else if (op == "le") boolOp = BOOL_LE;
+ else if (op == "gt") boolOp = BOOL_GT;
+ else if (op == "ge") boolOp = BOOL_GE;
+ else if (op == "re_match") boolOp = BOOL_RE_MATCH;
+ else if (op == "exists") boolOp = BOOL_EXISTS;
+ else if (op == "true") boolOp = BOOL_TRUE;
+ else if (op == "false") boolOp = BOOL_FALSE;
+ else
+ throw QmfException("Invalid operator in predicate expression");
+ }
+
+ if (logicalOp == LOGICAL_ID) {
+ switch (boolOp) {
+ case BOOL_EQ:
+ case BOOL_NE:
+ case BOOL_LT:
+ case BOOL_LE:
+ case BOOL_GT:
+ case BOOL_GE:
+ case BOOL_RE_MATCH:
+ //
+ // Binary operator: get two operands.
+ //
+ operandCount = 2;
+ break;
+
+ case BOOL_EXISTS:
+ //
+ // Unary operator: get one operand.
+ //
+ operandCount = 1;
+ break;
+
+ case BOOL_TRUE:
+ case BOOL_FALSE:
+ //
+ // Literal operator: no operands.
+ //
+ operandCount = 0;
+ break;
+ }
+
+ for (int idx = 0; idx < operandCount; idx++) {
+ if (iter == expr.end())
+ throw QmfException("Too few operands for operation: " + op);
+ if (iter->getType() == VAR_STRING) {
+ quoted[idx] = false;
+ operands[idx] = *iter;
+ } else if (iter->getType() == VAR_LIST) {
+ const Variant::List& sublist(iter->asList());
+ Variant::List::const_iterator subIter(sublist.begin());
+ if (subIter != sublist.end() && subIter->asString() == "quote") {
+ quoted[idx] = true;
+ subIter++;
+ if (subIter != sublist.end()) {
+ operands[idx] = *subIter;
+ subIter++;
+ if (subIter != sublist.end())
+ throw QmfException("Extra tokens at end of 'quote'");
+ }
+ } else
+ throw QmfException("Expected '[quote, <token>]'");
+ } else
+ throw QmfException("Expected string or list as operand for: " + op);
+ iter++;
+ }
+
+ if (iter != expr.end())
+ throw QmfException("Too many operands for operation: " + op);
+
+ } else {
+ //
+ // This is a logical expression, collect sub-expressions
+ //
+ while (iter != expr.end()) {
+ if (iter->getType() != VAR_LIST)
+ throw QmfException("Operands of " + op + " must be lists");
+ expressionList.push_back(boost::shared_ptr<Expression>(new Expression(iter->asList())));
+ iter++;
+ }
+ }
+ level--;
+}
+
+
+bool Expression::evaluate(const Variant::Map& data) const
+{
+ list<boost::shared_ptr<Expression> >::const_iterator iter;
+
+ switch (logicalOp) {
+ case LOGICAL_ID:
+ return boolEval(data);
+
+ case LOGICAL_NOT:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if ((*iter)->evaluate(data))
+ return false;
+ return true;
+
+ case LOGICAL_AND:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if (!(*iter)->evaluate(data))
+ return false;
+ return true;
+
+ case LOGICAL_OR:
+ for (iter = expressionList.begin(); iter != expressionList.end(); iter++)
+ if ((*iter)->evaluate(data))
+ return true;
+ return false;
+ }
+
+ return false;
+}
+
+
+bool Expression::boolEval(const Variant::Map& data) const
+{
+ Variant val[2];
+ bool exists[2];
+
+ for (int idx = 0; idx < operandCount; idx++) {
+ if (quoted[idx]) {
+ exists[idx] = true;
+ val[idx] = operands[idx];
+ } else {
+ Variant::Map::const_iterator mIter(data.find(operands[idx].asString()));
+ if (mIter == data.end()) {
+ exists[idx] = false;
+ } else {
+ exists[idx] = true;
+ val[idx] = mIter->second;
+ }
+ }
+ }
+
+ switch (boolOp) {
+ case BOOL_EQ: return (exists[0] && exists[1] && (val[0].asString() == val[1].asString()));
+ case BOOL_NE: return (exists[0] && exists[1] && (val[0].asString() != val[1].asString()));
+ case BOOL_LT: return (exists[0] && exists[1] && lessThan(val[0], val[1]));
+ case BOOL_LE: return (exists[0] && exists[1] && lessEqual(val[0], val[1]));
+ case BOOL_GT: return (exists[0] && exists[1] && greaterThan(val[0], val[1]));
+ case BOOL_GE: return (exists[0] && exists[1] && greaterEqual(val[0], val[1]));
+ case BOOL_RE_MATCH: return false; // TODO
+ case BOOL_EXISTS: return exists[0];
+ case BOOL_TRUE: return true;
+ case BOOL_FALSE: return false;
+ }
+
+ return false;
+}
+
+bool Expression::lessThan(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() < right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() < right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() < right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() < right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() < right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() < right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() < right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::lessEqual(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() <= right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() <= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() <= right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() <= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() <= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() <= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() <= right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::greaterThan(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() > right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() > right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() > right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() > right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() > right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() > right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() > right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
+bool Expression::greaterEqual(const Variant& left, const Variant& right) const
+{
+ switch (left.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ return left.asInt64() >= right.asInt64();
+ case VAR_STRING:
+ try {
+ return left.asInt64() >= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ switch (right.getType()) {
+ case VAR_FLOAT: case VAR_DOUBLE:
+ return left.asDouble() >= right.asDouble();
+ case VAR_STRING:
+ try {
+ return left.asDouble() >= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+ default:
+ break;
+ }
+ break;
+
+ case VAR_STRING:
+ switch (right.getType()) {
+ case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64:
+ case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64:
+ try {
+ return left.asInt64() >= right.asInt64();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_FLOAT: case VAR_DOUBLE:
+ try {
+ return left.asDouble() >= right.asDouble();
+ } catch (std::exception&) {}
+ break;
+
+ case VAR_STRING:
+ return left.asString() >= right.asString();
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+
+ return false;
+}
+
+
diff --git a/qpid/cpp/src/qmf/Expression.h b/qpid/cpp/src/qmf/Expression.h
new file mode 100644
index 0000000000..6fbfdbc4ba
--- /dev/null
+++ b/qpid/cpp/src/qmf/Expression.h
@@ -0,0 +1,73 @@
+#ifndef _QMF_EXPRESSION_H_
+#define _QMF_EXPRESSION_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+#include <string>
+#include <list>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ enum LogicalOp {
+ LOGICAL_ID = 1,
+ LOGICAL_NOT = 2,
+ LOGICAL_AND = 3,
+ LOGICAL_OR = 4
+ };
+
+ enum BooleanOp {
+ BOOL_EQ = 1,
+ BOOL_NE = 2,
+ BOOL_LT = 3,
+ BOOL_LE = 4,
+ BOOL_GT = 5,
+ BOOL_GE = 6,
+ BOOL_RE_MATCH = 7,
+ BOOL_EXISTS = 8,
+ BOOL_TRUE = 9,
+ BOOL_FALSE = 10
+ };
+
+ class Expression {
+ public:
+ Expression(const qpid::types::Variant::List& expr);
+ bool evaluate(const qpid::types::Variant::Map& data) const;
+ private:
+ LogicalOp logicalOp;
+ BooleanOp boolOp;
+ int operandCount;
+ qpid::types::Variant operands[2];
+ bool quoted[2];
+ std::list<boost::shared_ptr<Expression> > expressionList;
+
+ bool boolEval(const qpid::types::Variant::Map& data) const;
+ bool lessThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool lessEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool greaterThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ bool greaterEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const;
+ };
+
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/Query.cpp b/qpid/cpp/src/qmf/Query.cpp
index 9b67c7d8b3..09cca4d5bb 100644
--- a/qpid/cpp/src/qmf/Query.cpp
+++ b/qpid/cpp/src/qmf/Query.cpp
@@ -19,56 +19,135 @@
*
*/
-#include "qpid/RefCounted.h"
#include "qmf/PrivateImplRef.h"
-#include "qmf/Query.h"
-#include "qmf/DataAddr.h"
-#include "qmf/SchemaId.h"
+#include "qmf/exceptions.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qpid/messaging/AddressParser.h"
using namespace std;
+using namespace qmf;
using qpid::types::Variant;
-namespace qmf {
- class QueryImpl : public virtual qpid::RefCounted {
- public:
+typedef PrivateImplRef<Query> PI;
+
+Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); }
+Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); }
+Query::~Query() { PI::dtor(*this); }
+Query& Query::operator=(const Query& s) { return PI::assign(*this, s); }
+
+Query::Query(QueryTarget t, const string& pr) { PI::ctor(*this, new QueryImpl(t, pr)); }
+Query::Query(QueryTarget t, const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(t, c, p, pr)); }
+Query::Query(QueryTarget t, const SchemaId& s, const string& pr) { PI::ctor(*this, new QueryImpl(t, s, pr)); }
+Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); }
+
+QueryTarget Query::getTarget() const { return impl->getTarget(); }
+const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); }
+const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); }
+void Query::setPredicate(const Variant::List& pr) { impl->setPredicate(pr); }
+const Variant::List& Query::getPredicate() const { return impl->getPredicate(); }
+bool Query::matchesPredicate(const qpid::types::Variant::Map& map) const { return impl->matchesPredicate(map); }
+
+
+QueryImpl::QueryImpl(const Variant::Map& map)
+{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_what");
+ if (iter == map.end())
+ throw QmfException("Query missing _what element");
+
+ const string& targetString(iter->second.asString());
+ if (targetString == "OBJECT") target = QUERY_OBJECT;
+ else if (targetString == "OBJECT_ID") target = QUERY_OBJECT_ID;
+ else if (targetString == "SCHEMA") target = QUERY_SCHEMA;
+ else if (targetString == "SCHEMA_ID") target = QUERY_SCHEMA_ID;
+ else
+ throw QmfException("Query with invalid _what value: " + targetString);
+
+ iter = map.find("_object_id");
+ if (iter != map.end()) {
+ auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap()));
+ dataAddr = DataAddr(addrImpl.release());
+ }
+
+ iter = map.find("_schema_id");
+ if (iter != map.end()) {
+ auto_ptr<SchemaIdImpl> sidImpl(new SchemaIdImpl(iter->second.asMap()));
+ schemaId = SchemaId(sidImpl.release());
+ }
+
+ iter = map.find("_where");
+ if (iter != map.end())
+ predicate = iter->second.asList();
+}
+
+
+Variant::Map QueryImpl::asMap() const
+{
+ Variant::Map map;
+ string targetString;
+
+ switch (target) {
+ case QUERY_OBJECT : targetString = "OBJECT"; break;
+ case QUERY_OBJECT_ID : targetString = "OBJECT_ID"; break;
+ case QUERY_SCHEMA : targetString = "SCHEMA"; break;
+ case QUERY_SCHEMA_ID : targetString = "SCHEMA_ID"; break;
+ }
+
+ map["_what"] = targetString;
+
+ if (dataAddr.isValid())
+ map["_object_id"] = DataAddrImplAccess::get(dataAddr).asMap();
+
+ if (schemaId.isValid())
+ map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap();
+
+ if (!predicate.empty())
+ map["_where"] = predicate;
+
+ return map;
+}
+
+
+bool QueryImpl::matchesPredicate(const qpid::types::Variant::Map& data) const
+{
+ if (predicate.empty())
+ return true;
+
+ if (!predicateCompiled) {
+ expression.reset(new Expression(predicate));
+ predicateCompiled = true;
+ }
+
+ return expression->evaluate(data);
+}
+
+
+void QueryImpl::parsePredicate(const string& pred)
+{
+ if (pred.empty())
+ return;
+
+ if (pred[0] == '[') {
//
- // Methods from API handle
+ // Parse this as an AddressParser list.
//
- QueryImpl(const string& c, const string& p, const string&) : packageName(p), className(c) {}
- QueryImpl(const SchemaId& s) : schemaId(s) {}
- QueryImpl(const DataAddr& a) : dataAddr(a) {}
-
- const DataAddr& getDataAddr() const { return dataAddr; }
- const SchemaId& getSchemaId() const { return schemaId; }
- const string& getClassName() const { return className; }
- const string& getPackageName() const { return packageName; }
- void addPredicate(const string& k, const Variant& v) { predicate[k] = v; }
- const Variant::Map& getPredicate() const { return predicate; }
-
- private:
- string packageName;
- string className;
- SchemaId schemaId;
- DataAddr dataAddr;
- Variant::Map predicate;
- };
-
- typedef PrivateImplRef<Query> PI;
-
- Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); }
- Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); }
- Query::~Query() { PI::dtor(*this); }
- Query& Query::operator=(const Query& s) { return PI::assign(*this, s); }
-
- Query::Query(const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(c, p, pr)); }
- Query::Query(const SchemaId& s) { PI::ctor(*this, new QueryImpl(s)); }
- Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); }
-
- const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); }
- const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); }
- const string& Query::getClassName() const { return impl->getClassName(); }
- const string& Query::getPackageName() const { return impl->getPackageName(); }
- void Query::addPredicate(const string& k, const Variant& v) { impl->addPredicate(k, v); }
- const Variant::Map& Query::getPredicate() const { return impl->getPredicate(); }
+ qpid::messaging::AddressParser parser(pred);
+ parser.parseList(predicate);
+ } else
+ throw QmfException("Invalid predicate format");
+}
+
+
+QueryImpl& QueryImplAccess::get(Query& item)
+{
+ return *item.impl;
}
+
+const QueryImpl& QueryImplAccess::get(const Query& item)
+{
+ return *item.impl;
+}
diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/QueryImpl.h
new file mode 100644
index 0000000000..27ec427684
--- /dev/null
+++ b/qpid/cpp/src/qmf/QueryImpl.h
@@ -0,0 +1,77 @@
+#ifndef _QMF_QUERY_IMPL_H_
+#define _QMF_QUERY_IMPL_H_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/Query.h"
+#include "qmf/DataAddr.h"
+#include "qmf/SchemaId.h"
+#include "qmf/Expression.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+ class QueryImpl : public virtual qpid::RefCounted {
+ public:
+ //
+ // Public impl-only methods
+ //
+ QueryImpl(const qpid::types::Variant::Map&);
+ qpid::types::Variant::Map asMap() const;
+
+ //
+ // Methods from API handle
+ //
+ QueryImpl(QueryTarget t, const std::string& pr) : target(t), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(QueryTarget t, const std::string& c, const std::string& p, const std::string& pr) :
+ target(t), schemaId(SCHEMA_TYPE_DATA, p, c), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(QueryTarget t, const SchemaId& s, const std::string& pr) :
+ target(t), schemaId(s), predicateCompiled(false) { parsePredicate(pr); }
+ QueryImpl(const DataAddr& a) : target(QUERY_OBJECT), dataAddr(a), predicateCompiled(false) {}
+
+ QueryTarget getTarget() const { return target; }
+ const DataAddr& getDataAddr() const { return dataAddr; }
+ const SchemaId& getSchemaId() const { return schemaId; }
+ void setPredicate(const qpid::types::Variant::List& pr) { predicate = pr; }
+ const qpid::types::Variant::List& getPredicate() const { return predicate; }
+ bool matchesPredicate(const qpid::types::Variant::Map& map) const;
+
+ private:
+ QueryTarget target;
+ SchemaId schemaId;
+ DataAddr dataAddr;
+ qpid::types::Variant::List predicate;
+ mutable bool predicateCompiled;
+ mutable boost::shared_ptr<Expression> expression;
+
+ void parsePredicate(const std::string& s);
+ };
+
+ struct QueryImplAccess
+ {
+ static QueryImpl& get(Query&);
+ static const QueryImpl& get(const Query&);
+ };
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/Schema.cpp b/qpid/cpp/src/qmf/Schema.cpp
index 5676125c22..e003f9d06f 100644
--- a/qpid/cpp/src/qmf/Schema.cpp
+++ b/qpid/cpp/src/qmf/Schema.cpp
@@ -61,8 +61,65 @@ SchemaMethod Schema::getMethod(uint32_t i) const { return impl->getMethod(i); }
// Impl Method Bodies
//========================================================================================
-SchemaImpl::SchemaImpl(const qpid::types::Variant::Map&) : finalized(true)
+SchemaImpl::SchemaImpl(const Variant::Map& map) : finalized(false)
{
+ Variant::Map::const_iterator iter;
+ Variant::List::const_iterator lIter;
+
+ iter = map.find("_schema_id");
+ if (iter == map.end())
+ throw QmfException("Schema map missing _schema_id element");
+ schemaId = SchemaId(new SchemaIdImpl(iter->second.asMap()));
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ description = iter->second.asString();
+
+ iter = map.find("_default_severity");
+ if (iter != map.end())
+ defaultSeverity = int(iter->second.asUint32());
+
+ iter = map.find("_properties");
+ if (iter != map.end()) {
+ const Variant::List& props(iter->second.asList());
+ for (lIter = props.begin(); lIter != props.end(); lIter++)
+ addProperty(SchemaProperty(new SchemaPropertyImpl(lIter->asMap())));
+ }
+
+ iter = map.find("_methods");
+ if (iter != map.end()) {
+ const Variant::List& meths(iter->second.asList());
+ for (lIter = meths.begin(); lIter != meths.end(); lIter++)
+ addMethod(SchemaMethod(new SchemaMethodImpl(lIter->asMap())));
+ }
+
+ finalized = true;
+}
+
+
+Variant::Map SchemaImpl::asMap() const
+{
+ Variant::Map map;
+ Variant::List propList;
+ Variant::List methList;
+
+ checkNotFinal();
+
+ map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap();
+ if (!description.empty())
+ map["_desc"] = description;
+ if (schemaId.getType() == SCHEMA_TYPE_EVENT)
+ map["_default_severity"] = uint32_t(defaultSeverity);
+
+ for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++)
+ propList.push_back(SchemaPropertyImplAccess::get(*pIter).asMap());
+
+ for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++)
+ methList.push_back(SchemaMethodImplAccess::get(*mIter).asMap());
+
+ map["_properties"] = propList;
+ map["_methods"] = methList;
+ return map;
}
diff --git a/qpid/cpp/src/qmf/SchemaIdImpl.h b/qpid/cpp/src/qmf/SchemaIdImpl.h
index df3cc076b9..ae1a3d8d3b 100644
--- a/qpid/cpp/src/qmf/SchemaIdImpl.h
+++ b/qpid/cpp/src/qmf/SchemaIdImpl.h
@@ -69,6 +69,15 @@ namespace qmf {
return lhs.getHash() < rhs.getHash();
}
};
+
+ struct SchemaIdCompareNoHash {
+ bool operator() (const SchemaId& lhs, const SchemaId& rhs) const
+ {
+ if (lhs.getName() != rhs.getName())
+ return lhs.getName() < rhs.getName();
+ return lhs.getPackageName() < rhs.getPackageName();
+ }
+ };
}
#endif
diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h
index eae3a3c37f..267a5d5138 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/SchemaImpl.h
@@ -43,6 +43,7 @@ namespace qmf {
// Impl-only public methods
//
SchemaImpl(const qpid::types::Variant::Map& m);
+ qpid::types::Variant::Map asMap() const;
SchemaImpl(qpid::management::Buffer& v1Buffer);
std::string asV1Content(uint32_t sequence) const;
diff --git a/qpid/cpp/src/qmf/SchemaMethod.cpp b/qpid/cpp/src/qmf/SchemaMethod.cpp
index 7ee6646ec4..e8dcfd2401 100644
--- a/qpid/cpp/src/qmf/SchemaMethod.cpp
+++ b/qpid/cpp/src/qmf/SchemaMethod.cpp
@@ -68,10 +68,48 @@ SchemaMethodImpl::SchemaMethodImpl(const string& n, const string& options) : nam
}
}
-SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map&)
+
+SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map& map)
+{
+ Variant::Map::const_iterator iter;
+ Variant::List::const_iterator lIter;
+
+ iter = map.find("_name");
+ if (iter == map.end())
+ throw QmfException("SchemaMethod without a _name element");
+ name = iter->second.asString();
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ desc = iter->second.asString();
+
+ iter = map.find("_arguments");
+ if (iter != map.end()) {
+ const Variant::List& argList(iter->second.asList());
+ for (lIter = argList.begin(); lIter != argList.end(); lIter++)
+ addArgument(SchemaProperty(new SchemaPropertyImpl(lIter->asMap())));
+ }
+}
+
+
+Variant::Map SchemaMethodImpl::asMap() const
{
+ Variant::Map map;
+ Variant::List argList;
+
+ map["_name"] = name;
+
+ if (!desc.empty())
+ map["_desc"] = desc;
+
+ for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++)
+ argList.push_back(SchemaPropertyImplAccess::get(*iter).asMap());
+ map["_arguments"] = argList;
+
+ return map;
}
+
SchemaMethodImpl::SchemaMethodImpl(qpid::management::Buffer& buffer)
{
Variant::Map::const_iterator iter;
diff --git a/qpid/cpp/src/qmf/SchemaMethodImpl.h b/qpid/cpp/src/qmf/SchemaMethodImpl.h
index 4b0ff9134d..930d48509c 100644
--- a/qpid/cpp/src/qmf/SchemaMethodImpl.h
+++ b/qpid/cpp/src/qmf/SchemaMethodImpl.h
@@ -43,6 +43,7 @@ namespace qmf {
//
SchemaMethodImpl(const qpid::types::Variant::Map& m);
SchemaMethodImpl(qpid::management::Buffer& v1Buffer);
+ qpid::types::Variant::Map asMap() const;
void updateHash(Hash&) const;
void encodeV1(qpid::management::Buffer&) const;
diff --git a/qpid/cpp/src/qmf/SchemaProperty.cpp b/qpid/cpp/src/qmf/SchemaProperty.cpp
index 244115b8a9..106127261b 100644
--- a/qpid/cpp/src/qmf/SchemaProperty.cpp
+++ b/qpid/cpp/src/qmf/SchemaProperty.cpp
@@ -51,6 +51,7 @@ void SchemaProperty::setSubtype(const string& s) { impl->setSubtype(s); }
void SchemaProperty::setDirection(int d) { impl->setDirection(d); }
const string& SchemaProperty::getName() const { return impl->getName(); }
+int SchemaProperty::getType() const { return impl->getType(); }
int SchemaProperty::getAccess() const { return impl->getAccess(); }
bool SchemaProperty::isIndex() const { return impl->isIndex(); }
bool SchemaProperty::isOptional() const { return impl->isOptional(); }
@@ -132,9 +133,115 @@ SchemaPropertyImpl::SchemaPropertyImpl(const string& n, int t, const string opti
}
-SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map&) :
+SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map& map) :
access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN)
{
+ Variant::Map::const_iterator iter;
+
+ iter = map.find("_name");
+ if (iter == map.end())
+ throw QmfException("SchemaProperty without a _name element");
+ name = iter->second.asString();
+
+ iter = map.find("_type");
+ if (iter == map.end())
+ throw QmfException("SchemaProperty without a _type element");
+ const string& ts(iter->second.asString());
+ if (ts == "TYPE_VOID") dataType = SCHEMA_DATA_VOID;
+ else if (ts == "TYPE_BOOL") dataType = SCHEMA_DATA_BOOL;
+ else if (ts == "TYPE_INT") dataType = SCHEMA_DATA_INT;
+ else if (ts == "TYPE_FLOAT") dataType = SCHEMA_DATA_FLOAT;
+ else if (ts == "TYPE_STRING") dataType = SCHEMA_DATA_STRING;
+ else if (ts == "TYPE_MAP") dataType = SCHEMA_DATA_MAP;
+ else if (ts == "TYPE_LIST") dataType = SCHEMA_DATA_LIST;
+ else if (ts == "TYPE_UUID") dataType = SCHEMA_DATA_UUID;
+ else
+ throw QmfException("SchemaProperty with an invalid type code: " + ts);
+
+ iter = map.find("_access");
+ if (iter != map.end()) {
+ const string& as(iter->second.asString());
+ if (as == "RO") access = ACCESS_READ_ONLY;
+ else if (as == "RC") access = ACCESS_READ_CREATE;
+ else if (as == "RW") access = ACCESS_READ_WRITE;
+ else
+ throw QmfException("SchemaProperty with an invalid access code: " + as);
+ }
+
+ iter = map.find("_unit");
+ if (iter != map.end())
+ unit = iter->second.asString();
+
+ iter = map.find("_dir");
+ if (iter != map.end()) {
+ const string& ds(iter->second.asString());
+ if (ds == "I") direction = DIR_IN;
+ else if (ds == "O") direction = DIR_OUT;
+ else if (ds == "IO") direction = DIR_IN_OUT;
+ else
+ throw QmfException("SchemaProperty with an invalid direction code: " + ds);
+ }
+
+ iter = map.find("_desc");
+ if (iter != map.end())
+ desc = iter->second.asString();
+
+ iter = map.find("_index");
+ if (iter != map.end())
+ index = iter->second.asBool();
+
+ iter = map.find("_subtype");
+ if (iter != map.end())
+ subtype = iter->second.asString();
+}
+
+
+Variant::Map SchemaPropertyImpl::asMap() const
+{
+ Variant::Map map;
+ string ts;
+
+ map["_name"] = name;
+
+ switch (dataType) {
+ case SCHEMA_DATA_VOID: ts = "TYPE_VOID"; break;
+ case SCHEMA_DATA_BOOL: ts = "TYPE_BOOL"; break;
+ case SCHEMA_DATA_INT: ts = "TYPE_INT"; break;
+ case SCHEMA_DATA_FLOAT: ts = "TYPE_FLOAT"; break;
+ case SCHEMA_DATA_STRING: ts = "TYPE_STRING"; break;
+ case SCHEMA_DATA_MAP: ts = "TYPE_MAP"; break;
+ case SCHEMA_DATA_LIST: ts = "TYPE_LIST"; break;
+ case SCHEMA_DATA_UUID: ts = "TYPE_UUID"; break;
+ }
+ map["_type"] = ts;
+
+ switch (access) {
+ case ACCESS_READ_ONLY: ts = "RO"; break;
+ case ACCESS_READ_CREATE: ts = "RC"; break;
+ case ACCESS_READ_WRITE: ts = "RW"; break;
+ }
+ map["_access"] = ts;
+
+ if (!unit.empty())
+ map["_unit"] = unit;
+
+ switch (direction) {
+ case DIR_IN: ts = "I"; break;
+ case DIR_OUT: ts = "O"; break;
+ case DIR_IN_OUT: ts = "IO"; break;
+ }
+ map["_dir"] = ts;
+
+ if (!desc.empty())
+ map["_desc"] = desc;
+
+ if (index)
+ map["_index"] = true;
+
+ if (!subtype.empty())
+ map["_subtype"] = subtype;
+
+ return map;
}
diff --git a/qpid/cpp/src/qmf/SchemaPropertyImpl.h b/qpid/cpp/src/qmf/SchemaPropertyImpl.h
index 94994c722d..cdfc29066f 100644
--- a/qpid/cpp/src/qmf/SchemaPropertyImpl.h
+++ b/qpid/cpp/src/qmf/SchemaPropertyImpl.h
@@ -42,6 +42,7 @@ namespace qmf {
//
SchemaPropertyImpl(const qpid::types::Variant::Map& m);
SchemaPropertyImpl(qpid::management::Buffer& v1Buffer);
+ qpid::types::Variant::Map asMap() const;
void updateHash(Hash&) const;
void encodeV1(qpid::management::Buffer&, bool isArg, bool isMethodArg) const;
@@ -58,6 +59,7 @@ namespace qmf {
void setDirection(int d) { direction = d; }
const std::string& getName() const { return name; }
+ int getType() const { return dataType; }
int getAccess() const { return access; }
bool isIndex() const { return index; }
bool isOptional() const { return optional; }
@@ -65,6 +67,7 @@ namespace qmf {
const std::string& getDesc() const { return desc; }
const std::string& getSubtype() const { return subtype; }
int getDirection() const { return direction; }
+
private:
std::string name;
int dataType;
diff --git a/qpid/cpp/src/qmf/agentCapability.h b/qpid/cpp/src/qmf/agentCapability.h
new file mode 100644
index 0000000000..6a3f6f8534
--- /dev/null
+++ b/qpid/cpp/src/qmf/agentCapability.h
@@ -0,0 +1,39 @@
+#ifndef QMF_AGENT_CAPABILITY_H
+#define QMF_AGENT_CAPABILITY_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qmf {
+
+ /**
+ * Legacy (Qpid 0.7 C++ Agent, 0.7 Broker Agent) capabilities
+ */
+ const uint32_t AGENT_CAPABILITY_LEGACY = 0;
+
+ /**
+ * Qpid 0.8 QMFv2 capabilities
+ */
+ const uint32_t AGENT_CAPABILITY_0_8 = 1;
+ const uint32_t AGENT_CAPABILITY_V2_SCHEMA = 1;
+ const uint32_t AGENT_CAPABILITY_AGENT_PREDICATE = 1;
+}
+
+#endif
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 0f44089005..0e61a40c29 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -30,6 +30,7 @@ lib_messaging = $(abs_builddir)/../libqpidmessaging.la
lib_common = $(abs_builddir)/../libqpidcommon.la
lib_broker = $(abs_builddir)/../libqpidbroker.la
lib_console = $(abs_builddir)/../libqmfconsole.la
+lib_qmf2 = $(abs_builddir)/../libqmf2.la
# lib_amqp_0_10 = $(abs_builddir)/../libqpidamqp_0_10.la
#
@@ -66,7 +67,7 @@ tmodule_LTLIBRARIES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
- $(lib_messaging) $(lib_broker) $(lib_console)
+ $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2)
unit_test_SOURCES= unit_test.cpp unit_test.h \
MessagingSessionTests.cpp \
@@ -121,7 +122,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
PollableCondition.cpp \
Variant.cpp \
Address.cpp \
- ClientMessage.cpp
+ ClientMessage.cpp \
+ Qmf2.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp
new file mode 100644
index 0000000000..e56a35db5c
--- /dev/null
+++ b/qpid/cpp/src/tests/Qmf2.cpp
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/types/Variant.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/exceptions.h"
+
+#include "unit_test.h"
+
+using namespace qpid::types;
+using namespace qmf;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(Qmf2Suite)
+
+QPID_AUTO_TEST_CASE(testQuery)
+{
+ Query query(QUERY_OBJECT, "class_name", "package_name", "[and, [eq, name, [quote, smith]], [lt, age, [quote, 27]]]");
+ Query newQuery(new QueryImpl(QueryImplAccess::get(query).asMap()));
+
+ BOOST_CHECK_EQUAL(newQuery.getTarget(), QUERY_OBJECT);
+ BOOST_CHECK_EQUAL(newQuery.getSchemaId().getName(), "class_name");
+ BOOST_CHECK_EQUAL(newQuery.getSchemaId().getPackageName(), "package_name");
+
+ Variant::List pred(newQuery.getPredicate());
+ BOOST_CHECK_EQUAL(pred.size(), 3);
+
+ Variant::List::iterator iter(pred.begin());
+ BOOST_CHECK_EQUAL(iter->asString(), "and");
+ iter++;
+ BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST);
+ iter++;
+ BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST);
+ iter = iter->asList().begin();
+ BOOST_CHECK_EQUAL(iter->asString(), "lt");
+ iter++;
+ BOOST_CHECK_EQUAL(iter->asString(), "age");
+ iter++;
+ BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST);
+ iter = iter->asList().begin();
+ BOOST_CHECK_EQUAL(iter->asString(), "quote");
+ iter++;
+ BOOST_CHECK_EQUAL(iter->asUint32(), 27);
+
+ Query query2(QUERY_OBJECT_ID);
+ Query newQuery2(new QueryImpl(QueryImplAccess::get(query2).asMap()));
+ BOOST_CHECK_EQUAL(newQuery2.getTarget(), QUERY_OBJECT_ID);
+
+ Query query3(QUERY_SCHEMA);
+ Query newQuery3(new QueryImpl(QueryImplAccess::get(query3).asMap()));
+ BOOST_CHECK_EQUAL(newQuery3.getTarget(), QUERY_SCHEMA);
+
+ Query query4(QUERY_SCHEMA_ID);
+ Query newQuery4(new QueryImpl(QueryImplAccess::get(query4).asMap()));
+ BOOST_CHECK_EQUAL(newQuery4.getTarget(), QUERY_SCHEMA_ID);
+
+ DataAddr addr("name", "agent_name", 34);
+ Query query5(addr);
+ Query newQuery5(new QueryImpl(QueryImplAccess::get(query5).asMap()));
+ BOOST_CHECK_EQUAL(newQuery5.getTarget(), QUERY_OBJECT);
+ BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getName(), "name");
+ BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getAgentName(), "agent_name");
+ BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getAgentEpoch(), 34);
+}
+
+QPID_AUTO_TEST_CASE(testQueryPredicateErrors)
+{
+ Query query;
+ Variant::Map map;
+
+ BOOST_CHECK_THROW(Query(QUERY_OBJECT, "INVALID"), QmfException);
+ query = Query(QUERY_OBJECT, "[unknown, one, two]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[eq, first]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[exists]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[eq, first, [quote, 1, 2, 3]]]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[eq, first, [unexpected, 3]]]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[eq, first, {}]]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[eq, first, second, third]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+
+ query = Query(QUERY_OBJECT, "[and, first, second, third]");
+ BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException);
+}
+
+QPID_AUTO_TEST_CASE(testQueryPredicate)
+{
+ Query query;
+ Variant::Map map;
+
+ map["forty"] = 40;
+ map["fifty"] = 50;
+ map["minus_ten"] = -10;
+ map["pos_float"] = 100.05;
+ map["neg_float"] = -1000.33;
+ map["name"] = "jones";
+ map["bool_t"] = true;
+ map["bool_f"] = false;
+
+ BOOST_CHECK_THROW(Query(QUERY_OBJECT, "INVALID"), QmfException);
+
+ query = Query(QUERY_OBJECT);
+ BOOST_CHECK_EQUAL(query.matchesPredicate(Variant::Map()), true);
+
+ query = Query(QUERY_OBJECT, "[eq, forty, [quote, 40]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[eq, forty, [quote, 41]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[le, forty, fifty]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[and, [eq, forty, [quote, 40]], [eq, name, [quote, jones]]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[and, [eq, forty, [quote, 40]], [eq, name, [quote, smith]]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[or, [eq, forty, [quote, 40]], [eq, name, [quote, smith]]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[or, [eq, forty, [quote, 41]], [eq, name, [quote, smith]]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[not, [le, forty, [quote, 40]]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[le, forty, [quote, 40]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[ge, forty, [quote, 40]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[lt, forty, [quote, 45]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[lt, [quote, 45], forty]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[gt, forty, [quote, 45]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[gt, [quote, 45], forty]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[eq, bool_t, [quote, True]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[eq, bool_t, [quote, False]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[eq, bool_f, [quote, True]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[eq, bool_f, [quote, False]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[eq, minus_ten, [quote, -10]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[lt, minus_ten, [quote, -20]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[lt, [quote, -20], minus_ten]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[exists, name]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+
+ query = Query(QUERY_OBJECT, "[exists, nonexfield]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), false);
+
+ query = Query(QUERY_OBJECT, "[eq, pos_float, [quote, 100.05]]");
+ BOOST_CHECK_EQUAL(query.matchesPredicate(map), true);
+}
+
+QPID_AUTO_TEST_CASE(testSchema)
+{
+ Schema in(SCHEMA_TYPE_DATA, "package", "class");
+ in.addProperty(SchemaProperty("prop1", SCHEMA_DATA_BOOL, "{desc:'Property One'}"));
+ in.addProperty(SchemaProperty("prop2", SCHEMA_DATA_INT, "{desc:'Property Two',unit:'Furlong'}"));
+ in.addProperty(SchemaProperty("prop3", SCHEMA_DATA_STRING, "{desc:'Property Three'}"));
+
+ SchemaMethod method1("method1", "{desc:'Method One'}");
+ method1.addArgument(SchemaProperty("arg1", SCHEMA_DATA_BOOL, "{desc:'Argument One',dir:IN}"));
+ method1.addArgument(SchemaProperty("arg2", SCHEMA_DATA_INT, "{desc:'Argument Two',dir:OUT}"));
+ method1.addArgument(SchemaProperty("arg3", SCHEMA_DATA_FLOAT, "{desc:'Argument Three',dir:INOUT}"));
+ in.addMethod(method1);
+
+ SchemaMethod method2("method2", "{desc:'Method Two'}");
+ method2.addArgument(SchemaProperty("arg21", SCHEMA_DATA_BOOL, "{desc:'Argument One',dir:IN}"));
+ method2.addArgument(SchemaProperty("arg22", SCHEMA_DATA_INT, "{desc:'Argument Two',dir:OUT}"));
+ method2.addArgument(SchemaProperty("arg23", SCHEMA_DATA_FLOAT, "{desc:'Argument Three',dir:INOUT}"));
+ in.addMethod(method2);
+
+ BOOST_CHECK(!in.isFinalized());
+ in.finalize();
+ BOOST_CHECK(in.isFinalized());
+
+ Variant::Map map(SchemaImplAccess::get(in).asMap());
+ Schema out(new SchemaImpl(map));
+
+ BOOST_CHECK(out.isFinalized());
+ BOOST_CHECK_EQUAL(out.getSchemaId().getType(), SCHEMA_TYPE_DATA);
+ BOOST_CHECK_EQUAL(out.getSchemaId().getPackageName(), "package");
+ BOOST_CHECK_EQUAL(out.getSchemaId().getName(), "class");
+ BOOST_CHECK_EQUAL(out.getSchemaId().getHash(), in.getSchemaId().getHash());
+
+ BOOST_CHECK_EQUAL(out.getPropertyCount(), 3);
+ SchemaProperty prop;
+
+ prop = out.getProperty(0);
+ BOOST_CHECK_EQUAL(prop.getName(), "prop1");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Property One");
+
+ prop = out.getProperty(1);
+ BOOST_CHECK_EQUAL(prop.getName(), "prop2");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Property Two");
+ BOOST_CHECK_EQUAL(prop.getUnit(), "Furlong");
+ BOOST_CHECK(!prop.isIndex());
+
+ prop = out.getProperty(2);
+ BOOST_CHECK_EQUAL(prop.getName(), "prop3");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_STRING);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Property Three");
+
+ BOOST_CHECK_THROW(out.getProperty(3), QmfException);
+
+ BOOST_CHECK_EQUAL(out.getMethodCount(), 2);
+ SchemaMethod method;
+
+ method = out.getMethod(0);
+ BOOST_CHECK_EQUAL(method.getName(), "method1");
+ BOOST_CHECK_EQUAL(method.getDesc(), "Method One");
+ BOOST_CHECK_EQUAL(method.getArgumentCount(), 3);
+
+ prop = method.getArgument(0);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg1");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument One");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN);
+
+ prop = method.getArgument(1);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg2");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Two");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_OUT);
+
+ prop = method.getArgument(2);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg3");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_FLOAT);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Three");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN_OUT);
+
+ BOOST_CHECK_THROW(method.getArgument(3), QmfException);
+
+ method = out.getMethod(1);
+ BOOST_CHECK_EQUAL(method.getName(), "method2");
+ BOOST_CHECK_EQUAL(method.getDesc(), "Method Two");
+ BOOST_CHECK_EQUAL(method.getArgumentCount(), 3);
+
+ prop = method.getArgument(0);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg21");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument One");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN);
+
+ prop = method.getArgument(1);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg22");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Two");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_OUT);
+
+ prop = method.getArgument(2);
+ BOOST_CHECK_EQUAL(prop.getName(), "arg23");
+ BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_FLOAT);
+ BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Three");
+ BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN_OUT);
+
+ BOOST_CHECK_THROW(method.getArgument(3), QmfException);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests