diff options
author | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
commit | d47927b3e150057f6d615a0d00c8eff6c83320ac (patch) | |
tree | 6cf1da8bd7a46fd3cef8251af94f88bbad0e627d | |
parent | 81414cc0fb52efbd77e3e3bc83ed0c5dcb7fe83a (diff) | |
download | qpid-python-d47927b3e150057f6d615a0d00c8eff6c83320ac.tar.gz |
QMFv2 Additions:
- QMFv2 schema encoding completed
- Schema queries handled by the agent and initiated by the console by user request
- Full query support with predicates evaluated on the agent (regex not yet implemented)
- Agent filtering in the console
- Agent aging in the console
- Unit tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999662 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 1806 insertions, 300 deletions
diff --git a/qpid/cpp/bindings/qmf2/python/python.i b/qpid/cpp/bindings/qmf2/python/python.i index 7d83465bb3..02dd1632b0 100644 --- a/qpid/cpp/bindings/qmf2/python/python.i +++ b/qpid/cpp/bindings/qmf2/python/python.i @@ -23,13 +23,16 @@ /* Define the general-purpose exception handling */ %exception { + std::string error; + Py_BEGIN_ALLOW_THREADS; try { - Py_BEGIN_ALLOW_THREADS $action - Py_END_ALLOW_THREADS + } catch (qpid::types::Exception& ex) { + error = ex.what(); } - catch (qpid::types::Exception& ex) { - PyErr_SetString(PyExc_RuntimeError, ex.what()); + Py_END_ALLOW_THREADS; + if (!error.empty()) { + PyErr_SetString(PyExc_RuntimeError, error.c_str()); return NULL; } } diff --git a/qpid/cpp/bindings/qmf2/python/qmf2.py b/qpid/cpp/bindings/qmf2/python/qmf2.py index f3ece32866..285b47ebbe 100644 --- a/qpid/cpp/bindings/qmf2/python/qmf2.py +++ b/qpid/cpp/bindings/qmf2/python/qmf2.py @@ -54,6 +54,11 @@ SEV_NOTICE = cqmf2.SEV_NOTICE SEV_INFORM = cqmf2.SEV_INFORM SEV_DEBUG = cqmf2.SEV_DEBUG +QUERY_OBJECT = cqmf2.QUERY_OBJECT +QUERY_OBJECT_ID = cqmf2.QUERY_OBJECT_ID +QUERY_SCHEMA = cqmf2.QUERY_SCHEMA +QUERY_SCHEMA_ID = cqmf2.QUERY_SCHEMA_ID + #=================================================================================================== # EXCEPTIONS @@ -153,7 +158,7 @@ class ConsoleSession(object): def setAgentFilter(self, filt): """ """ - self.setAgentFilter(filt) + self._impl.setAgentFilter(filt) def open(self): """ @@ -321,6 +326,12 @@ class Agent(object): dataList.append(Data(result.getData(i))) return dataList + def loadSchemaInfo(self, timeout=30): + """ + """ + dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout) + self._impl.querySchema(dur) + def getPackages(self): """ """ @@ -457,7 +468,7 @@ class Data(object): result = agent.callMethod(name, argmap, addr, dur) ## - ## If the agent sent an exception, raise it in a QmfAgentExeption. + ## If the agent sent an exception, raise it in a QmfAgentException. ## if result.getType() == cqmf2.CONSOLE_EXCEPTION: exdata = result.getData(0) diff --git a/qpid/cpp/include/qmf/Agent.h b/qpid/cpp/include/qmf/Agent.h index 8ddea0fae9..8d427ab2fb 100644 --- a/qpid/cpp/include/qmf/Agent.h +++ b/qpid/cpp/include/qmf/Agent.h @@ -65,10 +65,52 @@ namespace qmf { qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); QMF_EXTERN uint32_t callMethodAsync(const std::string&, const qpid::types::Variant::Map&, const DataAddr&); + /** + * Query the agent for a list of schema classes that it exposes. This operation comes in both + * synchronous (blocking) and asynchronous flavors. + * + * This method will typically be used after receiving an AGENT_SCHEMA_UPDATE event from the console session. + * It may also be used on a newly discovered agent to learn what schemata are exposed. + * + * querySchema returns a ConsoleEvent that contains a list of SchemaId objects exposed by the agent. + * This list is cached locally and can be locally queried using getPackage[Count] and getSchemaId[Count]. + */ + QMF_EXTERN ConsoleEvent querySchema(qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); + QMF_EXTERN uint32_t querySchemaAsync(); + + /** + * Get the list of schema packages exposed by the agent. + * + * getPackageCount returns the number of packages exposed. + * getPackage returns the name of the package by index (0..package-count) + * + * Note that both of these calls are synchronous and non-blocking. They only return locally cached data + * and will not send any messages to the remote agent. Use querySchema[Async] to get the latest schema + * information from the remote agent. + */ QMF_EXTERN uint32_t getPackageCount() const; QMF_EXTERN const std::string& getPackage(uint32_t) const; + + /** + * Get the list of schema identifiers for a particular package. + * + * getSchemaIdCount returns the number of IDs in the indicates package. + * getSchemaId returns the SchemaId by index (0..schema-id-count) + * + * Note that both of these calls are synchronous and non-blocking. They only return locally cached data + * and will not send any messages to the remote agent. Use querySchema[Async] to get the latest schema + * information from the remote agent. + */ QMF_EXTERN uint32_t getSchemaIdCount(const std::string&) const; QMF_EXTERN SchemaId getSchemaId(const std::string&, uint32_t) const; + + /** + * Get detailed schema information for a specified schema ID. + * + * This call will return cached information if it is available. If not, it will send a query message to the + * remote agent and block waiting for a response. The timeout argument specifies the maximum time to wait + * for a response from the agent. + */ QMF_EXTERN Schema getSchema(const SchemaId&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); diff --git a/qpid/cpp/include/qmf/ConsoleEvent.h b/qpid/cpp/include/qmf/ConsoleEvent.h index 61f625b137..3e75631a61 100644 --- a/qpid/cpp/include/qmf/ConsoleEvent.h +++ b/qpid/cpp/include/qmf/ConsoleEvent.h @@ -23,6 +23,9 @@ #include <qmf/ImportExport.h> #include "qmf/Handle.h" +#include "qmf/Agent.h" +#include "qmf/Data.h" +#include "qmf/SchemaId.h" #include "qpid/types/Variant.h" namespace qmf { @@ -32,18 +35,24 @@ namespace qmf { #endif class ConsoleEventImpl; - class Agent; - class Data; enum ConsoleEventCode { - CONSOLE_AGENT_ADD = 1, - CONSOLE_AGENT_AGE = 2, - CONSOLE_EVENT = 3, - CONSOLE_QUERY_RESPONSE = 4, - CONSOLE_METHOD_RESPONSE = 5, - CONSOLE_EXCEPTION = 6, - CONSOLE_SUBSCRIBE_UPDATE = 7, - CONSOLE_THREAD_FAILED = 8 + CONSOLE_AGENT_ADD = 1, + CONSOLE_AGENT_DEL = 2, + CONSOLE_AGENT_RESTART = 3, + CONSOLE_AGENT_SCHEMA_UPDATE = 4, + CONSOLE_AGENT_SCHEMA_RESPONSE = 5, + CONSOLE_EVENT = 6, + CONSOLE_QUERY_RESPONSE = 7, + CONSOLE_METHOD_RESPONSE = 8, + CONSOLE_EXCEPTION = 9, + CONSOLE_SUBSCRIBE_UPDATE = 10, + CONSOLE_THREAD_FAILED = 11 + }; + + enum AgentDelReason { + AGENT_DEL_AGED = 1, + AGENT_DEL_FILTER = 2 }; class ConsoleEvent : public qmf::Handle<ConsoleEventImpl> { @@ -56,6 +65,9 @@ namespace qmf { QMF_EXTERN ConsoleEventCode getType() const; QMF_EXTERN uint32_t getCorrelator() const; QMF_EXTERN Agent getAgent() const; + QMF_EXTERN AgentDelReason getAgentDelReason() const; + QMF_EXTERN uint32_t getSchemaIdCount() const; + QMF_EXTERN SchemaId getSchemaId(uint32_t) const; QMF_EXTERN uint32_t getDataCount() const; QMF_EXTERN Data getData(uint32_t) const; QMF_EXTERN bool isFinal() const; diff --git a/qpid/cpp/include/qmf/ConsoleSession.h b/qpid/cpp/include/qmf/ConsoleSession.h index 0e58a647d6..c17f4510f1 100644 --- a/qpid/cpp/include/qmf/ConsoleSession.h +++ b/qpid/cpp/include/qmf/ConsoleSession.h @@ -44,6 +44,19 @@ namespace qmf { QMF_EXTERN ConsoleSession& operator=(const ConsoleSession&); QMF_EXTERN ~ConsoleSession(); + /** + * ConsoleSession + * A session that runs over an AMQP connection for QMF console operation. + * + * @param connection - An opened qpid::messaging::Connection + * @param options - An optional string containing options + * + * The options string is of the form "{key:value,key:value}". The following keys are supported: + * + * domain:NAME - QMF Domain to join [default: "default"] + * max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from + * an agent before deleting it [default: 5] + */ QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options=""); QMF_EXTERN void setDomain(const std::string&); QMF_EXTERN void setAgentFilter(const std::string&); diff --git a/qpid/cpp/include/qmf/Query.h b/qpid/cpp/include/qmf/Query.h index 71ef31b104..fec4660bd7 100644 --- a/qpid/cpp/include/qmf/Query.h +++ b/qpid/cpp/include/qmf/Query.h @@ -36,6 +36,13 @@ namespace qmf { class SchemaId; class DataAddr; + enum QueryTarget { + QUERY_OBJECT = 1, + QUERY_OBJECT_ID = 2, + QUERY_SCHEMA = 3, + QUERY_SCHEMA_ID = 4 + }; + class Query : public qmf::Handle<QueryImpl> { public: QMF_EXTERN Query(QueryImpl* impl = 0); @@ -43,20 +50,22 @@ namespace qmf { QMF_EXTERN Query& operator=(const Query&); QMF_EXTERN ~Query(); - QMF_EXTERN Query(const std::string& className, const std::string& package="", const std::string& predicate=""); - QMF_EXTERN Query(const SchemaId&); + QMF_EXTERN Query(QueryTarget, const std::string& predicate=""); + QMF_EXTERN Query(QueryTarget, const std::string& className, const std::string& package, const std::string& predicate=""); + QMF_EXTERN Query(QueryTarget, const SchemaId&, const std::string& predicate=""); QMF_EXTERN Query(const DataAddr&); + QMF_EXTERN QueryTarget getTarget() const; QMF_EXTERN const DataAddr& getDataAddr() const; QMF_EXTERN const SchemaId& getSchemaId() const; - QMF_EXTERN const std::string& getClassName() const; - QMF_EXTERN const std::string& getPackageName() const; - QMF_EXTERN void addPredicate(const std::string&, const qpid::types::Variant&); - QMF_EXTERN const qpid::types::Variant::Map& getPredicate() const; + QMF_EXTERN void setPredicate(const qpid::types::Variant::List&); + QMF_EXTERN const qpid::types::Variant::List& getPredicate() const; + QMF_EXTERN bool matchesPredicate(const qpid::types::Variant::Map& map) const; #ifndef SWIG private: friend class qmf::PrivateImplRef<Query>; + friend class QueryImplAccess; #endif }; diff --git a/qpid/cpp/include/qmf/SchemaProperty.h b/qpid/cpp/include/qmf/SchemaProperty.h index 2e770c2ef1..a3a328b60b 100644 --- a/qpid/cpp/include/qmf/SchemaProperty.h +++ b/qpid/cpp/include/qmf/SchemaProperty.h @@ -54,6 +54,7 @@ namespace qmf { QMF_EXTERN void setDirection(int); QMF_EXTERN const std::string& getName() const; + QMF_EXTERN int getType() const; QMF_EXTERN int getAccess() const; QMF_EXTERN bool isIndex() const; QMF_EXTERN bool isOptional() const; diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index ef02cf7562..d0a186e89f 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -85,6 +85,7 @@ libqmf_la_SOURCES = \ libqmf2_la_SOURCES = \ $(QMF2_API) \ + qmf/agentCapability.h \ qmf/Agent.cpp \ qmf/AgentEvent.cpp \ qmf/AgentEventImpl.h \ @@ -99,10 +100,13 @@ libqmf2_la_SOURCES = \ qmf/Data.cpp \ qmf/DataImpl.h \ qmf/exceptions.cpp \ + qmf/Expression.cpp \ + qmf/Expression.h \ qmf/Hash.cpp \ qmf/Hash.h \ qmf/PrivateImplRef.h \ qmf/Query.cpp \ + qmf/QueryImpl.h \ qmf/Schema.cpp \ qmf/SchemaCache.cpp \ qmf/SchemaCache.h \ diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index c7ccea35d5..05bf1a38aa 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -25,6 +25,8 @@ #include "qmf/ConsoleSession.h" #include "qmf/DataImpl.h" #include "qmf/Query.h" +#include "qmf/SchemaImpl.h" +#include "qmf/agentCapability.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/AddressParser.h" #include "qpid/management/Buffer.h" @@ -51,6 +53,8 @@ string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; } string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; } const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); } const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); } +ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); } +uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); } ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); } ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); } uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); } @@ -66,11 +70,20 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : - name(n), epoch(e), session(s), touched(true), untouchedCount(0), + name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), nextCorrelator(1), schemaCache(s.schemaCache) { } +void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v) +{ + attributes[k] = v; + if (k == "qmf.agent_capability") + try { + capability = v.asUint32(); + } catch (std::exception&) {} +} + const Variant& AgentImpl::getAttribute(const string& k) const { Variant::Map::const_iterator iter = attributes.find(k); @@ -79,6 +92,7 @@ const Variant& AgentImpl::getAttribute(const string& k) const return iter->second; } + ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); @@ -258,7 +272,6 @@ SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout) { - qpid::sys::Mutex::ScopedLock l(lock); if (!schemaCache->haveSchema(id)) // // The desired schema is not in the cache. We need to asynchronously query the remote @@ -375,6 +388,14 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms if (aIter == props.end()) final = true; + aIter = props.find("qmf.content"); + if (aIter == props.end()) + return; + + string content_type(aIter->second.asString()); + if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data") + return; + try { correlator = boost::lexical_cast<uint32_t>(cid); } catch(const boost::bad_lexical_cast&) { correlator = 0; } @@ -392,12 +413,26 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms qpid::sys::Mutex::ScopedLock cl(context->lock); if (!context->response.isValid()) context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); - for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { - Data data(new DataImpl(lIter->asMap(), this)); - ConsoleEventImplAccess::get(context->response).addData(data); - if (data.hasSchema()) - learnSchemaId(data.getSchemaId()); - } + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + ConsoleEventImplAccess::get(context->response).addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + if (final) { ConsoleEventImplAccess::get(context->response).setFinal(); ConsoleEventImplAccess::get(context->response).setAgent(this); @@ -410,15 +445,30 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); eventImpl->setCorrelator(correlator); eventImpl->setAgent(this); - for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { - Data data(new DataImpl(lIter->asMap(), this)); - eventImpl->addData(data); - if (data.hasSchema()) - learnSchemaId(data.getSchemaId()); - } + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + eventImpl->addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + eventImpl->addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + if (final) eventImpl->setFinal(); - session.enqueueEvent(eventImpl.release()); + if (content_type != "_schema") + session.enqueueEvent(eventImpl.release()); } } @@ -441,14 +491,11 @@ Query AgentImpl::stringToQuery(const std::string& text) if (iter != map.end()) packageName = iter->second.asString(); - Query query(className, packageName); + Query query(QUERY_OBJECT, className, packageName); iter = map.find("where"); - if (iter != map.end()) { - const Variant::Map& pred(iter->second.asMap()); - for (iter = pred.begin(); iter != pred.end(); iter++) - query.addPredicate(iter->first, iter->second); - } + if (iter != map.end()) + query.setPredicate(iter->second.asList()); return query; } @@ -464,42 +511,11 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) headers["qmf.opcode"] = "_query_request"; headers["x-amqp-0-10.app-id"] = "qmf2"; - map["_what"] = "OBJECT"; - - const DataAddr& dataAddr(query.getDataAddr()); - const SchemaId& schemaId(query.getSchemaId()); - - if (dataAddr.isValid()) - map["_object_id"] = dataAddr.asMap(); - else { - string className; - string packageName; - if (schemaId.isValid()) { - className = schemaId.getName(); - packageName = schemaId.getPackageName(); - } else { - className = query.getClassName(); - if (className.empty()) - throw QmfException("Invalid Query"); - packageName = query.getPackageName(); - } - Variant::Map idMap; - idMap["_class_name"] = className; - if (!packageName.empty()) - idMap["_package_name"] = packageName; - map["_schema_id"] = idMap; - } - - // - // TODO: Encode a simple-predicate if present. - // - msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); - encode(map, msg); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + msg.setSubject(name); + encode(QueryImplAccess::get(query).asMap(), msg); + session.directSender.send(msg); QPID_LOG(trace, "SENT QueryRequest to=" << name); } @@ -521,17 +537,27 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + msg.setSubject(name); encode(map, msg); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + session.directSender.send(msg); QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); } void AgentImpl::sendSchemaRequest(const SchemaId& id) { - // TODO: Check agent's capability value to determine which kind of schema request to make + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { + Query query(QUERY_SCHEMA, id); + sendQuery(query, correlator); + return; + } #define RAW_BUFFER_SIZE 1024 char rawBuffer[RAW_BUFFER_SIZE]; @@ -541,7 +567,7 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) buffer.putOctet('M'); buffer.putOctet('2'); buffer.putOctet('S'); - buffer.putLong(nextCorrelator++); + buffer.putLong(correlator); buffer.putShortString(id.getPackageName()); buffer.putShortString(id.getName()); buffer.putBin128(id.getHash().data()); @@ -551,9 +577,8 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) Message msg; msg.setReplyTo(session.replyAddress); msg.setContent(content); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + msg.setSubject(name); + session.directSender.send(msg); QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); } diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h index d5d2a2fdb2..aadc81b5bd 100644 --- a/qpid/cpp/src/qmf/AgentImpl.h +++ b/qpid/cpp/src/qmf/AgentImpl.h @@ -25,6 +25,7 @@ #include "qmf/Agent.h" #include "qmf/ConsoleEventImpl.h" #include "qmf/ConsoleSessionImpl.h" +#include "qmf/QueryImpl.h" #include "qmf/SchemaCache.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Message.h" @@ -41,10 +42,11 @@ namespace qmf { // Impl-only methods // AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s); - void setAttribute(const std::string& k, const qpid::types::Variant& v) { attributes[k] = v; } + void setAttribute(const std::string& k, const qpid::types::Variant& v); void setAttribute(const std::string& k, const std::string& v) { attributes[k] = v; } void touch() { touched = true; } - uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; return untouchedCount; } + uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; touched = false; return untouchedCount; } + uint32_t getCapability() const { return capability; } void handleException(const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleMethodResponse(const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleDataIndication(const qpid::types::Variant::List&, const qpid::messaging::Message&); @@ -61,6 +63,9 @@ namespace qmf { const qpid::types::Variant& getAttribute(const std::string& k) const; const qpid::types::Variant::Map& getAttributes() const { return attributes; } + ConsoleEvent querySchema(qpid::messaging::Duration t) { return query(Query(QUERY_SCHEMA_ID), t); } + uint32_t querySchemaAsync() { return queryAsync(Query(QUERY_SCHEMA_ID)); } + ConsoleEvent query(const Query& q, qpid::messaging::Duration t); ConsoleEvent query(const std::string& q, qpid::messaging::Duration t); uint32_t queryAsync(const Query& q); @@ -88,15 +93,17 @@ namespace qmf { ConsoleSessionImpl& session; bool touched; uint32_t untouchedCount; + uint32_t capability; qpid::types::Variant::Map attributes; uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; boost::shared_ptr<SchemaCache> schemaCache; mutable std::set<std::string> packageSet; - std::set<SchemaId> schemaIdSet; + std::set<SchemaId, SchemaIdCompare> schemaIdSet; Query stringToQuery(const std::string&); void sendQuery(const Query&, uint32_t); + void sendSchemaIdQuery(uint32_t); void sendMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, uint32_t); void sendSchemaRequest(const SchemaId&); void learnSchemaId(const SchemaId&); diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 28c324cc02..eca48d6b83 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -28,7 +28,8 @@ #include "qmf/SchemaImpl.h" #include "qmf/DataAddrImpl.h" #include "qmf/DataImpl.h" -#include "qmf/Query.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/sys/Thread.h" @@ -86,11 +87,14 @@ namespace qmf { private: typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; mutable qpid::sys::Mutex lock; qpid::sys::Condition cond; Connection connection; Session session; + Sender directSender; + Sender topicSender; string domain; Variant::Map attributes; Variant::Map options; @@ -103,6 +107,7 @@ namespace qmf { uint32_t interval; uint64_t lastHeartbeat; uint64_t lastVisit; + bool forceHeartbeat; bool externalStorage; bool autoAllowQueries; bool autoAllowMethods; @@ -110,21 +115,20 @@ namespace qmf { string directBase; string topicBase; - set<string> packages; - map<SchemaId, Schema, SchemaIdCompare> schemata; + SchemaMap schemata; DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompare> schemaIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; void checkOpen(); void setAgentName(); void enqueueEvent(const AgentEvent&); - void handleLocateRequest(const Variant::Map& content, const Message& msg); + void handleLocateRequest(const Variant::List& content, const Message& msg); void handleMethodRequest(const Variant::Map& content, const Message& msg); void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); void dispatch(Message); void sendHeartbeat(); - bool predicateMatch(const Query&, const Data&); void flushResponses(AgentEvent&, bool); void periodicProcessing(uint64_t); void run(); @@ -166,14 +170,14 @@ void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); } AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), - bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), externalStorage(false), - autoAllowQueries(true), autoAllowMethods(true), + bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), + externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // // Set Capability Level to 1 // - attributes["_capability_level"] = 1; + attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8; if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -234,6 +238,9 @@ void AgentSessionImpl::open() directRx.setCapacity(64); topicRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); + // Start the receiver thread threadCanceled = false; thread = new qpid::sys::Thread(*this); @@ -280,18 +287,19 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) void AgentSessionImpl::registerSchema(Schema& schema) { - qpid::sys::Mutex::ScopedLock l(lock); - schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - if (!schema.isFinalized()) schema.finalize(); - const SchemaId& schemaId(schema.getSchemaId()); - const string& packageName(schemaId.getPackageName()); - packages.insert(packageName); + qpid::sys::Mutex::ScopedLock l(lock); schemata[schemaId] = schema; schemaIndex[schemaId] = DataIndex(); + + // + // Get the news out at the next periodic interval that there is new schema information. + // + schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + forceHeartbeat = true; } @@ -380,34 +388,14 @@ void AgentSessionImpl::authAccept(AgentEvent& authEvent) map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId()); if (iter != schemaIndex.end()) for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++) - if (predicateMatch(query, dIter->second)) + if (query.matchesPredicate(dIter->second.getProperties())) response(event, dIter->second); } complete(event); return; } - const string& className(query.getClassName()); - const string& packageName(query.getPackageName()); - - if (className.empty()) { - raiseException(event, "Query is Invalid"); - return; - } - - { - qpid::sys::Mutex::ScopedLock l(lock); - map<SchemaId, DataIndex>::const_iterator sIter; - for (sIter = schemaIndex.begin(); sIter != schemaIndex.end(); sIter++) { - const SchemaId& schemaId(sIter->first); - if (schemaId.getName() == className && - (packageName.empty() || schemaId.getPackageName() == packageName)) - for (DataIndex::const_iterator dIter = sIter->second.begin(); dIter != sIter->second.end(); dIter++) - if (predicateMatch(query, dIter->second)) - response(event, dIter->second); - } - } - complete(event); + raiseException(event, "Query is Invalid"); } @@ -501,10 +489,6 @@ void AgentSessionImpl::raiseEvent(const Data& data) Variant::Map map; Variant::Map& headers(msg.getProperties()); - std::stringstream address; - - address << topicBase << "/agent.ind.event"; - // TODO: add severity.package.class to key // or modify to send only to subscriptions with matching queries @@ -513,13 +497,12 @@ void AgentSessionImpl::raiseEvent(const Data& data) headers["qmf.content"] = "_event"; headers["qmf.agent"] = agentName; headers["x-amqp-0-10.app-id"] = "qmf2"; + msg.setSubject("agent.ind.event"); encode(DataImplAccess::get(data).asMap(), msg); - Sender sender(session.createSender(address.str())); - sender.send(msg); - sender.close(); + topicSender.send(msg); - QPID_LOG(trace, "SENT EventIndication to=" << address.str()); + QPID_LOG(trace, "SENT EventIndication to=agent.ind.event"); } @@ -571,10 +554,19 @@ void AgentSessionImpl::setAgentName() } -void AgentSessionImpl::handleLocateRequest(const Variant::Map&, const Message& msg) +void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg) { QPID_LOG(trace, "RCVD AgentLocateRequest"); + if (!predicate.empty()) { + Query agentQuery(QUERY_OBJECT); + agentQuery.setPredicate(predicate); + if (!agentQuery.matchesPredicate(attributes)) { + QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring"); + return; + } + } + Message reply; Variant::Map map; Variant::Map& headers(reply.getProperties()); @@ -643,64 +635,72 @@ void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Mes // // Construct an AgentEvent to be sent to the application or directly handled by the agent. // + auto_ptr<QueryImpl> queryImpl(new QueryImpl(content)); auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY)); eventImpl->setUserId(msg.getUserId()); eventImpl->setReplyTo(msg.getReplyTo()); eventImpl->setCorrelationId(msg.getCorrelationId()); + eventImpl->setQuery(queryImpl.release()); + AgentEvent ae(eventImpl.release()); - Query query; - Variant::Map::const_iterator iter; - - iter = content.find("_what"); - if (iter == content.end()) { - QPID_LOG(error, "Received QueryRequest with no _what element"); + if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) { + handleSchemaRequest(ae); return; } - if (iter->second.asString() == "OBJECT") { - // - // This is an object query, handle the various flavors of query. - // - iter = content.find("_object_id"); - if (iter != content.end()) { - auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap())); - query = Query(DataAddr(addrImpl.release())); - } else { - iter = content.find("_schema_id"); - if (iter != content.end()) { - const Variant::Map& map(iter->second.asMap()); - string className; - string packageName; - - iter = map.find("_class_name"); - if (iter == map.end()) { - QPID_LOG(error, "Received QueryRequest with no invalid schemaId"); - return; - } + if (autoAllowQueries) + authAccept(ae); + else + enqueueEvent(ae); +} - className = iter->second.asString(); - iter = map.find("_package_name"); - if (iter != map.end()) - packageName = iter->second.asString(); - query = Query(className, packageName); - } else { - QPID_LOG(error, "Received QueryRequest with no valid elements"); - return; - } - } +void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) +{ + SchemaMap::const_iterator iter; + string error; + const Query& query(event.getQuery()); + + Message msg; + Variant::List content; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); - eventImpl->setQuery(query); + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; - if (autoAllowQueries) { - AgentEvent ae(eventImpl.release()); - authAccept(ae); - } else - enqueueEvent(AgentEvent(eventImpl.release())); + { + qpid::sys::Mutex::ScopedLock l(lock); + if (query.getTarget() == QUERY_SCHEMA_ID) { + headers["qmf.content"] = "_schema_id"; + for (iter = schemata.begin(); iter != schemata.end(); iter++) + content.push_back(SchemaIdImplAccess::get(iter->first).asMap()); + } else if (query.getSchemaId().isValid()) { + headers["qmf.content"] = "_schema"; + iter = schemata.find(query.getSchemaId()); + if (iter != schemata.end()) + content.push_back(SchemaImplAccess::get(iter->second).asMap()); + } else { + error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID."; + } + } - } else if (iter->second.asString() == "SCHEMA") { - // TODO: process a v2 schema request + if (!error.empty()) { + raiseException(event, error); + return; } + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(content, msg); + Sender sender(session.createSender(eventImpl.getReplyTo())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo()); } @@ -723,17 +723,20 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className); dataId.setHash(hash); - iter = schemata.find(dataId); - if (iter != schemata.end()) - replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); - else { - SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); - eventId.setHash(hash); + { + qpid::sys::Mutex::ScopedLock l(lock); iter = schemata.find(dataId); if (iter != schemata.end()) replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); - else - return; + else { + SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); + eventId.setHash(hash); + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else + return; + } } Message reply; @@ -765,23 +768,30 @@ void AgentSessionImpl::dispatch(Message msg) return; } - if (msg.getContentType() != "amqp/map") { - QPID_LOG(trace, "Message received with content type '" << msg.getContentType() << - "'. Expected 'amqp/map'"); - return; - } + const string& opcode = iter->second.asString(); - Variant::Map content; - decode(msg, content); + if (msg.getContentType() == "amqp/list") { + Variant::List content; + decode(msg, content); - const string& opcode = iter->second.asString(); + if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode); + } - if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); - else if (opcode == "_method_request") handleMethodRequest(content, msg); - else if (opcode == "_query_request") handleQueryRequest(content, msg); - else { - QPID_LOG(trace, "Unknown QMFv2 opcode: " << opcode); + } else if (msg.getContentType() == "amqp/map") { + Variant::Map content; + decode(msg, content); + + if (opcode == "_method_request") handleMethodRequest(content, msg); + else if (opcode == "_query_request") handleQueryRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode); + } + } else { + QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map"); } + } else { // // Dispatch a QMFv1 formatted message @@ -812,7 +822,7 @@ void AgentSessionImpl::sendHeartbeat() Variant::Map& headers(msg.getProperties()); std::stringstream address; - address << topicBase << "/agent.ind.heartbeat"; + address << "agent.ind.heartbeat"; // append .<vendor>.<product> to address key if present. Variant::Map::const_iterator v; @@ -827,6 +837,7 @@ void AgentSessionImpl::sendHeartbeat() headers["qmf.opcode"] = "_agent_heartbeat_indication"; headers["qmf.agent"] = agentName; headers["x-amqp-0-10.app-id"] = "qmf2"; + msg.setSubject(address.str()); map["_values"] = attributes; map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); @@ -835,17 +846,8 @@ void AgentSessionImpl::sendHeartbeat() map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; encode(map, msg); - Sender sender = session.createSender(address.str()); - sender.send(msg); + topicSender.send(msg); QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName); - sender.close(); -} - - -bool AgentSessionImpl::predicateMatch(const Query&, const Data&) -{ - // TODO: Implement a proper predicate match - return true; } @@ -901,8 +903,9 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) // // If the hearbeat interval has elapsed, send a heartbeat. // - if (seconds - lastHeartbeat >= interval) { + if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) { lastHeartbeat = seconds; + forceHeartbeat = false; sendHeartbeat(); } @@ -941,4 +944,3 @@ void AgentSessionImpl::run() QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } - diff --git a/qpid/cpp/src/qmf/ConsoleEvent.cpp b/qpid/cpp/src/qmf/ConsoleEvent.cpp index d5775a86b4..b76abc83c6 100644 --- a/qpid/cpp/src/qmf/ConsoleEvent.cpp +++ b/qpid/cpp/src/qmf/ConsoleEvent.cpp @@ -37,12 +37,28 @@ ConsoleEvent& ConsoleEvent::operator=(const ConsoleEvent& s) { return PI::assign ConsoleEventCode ConsoleEvent::getType() const { return impl->getType(); } uint32_t ConsoleEvent::getCorrelator() const { return impl->getCorrelator(); } Agent ConsoleEvent::getAgent() const { return impl->getAgent(); } +AgentDelReason ConsoleEvent::getAgentDelReason() const { return impl->getAgentDelReason(); } +uint32_t ConsoleEvent::getSchemaIdCount() const { return impl->getSchemaIdCount(); } +SchemaId ConsoleEvent::getSchemaId(uint32_t i) const { return impl->getSchemaId(i); } uint32_t ConsoleEvent::getDataCount() const { return impl->getDataCount(); } Data ConsoleEvent::getData(uint32_t i) const { return impl->getData(i); } bool ConsoleEvent::isFinal() const { return impl->isFinal(); } const Variant::Map& ConsoleEvent::getArguments() const { return impl->getArguments(); } -Data ConsoleEventImpl::getData(uint32_t i) const { + +SchemaId ConsoleEventImpl::getSchemaId(uint32_t i) const +{ + uint32_t count = 0; + for (list<SchemaId>::const_iterator iter = newSchemaIds.begin(); iter != newSchemaIds.end(); iter++) { + if (count++ == i) + return *iter; + } + throw IndexOutOfRange(); +} + + +Data ConsoleEventImpl::getData(uint32_t i) const +{ uint32_t count = 0; for (list<Data>::const_iterator iter = dataList.begin(); iter != dataList.end(); iter++) { if (count++ == i) diff --git a/qpid/cpp/src/qmf/ConsoleEventImpl.h b/qpid/cpp/src/qmf/ConsoleEventImpl.h index fe7405bb06..e7acb54152 100644 --- a/qpid/cpp/src/qmf/ConsoleEventImpl.h +++ b/qpid/cpp/src/qmf/ConsoleEventImpl.h @@ -34,10 +34,12 @@ namespace qmf { // // Impl-only methods // - ConsoleEventImpl(ConsoleEventCode e) : eventType(e), correlator(0), final(false) {} + ConsoleEventImpl(ConsoleEventCode e, AgentDelReason r = AGENT_DEL_AGED) : + eventType(e), delReason(r), correlator(0), final(false) {} void setCorrelator(uint32_t c) { correlator = c; } void setAgent(const Agent& a) { agent = a; } void addData(const Data& d) { dataList.push_back(Data(d)); } + void addSchemaId(const SchemaId& s) { newSchemaIds.push_back(SchemaId(s)); } void setFinal() { final = true; } void setArguments(const qpid::types::Variant::Map& a) { arguments = a; } @@ -47,6 +49,9 @@ namespace qmf { ConsoleEventCode getType() const { return eventType; } uint32_t getCorrelator() const { return correlator; } Agent getAgent() const { return agent; } + AgentDelReason getAgentDelReason() const { return delReason; } + uint32_t getSchemaIdCount() const { return newSchemaIds.size(); } + SchemaId getSchemaId(uint32_t) const; uint32_t getDataCount() const { return dataList.size(); } Data getData(uint32_t i) const; bool isFinal() const { return final; } @@ -54,10 +59,12 @@ namespace qmf { private: const ConsoleEventCode eventType; + const AgentDelReason delReason; uint32_t correlator; Agent agent; bool final; std::list<Data> dataList; + std::list<SchemaId> newSchemaIds; qpid::types::Variant::Map arguments; }; diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 18986222c1..868df302ce 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -57,8 +57,9 @@ Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnecte //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), - lastVisit(0), schemaCache(new SchemaCache()) + connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), + thread(0), threadCanceled(false), + lastVisit(0), lastAgePass(0), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -70,6 +71,10 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("domain"); if (iter != optMap.end()) domain = iter->second.asString(); + + iter = optMap.find("max-agent-age"); + if (iter != optMap.end()) + maxAgentAgeMinutes = iter->second.asUint32(); } } @@ -81,13 +86,35 @@ ConsoleSessionImpl::~ConsoleSessionImpl() } -void ConsoleSessionImpl::setAgentFilter(const string&) +void ConsoleSessionImpl::setAgentFilter(const string& predicate) { + agentQuery = Query(QUERY_OBJECT, predicate); + // - // TODO: Setup the new agent filter - // TODO: Purge the agent list of any agents that don't match the filter - // TODO: Send an agent locate with the new filter + // Purge the agent list of any agents that don't match the filter. // + { + qpid::sys::Mutex::ScopedLock l(lock); + map<string, Agent> toDelete; + for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) + if ((iter->second.getName() != connectedBrokerAgent.getName()) && + (!agentQuery.matchesPredicate(iter->second.getAttributes()))) { + toDelete[iter->first] = iter->second; + } + + for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { + agents.erase(iter->first); + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER)); + eventImpl->setAgent(iter->second); + enqueueEventLH(eventImpl.release()); + } + } + + // + // Broadcast an agent locate request with our new criteria. + // + if (opened) + sendAgentLocate(); } @@ -111,15 +138,23 @@ void ConsoleSessionImpl::open() Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); directRx.setCapacity(64); - topicRx.setCapacity(64); + topicRx.setCapacity(128); legacyRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); + + directSender.setCapacity(64); + topicSender.setCapacity(128); + // Start the receiver thread threadCanceled = false; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); + if (agentQuery) + sendAgentLocate(); opened = true; } @@ -198,18 +233,17 @@ void ConsoleSessionImpl::dispatch(Message msg) { const Variant::Map& properties(msg.getProperties()); Variant::Map::const_iterator iter; + Variant::Map::const_iterator oiter; + oiter = properties.find("qmf.opcode"); iter = properties.find("x-amqp-0-10.app-id"); - if (iter != properties.end() && iter->second.asString() == "qmf2") { + if (iter == properties.end()) + iter = properties.find("app_id"); + if (iter != properties.end() && iter->second.asString() == "qmf2" && oiter != properties.end()) { // // Dispatch a QMFv2 formatted message // - iter = properties.find("qmf.opcode"); - if (iter == properties.end()) { - QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); - return; - } - const string& opcode = iter->second.asString(); + const string& opcode = oiter->second.asString(); iter = properties.find("qmf.agent"); if (iter == properties.end()) { @@ -240,11 +274,8 @@ void ConsoleSessionImpl::dispatch(Message msg) return; } - if (!agent.isValid()) { - QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode << - " from an unknown agent " << agentName); + if (!agent.isValid()) return; - } AgentImpl& agentImpl(AgentImplAccess::get(agent)); @@ -305,14 +336,34 @@ void ConsoleSessionImpl::sendBrokerLocate() msg.setReplyTo(replyAddress); msg.setCorrelationId("broker-locate"); - Sender sender(session.createSender(directBase + "/broker")); - sender.send(msg); - sender.close(); + msg.setSubject("broker"); + + directSender.send(msg); QPID_LOG(trace, "SENT AgentLocate to broker"); } +void ConsoleSessionImpl::sendAgentLocate() +{ + Message msg; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "request"; + headers["qmf.opcode"] = "_agent_locate_request"; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + msg.setReplyTo(replyAddress); + msg.setCorrelationId("agent-locate"); + msg.setSubject("console.request.agent_locate"); + encode(agentQuery.getPredicate(), msg); + + topicSender.send(msg); + + QPID_LOG(trace, "SENT AgentLocate to topic"); +} + + void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg) { Variant::Map::const_iterator iter; @@ -326,18 +377,25 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian Variant::Map attrs(iter->second.asMap()); // - // TODO: Check this agent against the agent filter. Exit if it doesn't match. - // (only if this isn't the connected broker agent) + // Check this agent against the agent filter. Exit if it doesn't match. + // (only if this isn't the connected broker agent) // + if ((cid != "broker-locate") && agentQuery && (!agentQuery.matchesPredicate(attrs))) + return; + + QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName); - iter = content.find("epoch"); - if (iter != content.end()) + iter = attrs.find("epoch"); + if (iter != attrs.end()) epoch = iter->second.asUint32(); { qpid::sys::Mutex::ScopedLock l(lock); map<string, Agent>::iterator aIter = agents.find(agentName); if (aIter == agents.end()) { + // + // This is a new agent. We have no current record of its existence. + // auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); for (iter = attrs.begin(); iter != attrs.end(); iter++) if (iter->first != "epoch") @@ -345,24 +403,47 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian agent = Agent(impl.release()); agents[agentName] = agent; + // + // Enqueue a notification of the new agent. + // auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); eventImpl->setAgent(agent); enqueueEventLH(ConsoleEvent(eventImpl.release())); - } else + } else { + // + // This is a refresh of an agent we are already tracking. + // agent = aIter->second; + AgentImpl& impl(AgentImplAccess::get(agent)); + impl.touch(); + if (impl.getEpoch() != epoch) { + // + // The agent has restarted since the last time we heard from it. + // Enqueue a notification. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } + + iter = attrs.find("schemaUpdated"); + if (iter != attrs.end()) { + uint64_t ts(iter->second.asUint64()); + if (ts > impl.getAttribute("schemaUpdated").asUint64()) { + // + // The agent has added new schema entries since we last heard from it. + // Enqueue a notification. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } + } + } if (cid == "broker-locate") connectedBrokerAgent = agent; } - - AgentImplAccess::get(agent).touch(); - - // - // Changes we are interested in: - // - // agentEpoch - indicates that the agent restarted since we last heard from it - // schemaUpdated - indicates that the agent has registered new schemata - // } @@ -385,8 +466,27 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) lastVisit = seconds; // - // TODO: Handle the aging of agent records + // Handle the aging of agent records // + if (lastAgePass == 0) + lastAgePass = seconds; + if (seconds - lastAgePass >= 60) { + lastAgePass = seconds; + map<string, Agent> toDelete; + qpid::sys::Mutex::ScopedLock l(lock); + + for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) + if ((iter->second.getName() != connectedBrokerAgent.getName()) && + (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes)) + toDelete[iter->first] = iter->second; + + for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { + agents.erase(iter->first); + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED)); + eventImpl->setAgent(iter->second); + enqueueEventLH(eventImpl.release()); + } + } } diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index 9a077b0390..705d9de4d8 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -28,6 +28,7 @@ #include "qmf/Schema.h" #include "qmf/ConsoleEventImpl.h" #include "qmf/SchemaCache.h" +#include "qmf/Query.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/sys/Thread.h" @@ -36,6 +37,7 @@ #include "qpid/messaging/Message.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" +#include "qpid/messaging/Sender.h" #include "qpid/messaging/Address.h" #include "qpid/management/Buffer.h" #include "qpid/types/Variant.h" @@ -65,13 +67,17 @@ namespace qmf { qpid::sys::Condition cond; qpid::messaging::Connection connection; qpid::messaging::Session session; + qpid::messaging::Sender directSender; + qpid::messaging::Sender topicSender; std::string domain; - qpid::types::Variant::Map agentFilter; + uint32_t maxAgentAgeMinutes; + Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; qpid::sys::Thread* thread; bool threadCanceled; uint64_t lastVisit; + uint64_t lastAgePass; std::map<std::string, Agent> agents; Agent connectedBrokerAgent; qpid::messaging::Address replyAddress; @@ -83,6 +89,7 @@ namespace qmf { void enqueueEventLH(const ConsoleEvent&); void dispatch(qpid::messaging::Message); void sendBrokerLocate(); + void sendAgentLocate(); void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); diff --git a/qpid/cpp/src/qmf/Expression.cpp b/qpid/cpp/src/qmf/Expression.cpp new file mode 100644 index 0000000000..7d48678c15 --- /dev/null +++ b/qpid/cpp/src/qmf/Expression.cpp @@ -0,0 +1,441 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/exceptions.h" +#include "qmf/Expression.h" +#include <iostream> + +using namespace std; +using namespace qmf; +using namespace qpid::types; + +Expression::Expression(const Variant::List& expr) +{ + static int level(0); + level++; + Variant::List::const_iterator iter(expr.begin()); + string op(iter->asString()); + iter++; + + if (op == "not") logicalOp = LOGICAL_NOT; + else if (op == "and") logicalOp = LOGICAL_AND; + else if (op == "or") logicalOp = LOGICAL_OR; + else { + logicalOp = LOGICAL_ID; + if (op == "eq") boolOp = BOOL_EQ; + else if (op == "ne") boolOp = BOOL_NE; + else if (op == "lt") boolOp = BOOL_LT; + else if (op == "le") boolOp = BOOL_LE; + else if (op == "gt") boolOp = BOOL_GT; + else if (op == "ge") boolOp = BOOL_GE; + else if (op == "re_match") boolOp = BOOL_RE_MATCH; + else if (op == "exists") boolOp = BOOL_EXISTS; + else if (op == "true") boolOp = BOOL_TRUE; + else if (op == "false") boolOp = BOOL_FALSE; + else + throw QmfException("Invalid operator in predicate expression"); + } + + if (logicalOp == LOGICAL_ID) { + switch (boolOp) { + case BOOL_EQ: + case BOOL_NE: + case BOOL_LT: + case BOOL_LE: + case BOOL_GT: + case BOOL_GE: + case BOOL_RE_MATCH: + // + // Binary operator: get two operands. + // + operandCount = 2; + break; + + case BOOL_EXISTS: + // + // Unary operator: get one operand. + // + operandCount = 1; + break; + + case BOOL_TRUE: + case BOOL_FALSE: + // + // Literal operator: no operands. + // + operandCount = 0; + break; + } + + for (int idx = 0; idx < operandCount; idx++) { + if (iter == expr.end()) + throw QmfException("Too few operands for operation: " + op); + if (iter->getType() == VAR_STRING) { + quoted[idx] = false; + operands[idx] = *iter; + } else if (iter->getType() == VAR_LIST) { + const Variant::List& sublist(iter->asList()); + Variant::List::const_iterator subIter(sublist.begin()); + if (subIter != sublist.end() && subIter->asString() == "quote") { + quoted[idx] = true; + subIter++; + if (subIter != sublist.end()) { + operands[idx] = *subIter; + subIter++; + if (subIter != sublist.end()) + throw QmfException("Extra tokens at end of 'quote'"); + } + } else + throw QmfException("Expected '[quote, <token>]'"); + } else + throw QmfException("Expected string or list as operand for: " + op); + iter++; + } + + if (iter != expr.end()) + throw QmfException("Too many operands for operation: " + op); + + } else { + // + // This is a logical expression, collect sub-expressions + // + while (iter != expr.end()) { + if (iter->getType() != VAR_LIST) + throw QmfException("Operands of " + op + " must be lists"); + expressionList.push_back(boost::shared_ptr<Expression>(new Expression(iter->asList()))); + iter++; + } + } + level--; +} + + +bool Expression::evaluate(const Variant::Map& data) const +{ + list<boost::shared_ptr<Expression> >::const_iterator iter; + + switch (logicalOp) { + case LOGICAL_ID: + return boolEval(data); + + case LOGICAL_NOT: + for (iter = expressionList.begin(); iter != expressionList.end(); iter++) + if ((*iter)->evaluate(data)) + return false; + return true; + + case LOGICAL_AND: + for (iter = expressionList.begin(); iter != expressionList.end(); iter++) + if (!(*iter)->evaluate(data)) + return false; + return true; + + case LOGICAL_OR: + for (iter = expressionList.begin(); iter != expressionList.end(); iter++) + if ((*iter)->evaluate(data)) + return true; + return false; + } + + return false; +} + + +bool Expression::boolEval(const Variant::Map& data) const +{ + Variant val[2]; + bool exists[2]; + + for (int idx = 0; idx < operandCount; idx++) { + if (quoted[idx]) { + exists[idx] = true; + val[idx] = operands[idx]; + } else { + Variant::Map::const_iterator mIter(data.find(operands[idx].asString())); + if (mIter == data.end()) { + exists[idx] = false; + } else { + exists[idx] = true; + val[idx] = mIter->second; + } + } + } + + switch (boolOp) { + case BOOL_EQ: return (exists[0] && exists[1] && (val[0].asString() == val[1].asString())); + case BOOL_NE: return (exists[0] && exists[1] && (val[0].asString() != val[1].asString())); + case BOOL_LT: return (exists[0] && exists[1] && lessThan(val[0], val[1])); + case BOOL_LE: return (exists[0] && exists[1] && lessEqual(val[0], val[1])); + case BOOL_GT: return (exists[0] && exists[1] && greaterThan(val[0], val[1])); + case BOOL_GE: return (exists[0] && exists[1] && greaterEqual(val[0], val[1])); + case BOOL_RE_MATCH: return false; // TODO + case BOOL_EXISTS: return exists[0]; + case BOOL_TRUE: return true; + case BOOL_FALSE: return false; + } + + return false; +} + +bool Expression::lessThan(const Variant& left, const Variant& right) const +{ + switch (left.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + return left.asInt64() < right.asInt64(); + case VAR_STRING: + try { + return left.asInt64() < right.asInt64(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_FLOAT: case VAR_DOUBLE: + switch (right.getType()) { + case VAR_FLOAT: case VAR_DOUBLE: + return left.asDouble() < right.asDouble(); + case VAR_STRING: + try { + return left.asDouble() < right.asDouble(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_STRING: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + try { + return left.asInt64() < right.asInt64(); + } catch (std::exception&) {} + break; + + case VAR_FLOAT: case VAR_DOUBLE: + try { + return left.asDouble() < right.asDouble(); + } catch (std::exception&) {} + break; + + case VAR_STRING: + return left.asString() < right.asString(); + default: + break; + } + default: + break; + } + + return false; +} + + +bool Expression::lessEqual(const Variant& left, const Variant& right) const +{ + switch (left.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + return left.asInt64() <= right.asInt64(); + case VAR_STRING: + try { + return left.asInt64() <= right.asInt64(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_FLOAT: case VAR_DOUBLE: + switch (right.getType()) { + case VAR_FLOAT: case VAR_DOUBLE: + return left.asDouble() <= right.asDouble(); + case VAR_STRING: + try { + return left.asDouble() <= right.asDouble(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_STRING: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + try { + return left.asInt64() <= right.asInt64(); + } catch (std::exception&) {} + break; + + case VAR_FLOAT: case VAR_DOUBLE: + try { + return left.asDouble() <= right.asDouble(); + } catch (std::exception&) {} + break; + + case VAR_STRING: + return left.asString() <= right.asString(); + default: + break; + } + default: + break; + } + + return false; +} + + +bool Expression::greaterThan(const Variant& left, const Variant& right) const +{ + switch (left.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + return left.asInt64() > right.asInt64(); + case VAR_STRING: + try { + return left.asInt64() > right.asInt64(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_FLOAT: case VAR_DOUBLE: + switch (right.getType()) { + case VAR_FLOAT: case VAR_DOUBLE: + return left.asDouble() > right.asDouble(); + case VAR_STRING: + try { + return left.asDouble() > right.asDouble(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_STRING: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + try { + return left.asInt64() > right.asInt64(); + } catch (std::exception&) {} + break; + + case VAR_FLOAT: case VAR_DOUBLE: + try { + return left.asDouble() > right.asDouble(); + } catch (std::exception&) {} + break; + + case VAR_STRING: + return left.asString() > right.asString(); + default: + break; + } + default: + break; + } + + return false; +} + + +bool Expression::greaterEqual(const Variant& left, const Variant& right) const +{ + switch (left.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + return left.asInt64() >= right.asInt64(); + case VAR_STRING: + try { + return left.asInt64() >= right.asInt64(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_FLOAT: case VAR_DOUBLE: + switch (right.getType()) { + case VAR_FLOAT: case VAR_DOUBLE: + return left.asDouble() >= right.asDouble(); + case VAR_STRING: + try { + return left.asDouble() >= right.asDouble(); + } catch (std::exception&) {} + break; + default: + break; + } + break; + + case VAR_STRING: + switch (right.getType()) { + case VAR_UINT8: case VAR_UINT16: case VAR_UINT32: case VAR_UINT64: + case VAR_INT8: case VAR_INT16: case VAR_INT32: case VAR_INT64: + try { + return left.asInt64() >= right.asInt64(); + } catch (std::exception&) {} + break; + + case VAR_FLOAT: case VAR_DOUBLE: + try { + return left.asDouble() >= right.asDouble(); + } catch (std::exception&) {} + break; + + case VAR_STRING: + return left.asString() >= right.asString(); + default: + break; + } + default: + break; + } + + return false; +} + + diff --git a/qpid/cpp/src/qmf/Expression.h b/qpid/cpp/src/qmf/Expression.h new file mode 100644 index 0000000000..6fbfdbc4ba --- /dev/null +++ b/qpid/cpp/src/qmf/Expression.h @@ -0,0 +1,73 @@ +#ifndef _QMF_EXPRESSION_H_ +#define _QMF_EXPRESSION_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/Variant.h" +#include <string> +#include <list> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + enum LogicalOp { + LOGICAL_ID = 1, + LOGICAL_NOT = 2, + LOGICAL_AND = 3, + LOGICAL_OR = 4 + }; + + enum BooleanOp { + BOOL_EQ = 1, + BOOL_NE = 2, + BOOL_LT = 3, + BOOL_LE = 4, + BOOL_GT = 5, + BOOL_GE = 6, + BOOL_RE_MATCH = 7, + BOOL_EXISTS = 8, + BOOL_TRUE = 9, + BOOL_FALSE = 10 + }; + + class Expression { + public: + Expression(const qpid::types::Variant::List& expr); + bool evaluate(const qpid::types::Variant::Map& data) const; + private: + LogicalOp logicalOp; + BooleanOp boolOp; + int operandCount; + qpid::types::Variant operands[2]; + bool quoted[2]; + std::list<boost::shared_ptr<Expression> > expressionList; + + bool boolEval(const qpid::types::Variant::Map& data) const; + bool lessThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const; + bool lessEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const; + bool greaterThan(const qpid::types::Variant& left, const qpid::types::Variant& right) const; + bool greaterEqual(const qpid::types::Variant& left, const qpid::types::Variant& right) const; + }; + +} + +#endif + diff --git a/qpid/cpp/src/qmf/Query.cpp b/qpid/cpp/src/qmf/Query.cpp index 9b67c7d8b3..09cca4d5bb 100644 --- a/qpid/cpp/src/qmf/Query.cpp +++ b/qpid/cpp/src/qmf/Query.cpp @@ -19,56 +19,135 @@ * */ -#include "qpid/RefCounted.h" #include "qmf/PrivateImplRef.h" -#include "qmf/Query.h" -#include "qmf/DataAddr.h" -#include "qmf/SchemaId.h" +#include "qmf/exceptions.h" +#include "qmf/QueryImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/SchemaIdImpl.h" +#include "qpid/messaging/AddressParser.h" using namespace std; +using namespace qmf; using qpid::types::Variant; -namespace qmf { - class QueryImpl : public virtual qpid::RefCounted { - public: +typedef PrivateImplRef<Query> PI; + +Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); } +Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); } +Query::~Query() { PI::dtor(*this); } +Query& Query::operator=(const Query& s) { return PI::assign(*this, s); } + +Query::Query(QueryTarget t, const string& pr) { PI::ctor(*this, new QueryImpl(t, pr)); } +Query::Query(QueryTarget t, const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(t, c, p, pr)); } +Query::Query(QueryTarget t, const SchemaId& s, const string& pr) { PI::ctor(*this, new QueryImpl(t, s, pr)); } +Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); } + +QueryTarget Query::getTarget() const { return impl->getTarget(); } +const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); } +const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); } +void Query::setPredicate(const Variant::List& pr) { impl->setPredicate(pr); } +const Variant::List& Query::getPredicate() const { return impl->getPredicate(); } +bool Query::matchesPredicate(const qpid::types::Variant::Map& map) const { return impl->matchesPredicate(map); } + + +QueryImpl::QueryImpl(const Variant::Map& map) +{ + Variant::Map::const_iterator iter; + + iter = map.find("_what"); + if (iter == map.end()) + throw QmfException("Query missing _what element"); + + const string& targetString(iter->second.asString()); + if (targetString == "OBJECT") target = QUERY_OBJECT; + else if (targetString == "OBJECT_ID") target = QUERY_OBJECT_ID; + else if (targetString == "SCHEMA") target = QUERY_SCHEMA; + else if (targetString == "SCHEMA_ID") target = QUERY_SCHEMA_ID; + else + throw QmfException("Query with invalid _what value: " + targetString); + + iter = map.find("_object_id"); + if (iter != map.end()) { + auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap())); + dataAddr = DataAddr(addrImpl.release()); + } + + iter = map.find("_schema_id"); + if (iter != map.end()) { + auto_ptr<SchemaIdImpl> sidImpl(new SchemaIdImpl(iter->second.asMap())); + schemaId = SchemaId(sidImpl.release()); + } + + iter = map.find("_where"); + if (iter != map.end()) + predicate = iter->second.asList(); +} + + +Variant::Map QueryImpl::asMap() const +{ + Variant::Map map; + string targetString; + + switch (target) { + case QUERY_OBJECT : targetString = "OBJECT"; break; + case QUERY_OBJECT_ID : targetString = "OBJECT_ID"; break; + case QUERY_SCHEMA : targetString = "SCHEMA"; break; + case QUERY_SCHEMA_ID : targetString = "SCHEMA_ID"; break; + } + + map["_what"] = targetString; + + if (dataAddr.isValid()) + map["_object_id"] = DataAddrImplAccess::get(dataAddr).asMap(); + + if (schemaId.isValid()) + map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap(); + + if (!predicate.empty()) + map["_where"] = predicate; + + return map; +} + + +bool QueryImpl::matchesPredicate(const qpid::types::Variant::Map& data) const +{ + if (predicate.empty()) + return true; + + if (!predicateCompiled) { + expression.reset(new Expression(predicate)); + predicateCompiled = true; + } + + return expression->evaluate(data); +} + + +void QueryImpl::parsePredicate(const string& pred) +{ + if (pred.empty()) + return; + + if (pred[0] == '[') { // - // Methods from API handle + // Parse this as an AddressParser list. // - QueryImpl(const string& c, const string& p, const string&) : packageName(p), className(c) {} - QueryImpl(const SchemaId& s) : schemaId(s) {} - QueryImpl(const DataAddr& a) : dataAddr(a) {} - - const DataAddr& getDataAddr() const { return dataAddr; } - const SchemaId& getSchemaId() const { return schemaId; } - const string& getClassName() const { return className; } - const string& getPackageName() const { return packageName; } - void addPredicate(const string& k, const Variant& v) { predicate[k] = v; } - const Variant::Map& getPredicate() const { return predicate; } - - private: - string packageName; - string className; - SchemaId schemaId; - DataAddr dataAddr; - Variant::Map predicate; - }; - - typedef PrivateImplRef<Query> PI; - - Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); } - Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); } - Query::~Query() { PI::dtor(*this); } - Query& Query::operator=(const Query& s) { return PI::assign(*this, s); } - - Query::Query(const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(c, p, pr)); } - Query::Query(const SchemaId& s) { PI::ctor(*this, new QueryImpl(s)); } - Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); } - - const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); } - const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); } - const string& Query::getClassName() const { return impl->getClassName(); } - const string& Query::getPackageName() const { return impl->getPackageName(); } - void Query::addPredicate(const string& k, const Variant& v) { impl->addPredicate(k, v); } - const Variant::Map& Query::getPredicate() const { return impl->getPredicate(); } + qpid::messaging::AddressParser parser(pred); + parser.parseList(predicate); + } else + throw QmfException("Invalid predicate format"); +} + + +QueryImpl& QueryImplAccess::get(Query& item) +{ + return *item.impl; } + +const QueryImpl& QueryImplAccess::get(const Query& item) +{ + return *item.impl; +} diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/QueryImpl.h new file mode 100644 index 0000000000..27ec427684 --- /dev/null +++ b/qpid/cpp/src/qmf/QueryImpl.h @@ -0,0 +1,77 @@ +#ifndef _QMF_QUERY_IMPL_H_ +#define _QMF_QUERY_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/Query.h" +#include "qmf/DataAddr.h" +#include "qmf/SchemaId.h" +#include "qmf/Expression.h" +#include "qpid/types/Variant.h" +#include <boost/shared_ptr.hpp> + +namespace qmf { + class QueryImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + QueryImpl(const qpid::types::Variant::Map&); + qpid::types::Variant::Map asMap() const; + + // + // Methods from API handle + // + QueryImpl(QueryTarget t, const std::string& pr) : target(t), predicateCompiled(false) { parsePredicate(pr); } + QueryImpl(QueryTarget t, const std::string& c, const std::string& p, const std::string& pr) : + target(t), schemaId(SCHEMA_TYPE_DATA, p, c), predicateCompiled(false) { parsePredicate(pr); } + QueryImpl(QueryTarget t, const SchemaId& s, const std::string& pr) : + target(t), schemaId(s), predicateCompiled(false) { parsePredicate(pr); } + QueryImpl(const DataAddr& a) : target(QUERY_OBJECT), dataAddr(a), predicateCompiled(false) {} + + QueryTarget getTarget() const { return target; } + const DataAddr& getDataAddr() const { return dataAddr; } + const SchemaId& getSchemaId() const { return schemaId; } + void setPredicate(const qpid::types::Variant::List& pr) { predicate = pr; } + const qpid::types::Variant::List& getPredicate() const { return predicate; } + bool matchesPredicate(const qpid::types::Variant::Map& map) const; + + private: + QueryTarget target; + SchemaId schemaId; + DataAddr dataAddr; + qpid::types::Variant::List predicate; + mutable bool predicateCompiled; + mutable boost::shared_ptr<Expression> expression; + + void parsePredicate(const std::string& s); + }; + + struct QueryImplAccess + { + static QueryImpl& get(Query&); + static const QueryImpl& get(const Query&); + }; +} + +#endif + diff --git a/qpid/cpp/src/qmf/Schema.cpp b/qpid/cpp/src/qmf/Schema.cpp index 5676125c22..e003f9d06f 100644 --- a/qpid/cpp/src/qmf/Schema.cpp +++ b/qpid/cpp/src/qmf/Schema.cpp @@ -61,8 +61,65 @@ SchemaMethod Schema::getMethod(uint32_t i) const { return impl->getMethod(i); } // Impl Method Bodies //======================================================================================== -SchemaImpl::SchemaImpl(const qpid::types::Variant::Map&) : finalized(true) +SchemaImpl::SchemaImpl(const Variant::Map& map) : finalized(false) { + Variant::Map::const_iterator iter; + Variant::List::const_iterator lIter; + + iter = map.find("_schema_id"); + if (iter == map.end()) + throw QmfException("Schema map missing _schema_id element"); + schemaId = SchemaId(new SchemaIdImpl(iter->second.asMap())); + + iter = map.find("_desc"); + if (iter != map.end()) + description = iter->second.asString(); + + iter = map.find("_default_severity"); + if (iter != map.end()) + defaultSeverity = int(iter->second.asUint32()); + + iter = map.find("_properties"); + if (iter != map.end()) { + const Variant::List& props(iter->second.asList()); + for (lIter = props.begin(); lIter != props.end(); lIter++) + addProperty(SchemaProperty(new SchemaPropertyImpl(lIter->asMap()))); + } + + iter = map.find("_methods"); + if (iter != map.end()) { + const Variant::List& meths(iter->second.asList()); + for (lIter = meths.begin(); lIter != meths.end(); lIter++) + addMethod(SchemaMethod(new SchemaMethodImpl(lIter->asMap()))); + } + + finalized = true; +} + + +Variant::Map SchemaImpl::asMap() const +{ + Variant::Map map; + Variant::List propList; + Variant::List methList; + + checkNotFinal(); + + map["_schema_id"] = SchemaIdImplAccess::get(schemaId).asMap(); + if (!description.empty()) + map["_desc"] = description; + if (schemaId.getType() == SCHEMA_TYPE_EVENT) + map["_default_severity"] = uint32_t(defaultSeverity); + + for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++) + propList.push_back(SchemaPropertyImplAccess::get(*pIter).asMap()); + + for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) + methList.push_back(SchemaMethodImplAccess::get(*mIter).asMap()); + + map["_properties"] = propList; + map["_methods"] = methList; + return map; } diff --git a/qpid/cpp/src/qmf/SchemaIdImpl.h b/qpid/cpp/src/qmf/SchemaIdImpl.h index df3cc076b9..ae1a3d8d3b 100644 --- a/qpid/cpp/src/qmf/SchemaIdImpl.h +++ b/qpid/cpp/src/qmf/SchemaIdImpl.h @@ -69,6 +69,15 @@ namespace qmf { return lhs.getHash() < rhs.getHash(); } }; + + struct SchemaIdCompareNoHash { + bool operator() (const SchemaId& lhs, const SchemaId& rhs) const + { + if (lhs.getName() != rhs.getName()) + return lhs.getName() < rhs.getName(); + return lhs.getPackageName() < rhs.getPackageName(); + } + }; } #endif diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/SchemaImpl.h index eae3a3c37f..267a5d5138 100644 --- a/qpid/cpp/src/qmf/SchemaImpl.h +++ b/qpid/cpp/src/qmf/SchemaImpl.h @@ -43,6 +43,7 @@ namespace qmf { // Impl-only public methods // SchemaImpl(const qpid::types::Variant::Map& m); + qpid::types::Variant::Map asMap() const; SchemaImpl(qpid::management::Buffer& v1Buffer); std::string asV1Content(uint32_t sequence) const; diff --git a/qpid/cpp/src/qmf/SchemaMethod.cpp b/qpid/cpp/src/qmf/SchemaMethod.cpp index 7ee6646ec4..e8dcfd2401 100644 --- a/qpid/cpp/src/qmf/SchemaMethod.cpp +++ b/qpid/cpp/src/qmf/SchemaMethod.cpp @@ -68,10 +68,48 @@ SchemaMethodImpl::SchemaMethodImpl(const string& n, const string& options) : nam } } -SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map&) + +SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map& map) +{ + Variant::Map::const_iterator iter; + Variant::List::const_iterator lIter; + + iter = map.find("_name"); + if (iter == map.end()) + throw QmfException("SchemaMethod without a _name element"); + name = iter->second.asString(); + + iter = map.find("_desc"); + if (iter != map.end()) + desc = iter->second.asString(); + + iter = map.find("_arguments"); + if (iter != map.end()) { + const Variant::List& argList(iter->second.asList()); + for (lIter = argList.begin(); lIter != argList.end(); lIter++) + addArgument(SchemaProperty(new SchemaPropertyImpl(lIter->asMap()))); + } +} + + +Variant::Map SchemaMethodImpl::asMap() const { + Variant::Map map; + Variant::List argList; + + map["_name"] = name; + + if (!desc.empty()) + map["_desc"] = desc; + + for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) + argList.push_back(SchemaPropertyImplAccess::get(*iter).asMap()); + map["_arguments"] = argList; + + return map; } + SchemaMethodImpl::SchemaMethodImpl(qpid::management::Buffer& buffer) { Variant::Map::const_iterator iter; diff --git a/qpid/cpp/src/qmf/SchemaMethodImpl.h b/qpid/cpp/src/qmf/SchemaMethodImpl.h index 4b0ff9134d..930d48509c 100644 --- a/qpid/cpp/src/qmf/SchemaMethodImpl.h +++ b/qpid/cpp/src/qmf/SchemaMethodImpl.h @@ -43,6 +43,7 @@ namespace qmf { // SchemaMethodImpl(const qpid::types::Variant::Map& m); SchemaMethodImpl(qpid::management::Buffer& v1Buffer); + qpid::types::Variant::Map asMap() const; void updateHash(Hash&) const; void encodeV1(qpid::management::Buffer&) const; diff --git a/qpid/cpp/src/qmf/SchemaProperty.cpp b/qpid/cpp/src/qmf/SchemaProperty.cpp index 244115b8a9..106127261b 100644 --- a/qpid/cpp/src/qmf/SchemaProperty.cpp +++ b/qpid/cpp/src/qmf/SchemaProperty.cpp @@ -51,6 +51,7 @@ void SchemaProperty::setSubtype(const string& s) { impl->setSubtype(s); } void SchemaProperty::setDirection(int d) { impl->setDirection(d); } const string& SchemaProperty::getName() const { return impl->getName(); } +int SchemaProperty::getType() const { return impl->getType(); } int SchemaProperty::getAccess() const { return impl->getAccess(); } bool SchemaProperty::isIndex() const { return impl->isIndex(); } bool SchemaProperty::isOptional() const { return impl->isOptional(); } @@ -132,9 +133,115 @@ SchemaPropertyImpl::SchemaPropertyImpl(const string& n, int t, const string opti } -SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map&) : +SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map& map) : access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN) { + Variant::Map::const_iterator iter; + + iter = map.find("_name"); + if (iter == map.end()) + throw QmfException("SchemaProperty without a _name element"); + name = iter->second.asString(); + + iter = map.find("_type"); + if (iter == map.end()) + throw QmfException("SchemaProperty without a _type element"); + const string& ts(iter->second.asString()); + if (ts == "TYPE_VOID") dataType = SCHEMA_DATA_VOID; + else if (ts == "TYPE_BOOL") dataType = SCHEMA_DATA_BOOL; + else if (ts == "TYPE_INT") dataType = SCHEMA_DATA_INT; + else if (ts == "TYPE_FLOAT") dataType = SCHEMA_DATA_FLOAT; + else if (ts == "TYPE_STRING") dataType = SCHEMA_DATA_STRING; + else if (ts == "TYPE_MAP") dataType = SCHEMA_DATA_MAP; + else if (ts == "TYPE_LIST") dataType = SCHEMA_DATA_LIST; + else if (ts == "TYPE_UUID") dataType = SCHEMA_DATA_UUID; + else + throw QmfException("SchemaProperty with an invalid type code: " + ts); + + iter = map.find("_access"); + if (iter != map.end()) { + const string& as(iter->second.asString()); + if (as == "RO") access = ACCESS_READ_ONLY; + else if (as == "RC") access = ACCESS_READ_CREATE; + else if (as == "RW") access = ACCESS_READ_WRITE; + else + throw QmfException("SchemaProperty with an invalid access code: " + as); + } + + iter = map.find("_unit"); + if (iter != map.end()) + unit = iter->second.asString(); + + iter = map.find("_dir"); + if (iter != map.end()) { + const string& ds(iter->second.asString()); + if (ds == "I") direction = DIR_IN; + else if (ds == "O") direction = DIR_OUT; + else if (ds == "IO") direction = DIR_IN_OUT; + else + throw QmfException("SchemaProperty with an invalid direction code: " + ds); + } + + iter = map.find("_desc"); + if (iter != map.end()) + desc = iter->second.asString(); + + iter = map.find("_index"); + if (iter != map.end()) + index = iter->second.asBool(); + + iter = map.find("_subtype"); + if (iter != map.end()) + subtype = iter->second.asString(); +} + + +Variant::Map SchemaPropertyImpl::asMap() const +{ + Variant::Map map; + string ts; + + map["_name"] = name; + + switch (dataType) { + case SCHEMA_DATA_VOID: ts = "TYPE_VOID"; break; + case SCHEMA_DATA_BOOL: ts = "TYPE_BOOL"; break; + case SCHEMA_DATA_INT: ts = "TYPE_INT"; break; + case SCHEMA_DATA_FLOAT: ts = "TYPE_FLOAT"; break; + case SCHEMA_DATA_STRING: ts = "TYPE_STRING"; break; + case SCHEMA_DATA_MAP: ts = "TYPE_MAP"; break; + case SCHEMA_DATA_LIST: ts = "TYPE_LIST"; break; + case SCHEMA_DATA_UUID: ts = "TYPE_UUID"; break; + } + map["_type"] = ts; + + switch (access) { + case ACCESS_READ_ONLY: ts = "RO"; break; + case ACCESS_READ_CREATE: ts = "RC"; break; + case ACCESS_READ_WRITE: ts = "RW"; break; + } + map["_access"] = ts; + + if (!unit.empty()) + map["_unit"] = unit; + + switch (direction) { + case DIR_IN: ts = "I"; break; + case DIR_OUT: ts = "O"; break; + case DIR_IN_OUT: ts = "IO"; break; + } + map["_dir"] = ts; + + if (!desc.empty()) + map["_desc"] = desc; + + if (index) + map["_index"] = true; + + if (!subtype.empty()) + map["_subtype"] = subtype; + + return map; } diff --git a/qpid/cpp/src/qmf/SchemaPropertyImpl.h b/qpid/cpp/src/qmf/SchemaPropertyImpl.h index 94994c722d..cdfc29066f 100644 --- a/qpid/cpp/src/qmf/SchemaPropertyImpl.h +++ b/qpid/cpp/src/qmf/SchemaPropertyImpl.h @@ -42,6 +42,7 @@ namespace qmf { // SchemaPropertyImpl(const qpid::types::Variant::Map& m); SchemaPropertyImpl(qpid::management::Buffer& v1Buffer); + qpid::types::Variant::Map asMap() const; void updateHash(Hash&) const; void encodeV1(qpid::management::Buffer&, bool isArg, bool isMethodArg) const; @@ -58,6 +59,7 @@ namespace qmf { void setDirection(int d) { direction = d; } const std::string& getName() const { return name; } + int getType() const { return dataType; } int getAccess() const { return access; } bool isIndex() const { return index; } bool isOptional() const { return optional; } @@ -65,6 +67,7 @@ namespace qmf { const std::string& getDesc() const { return desc; } const std::string& getSubtype() const { return subtype; } int getDirection() const { return direction; } + private: std::string name; int dataType; diff --git a/qpid/cpp/src/qmf/agentCapability.h b/qpid/cpp/src/qmf/agentCapability.h new file mode 100644 index 0000000000..6a3f6f8534 --- /dev/null +++ b/qpid/cpp/src/qmf/agentCapability.h @@ -0,0 +1,39 @@ +#ifndef QMF_AGENT_CAPABILITY_H +#define QMF_AGENT_CAPABILITY_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qmf { + + /** + * Legacy (Qpid 0.7 C++ Agent, 0.7 Broker Agent) capabilities + */ + const uint32_t AGENT_CAPABILITY_LEGACY = 0; + + /** + * Qpid 0.8 QMFv2 capabilities + */ + const uint32_t AGENT_CAPABILITY_0_8 = 1; + const uint32_t AGENT_CAPABILITY_V2_SCHEMA = 1; + const uint32_t AGENT_CAPABILITY_AGENT_PREDICATE = 1; +} + +#endif diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 0f44089005..0e61a40c29 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -30,6 +30,7 @@ lib_messaging = $(abs_builddir)/../libqpidmessaging.la lib_common = $(abs_builddir)/../libqpidcommon.la lib_broker = $(abs_builddir)/../libqpidbroker.la lib_console = $(abs_builddir)/../libqmfconsole.la +lib_qmf2 = $(abs_builddir)/../libqmf2.la # lib_amqp_0_10 = $(abs_builddir)/../libqpidamqp_0_10.la # @@ -66,7 +67,7 @@ tmodule_LTLIBRARIES= TESTS+=unit_test check_PROGRAMS+=unit_test unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \ - $(lib_messaging) $(lib_broker) $(lib_console) + $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2) unit_test_SOURCES= unit_test.cpp unit_test.h \ MessagingSessionTests.cpp \ @@ -121,7 +122,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ PollableCondition.cpp \ Variant.cpp \ Address.cpp \ - ClientMessage.cpp + ClientMessage.cpp \ + Qmf2.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp new file mode 100644 index 0000000000..e56a35db5c --- /dev/null +++ b/qpid/cpp/src/tests/Qmf2.cpp @@ -0,0 +1,320 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "qpid/types/Variant.h" +#include "qmf/QueryImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/exceptions.h" + +#include "unit_test.h" + +using namespace qpid::types; +using namespace qmf; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(Qmf2Suite) + +QPID_AUTO_TEST_CASE(testQuery) +{ + Query query(QUERY_OBJECT, "class_name", "package_name", "[and, [eq, name, [quote, smith]], [lt, age, [quote, 27]]]"); + Query newQuery(new QueryImpl(QueryImplAccess::get(query).asMap())); + + BOOST_CHECK_EQUAL(newQuery.getTarget(), QUERY_OBJECT); + BOOST_CHECK_EQUAL(newQuery.getSchemaId().getName(), "class_name"); + BOOST_CHECK_EQUAL(newQuery.getSchemaId().getPackageName(), "package_name"); + + Variant::List pred(newQuery.getPredicate()); + BOOST_CHECK_EQUAL(pred.size(), 3); + + Variant::List::iterator iter(pred.begin()); + BOOST_CHECK_EQUAL(iter->asString(), "and"); + iter++; + BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST); + iter++; + BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST); + iter = iter->asList().begin(); + BOOST_CHECK_EQUAL(iter->asString(), "lt"); + iter++; + BOOST_CHECK_EQUAL(iter->asString(), "age"); + iter++; + BOOST_CHECK_EQUAL(iter->getType(), VAR_LIST); + iter = iter->asList().begin(); + BOOST_CHECK_EQUAL(iter->asString(), "quote"); + iter++; + BOOST_CHECK_EQUAL(iter->asUint32(), 27); + + Query query2(QUERY_OBJECT_ID); + Query newQuery2(new QueryImpl(QueryImplAccess::get(query2).asMap())); + BOOST_CHECK_EQUAL(newQuery2.getTarget(), QUERY_OBJECT_ID); + + Query query3(QUERY_SCHEMA); + Query newQuery3(new QueryImpl(QueryImplAccess::get(query3).asMap())); + BOOST_CHECK_EQUAL(newQuery3.getTarget(), QUERY_SCHEMA); + + Query query4(QUERY_SCHEMA_ID); + Query newQuery4(new QueryImpl(QueryImplAccess::get(query4).asMap())); + BOOST_CHECK_EQUAL(newQuery4.getTarget(), QUERY_SCHEMA_ID); + + DataAddr addr("name", "agent_name", 34); + Query query5(addr); + Query newQuery5(new QueryImpl(QueryImplAccess::get(query5).asMap())); + BOOST_CHECK_EQUAL(newQuery5.getTarget(), QUERY_OBJECT); + BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getName(), "name"); + BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getAgentName(), "agent_name"); + BOOST_CHECK_EQUAL(newQuery5.getDataAddr().getAgentEpoch(), 34); +} + +QPID_AUTO_TEST_CASE(testQueryPredicateErrors) +{ + Query query; + Variant::Map map; + + BOOST_CHECK_THROW(Query(QUERY_OBJECT, "INVALID"), QmfException); + query = Query(QUERY_OBJECT, "[unknown, one, two]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[eq, first]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[exists]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[eq, first, [quote, 1, 2, 3]]]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[eq, first, [unexpected, 3]]]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[eq, first, {}]]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[eq, first, second, third]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); + + query = Query(QUERY_OBJECT, "[and, first, second, third]"); + BOOST_CHECK_THROW(query.matchesPredicate(map), QmfException); +} + +QPID_AUTO_TEST_CASE(testQueryPredicate) +{ + Query query; + Variant::Map map; + + map["forty"] = 40; + map["fifty"] = 50; + map["minus_ten"] = -10; + map["pos_float"] = 100.05; + map["neg_float"] = -1000.33; + map["name"] = "jones"; + map["bool_t"] = true; + map["bool_f"] = false; + + BOOST_CHECK_THROW(Query(QUERY_OBJECT, "INVALID"), QmfException); + + query = Query(QUERY_OBJECT); + BOOST_CHECK_EQUAL(query.matchesPredicate(Variant::Map()), true); + + query = Query(QUERY_OBJECT, "[eq, forty, [quote, 40]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[eq, forty, [quote, 41]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[le, forty, fifty]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[and, [eq, forty, [quote, 40]], [eq, name, [quote, jones]]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[and, [eq, forty, [quote, 40]], [eq, name, [quote, smith]]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[or, [eq, forty, [quote, 40]], [eq, name, [quote, smith]]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[or, [eq, forty, [quote, 41]], [eq, name, [quote, smith]]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[not, [le, forty, [quote, 40]]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[le, forty, [quote, 40]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[ge, forty, [quote, 40]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[lt, forty, [quote, 45]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[lt, [quote, 45], forty]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[gt, forty, [quote, 45]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[gt, [quote, 45], forty]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[eq, bool_t, [quote, True]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[eq, bool_t, [quote, False]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[eq, bool_f, [quote, True]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[eq, bool_f, [quote, False]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[eq, minus_ten, [quote, -10]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[lt, minus_ten, [quote, -20]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[lt, [quote, -20], minus_ten]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[exists, name]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); + + query = Query(QUERY_OBJECT, "[exists, nonexfield]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), false); + + query = Query(QUERY_OBJECT, "[eq, pos_float, [quote, 100.05]]"); + BOOST_CHECK_EQUAL(query.matchesPredicate(map), true); +} + +QPID_AUTO_TEST_CASE(testSchema) +{ + Schema in(SCHEMA_TYPE_DATA, "package", "class"); + in.addProperty(SchemaProperty("prop1", SCHEMA_DATA_BOOL, "{desc:'Property One'}")); + in.addProperty(SchemaProperty("prop2", SCHEMA_DATA_INT, "{desc:'Property Two',unit:'Furlong'}")); + in.addProperty(SchemaProperty("prop3", SCHEMA_DATA_STRING, "{desc:'Property Three'}")); + + SchemaMethod method1("method1", "{desc:'Method One'}"); + method1.addArgument(SchemaProperty("arg1", SCHEMA_DATA_BOOL, "{desc:'Argument One',dir:IN}")); + method1.addArgument(SchemaProperty("arg2", SCHEMA_DATA_INT, "{desc:'Argument Two',dir:OUT}")); + method1.addArgument(SchemaProperty("arg3", SCHEMA_DATA_FLOAT, "{desc:'Argument Three',dir:INOUT}")); + in.addMethod(method1); + + SchemaMethod method2("method2", "{desc:'Method Two'}"); + method2.addArgument(SchemaProperty("arg21", SCHEMA_DATA_BOOL, "{desc:'Argument One',dir:IN}")); + method2.addArgument(SchemaProperty("arg22", SCHEMA_DATA_INT, "{desc:'Argument Two',dir:OUT}")); + method2.addArgument(SchemaProperty("arg23", SCHEMA_DATA_FLOAT, "{desc:'Argument Three',dir:INOUT}")); + in.addMethod(method2); + + BOOST_CHECK(!in.isFinalized()); + in.finalize(); + BOOST_CHECK(in.isFinalized()); + + Variant::Map map(SchemaImplAccess::get(in).asMap()); + Schema out(new SchemaImpl(map)); + + BOOST_CHECK(out.isFinalized()); + BOOST_CHECK_EQUAL(out.getSchemaId().getType(), SCHEMA_TYPE_DATA); + BOOST_CHECK_EQUAL(out.getSchemaId().getPackageName(), "package"); + BOOST_CHECK_EQUAL(out.getSchemaId().getName(), "class"); + BOOST_CHECK_EQUAL(out.getSchemaId().getHash(), in.getSchemaId().getHash()); + + BOOST_CHECK_EQUAL(out.getPropertyCount(), 3); + SchemaProperty prop; + + prop = out.getProperty(0); + BOOST_CHECK_EQUAL(prop.getName(), "prop1"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL); + BOOST_CHECK_EQUAL(prop.getDesc(), "Property One"); + + prop = out.getProperty(1); + BOOST_CHECK_EQUAL(prop.getName(), "prop2"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT); + BOOST_CHECK_EQUAL(prop.getDesc(), "Property Two"); + BOOST_CHECK_EQUAL(prop.getUnit(), "Furlong"); + BOOST_CHECK(!prop.isIndex()); + + prop = out.getProperty(2); + BOOST_CHECK_EQUAL(prop.getName(), "prop3"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_STRING); + BOOST_CHECK_EQUAL(prop.getDesc(), "Property Three"); + + BOOST_CHECK_THROW(out.getProperty(3), QmfException); + + BOOST_CHECK_EQUAL(out.getMethodCount(), 2); + SchemaMethod method; + + method = out.getMethod(0); + BOOST_CHECK_EQUAL(method.getName(), "method1"); + BOOST_CHECK_EQUAL(method.getDesc(), "Method One"); + BOOST_CHECK_EQUAL(method.getArgumentCount(), 3); + + prop = method.getArgument(0); + BOOST_CHECK_EQUAL(prop.getName(), "arg1"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument One"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN); + + prop = method.getArgument(1); + BOOST_CHECK_EQUAL(prop.getName(), "arg2"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Two"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_OUT); + + prop = method.getArgument(2); + BOOST_CHECK_EQUAL(prop.getName(), "arg3"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_FLOAT); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Three"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN_OUT); + + BOOST_CHECK_THROW(method.getArgument(3), QmfException); + + method = out.getMethod(1); + BOOST_CHECK_EQUAL(method.getName(), "method2"); + BOOST_CHECK_EQUAL(method.getDesc(), "Method Two"); + BOOST_CHECK_EQUAL(method.getArgumentCount(), 3); + + prop = method.getArgument(0); + BOOST_CHECK_EQUAL(prop.getName(), "arg21"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_BOOL); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument One"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN); + + prop = method.getArgument(1); + BOOST_CHECK_EQUAL(prop.getName(), "arg22"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_INT); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Two"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_OUT); + + prop = method.getArgument(2); + BOOST_CHECK_EQUAL(prop.getName(), "arg23"); + BOOST_CHECK_EQUAL(prop.getType(), SCHEMA_DATA_FLOAT); + BOOST_CHECK_EQUAL(prop.getDesc(), "Argument Three"); + BOOST_CHECK_EQUAL(prop.getDirection(), DIR_IN_OUT); + + BOOST_CHECK_THROW(method.getArgument(3), QmfException); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests |