diff options
author | Ted Ross <tross@apache.org> | 2010-03-02 00:58:16 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-02 00:58:16 +0000 |
commit | 10e6d0d251e8da42ab3e401dc75b0661bcee301e (patch) | |
tree | 79fd4e86bb55144d6ec9a8a6e3425b37dd5a3ed2 | |
parent | acf3a1931ec404d1b02a2e115ef18e531d3924e4 (diff) | |
download | qpid-python-10e6d0d251e8da42ab3e401dc75b0661bcee301e.tar.gz |
Further implementation of the QMFv2 agent engine.
- deprecated old ObjectId class
- renamed Object to Data
- added hooks for authorization of get, subscribe, and method call
- added infrastructure for optional internal storage
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@917825 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 389 insertions, 721 deletions
diff --git a/qpid/cpp/include/qmf/Notifiable.h b/qpid/cpp/include/qmf/Notifiable.h index 43f546d9cd..e3b075b49e 100644 --- a/qpid/cpp/include/qmf/Notifiable.h +++ b/qpid/cpp/include/qmf/Notifiable.h @@ -40,7 +40,7 @@ namespace qmf { */ class Notifiable { public: - QMF_EXTERN virtual ~Notifiable(); + QMF_EXTERN virtual ~Notifiable() {} virtual void notify() = 0; }; } diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h index 361a7bd283..e2ec287c00 100644 --- a/qpid/cpp/include/qmf/Protocol.h +++ b/qpid/cpp/include/qmf/Protocol.h @@ -71,6 +71,7 @@ namespace qmf { /** * QMF Op Codes */ + const static std::string OP_EXCEPTION; const static std::string OP_AGENT_LOCATE_REQUEST; const static std::string OP_AGENT_LOCATE_RESPONSE; const static std::string OP_AGENT_HEARTBEAT_INDICATION; diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h index 4585ce51cc..d7fff97ad1 100644 --- a/qpid/cpp/include/qmf/engine/Agent.h +++ b/qpid/cpp/include/qmf/engine/Agent.h @@ -22,9 +22,7 @@ #include <qmf/Notifiable.h> #include <qmf/engine/Schema.h> -#include <qmf/engine/ObjectId.h> -#include <qmf/engine/Object.h> -#include <qmf/engine/Event.h> +#include <qmf/engine/Data.h> #include <qmf/engine/Query.h> #include <qpid/messaging/Connection.h> #include <qpid/messaging/Variant.h> @@ -40,10 +38,13 @@ namespace engine { */ struct AgentEvent { enum EventKind { - GET_QUERY = 1, - START_SYNC = 2, - END_SYNC = 3, - METHOD_CALL = 4 + GET_QUERY = 1, + START_SYNC = 2, + END_SYNC = 3, + METHOD_CALL = 4, + GET_AUTHORIZE = 5, + METHOD_AUTHORIZE = 6, + SYNC_AUTHORIZE = 7 }; EventKind kind; @@ -52,7 +53,7 @@ namespace engine { char* authToken; // Authentication token if issued (for all kinds) char* name; // Name of the method/sync query // (METHOD_CALL, START_SYNC, END_SYNC) - Object* object; // Object involved in method call (METHOD_CALL) + Data* object; // Object involved in method call (METHOD_CALL) char* objectKey; // Object key for method call (METHOD_CALL) Query* query; // Query parameters (GET_QUERY, START_SYNC) qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL) @@ -128,6 +129,21 @@ namespace engine { void setConnection(qpid::messaging::Connection& conn); /** + * Respond to an authorize request by allowing the requested action. + *@param sequence The sequence number from the authorization request event. + */ + void authAllow(uint32_t sequence); + + /** + * Respond to an authorize request by denying the requested action. + *@param sequence The sequence number from the authorization request event. + *@param exception Value (typically a string) describing the reason for the + * rejection of authorization. + */ + void authDeny(uint32_t sequence, const Data& exception=Data()); + void authDeny(uint32_t sequence, const char* error); + + /** * Respond to a method request. *@param sequence The sequence number from the method request event. *@param status The method's completion status. @@ -143,7 +159,7 @@ namespace engine { *@param sequence The sequence number of the GET request or the SYNC_START request. *@param object The object (annotated with "changed" flags) for publication. */ - void queryResponse(uint32_t sequence, Object& object); + void queryResponse(uint32_t sequence, Data& object); /** * Indicate the completion of a query. This is not used for SYNC_START requests. @@ -165,13 +181,13 @@ namespace engine { * left null, the agent will create a unique name for the object. *@return The key for the managed object. */ - const char* addObject(Object& obj, const char* key=0); + const char* addObject(Data& obj, const char* key=0); /** * Raise an event into the QMF network.. *@param event The event object for the event to be raised. */ - void raiseEvent(Event& event); + void raiseEvent(Data& event); private: AgentImpl* impl; diff --git a/qpid/cpp/include/qmf/engine/Object.h b/qpid/cpp/include/qmf/engine/Data.h index 61d4f3d75c..30f2093df7 100644 --- a/qpid/cpp/include/qmf/engine/Object.h +++ b/qpid/cpp/include/qmf/engine/Data.h @@ -1,5 +1,5 @@ -#ifndef _QmfEngineObject_ -#define _QmfEngineObject_ +#ifndef _QmfEngineData_ +#define _QmfEngineData_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,23 +21,25 @@ */ #include <qmf/engine/Schema.h> -#include <qmf/engine/ObjectId.h> #include <qpid/messaging/Variant.h> namespace qmf { namespace engine { - struct ObjectImpl; - class Object { + struct DataImpl; + class Data { public: - Object(); - Object(SchemaClass* type); - Object(const Object& from); - virtual ~Object(); + Data(); + Data(SchemaClass* type, const qpid::messaging::Variant::Map& values=qpid::messaging::Variant::Map()); + Data(const Data& from); + virtual ~Data(); const qpid::messaging::Variant::Map& getValues() const; qpid::messaging::Variant::Map& getValues(); + const qpid::messaging::Variant::Map& getSubtypes() const; + qpid::messaging::Variant::Map& getSubtypes(); + const SchemaClass* getSchema() const; void setSchema(SchemaClass* schema); @@ -47,10 +49,12 @@ namespace engine { void touch(); void destroy(); + qpid::messaging::Variant::Map asMap() const; + private: - friend struct ObjectImpl; + friend struct DataImpl; friend class AgentImpl; - ObjectImpl* impl; + DataImpl* impl; }; } } diff --git a/qpid/cpp/include/qmf/engine/Event.h b/qpid/cpp/include/qmf/engine/Event.h deleted file mode 100644 index 5096e3e064..0000000000 --- a/qpid/cpp/include/qmf/engine/Event.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef _QmfEngineEvent_ -#define _QmfEngineEvent_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace qmf { -namespace engine { - - class SchemaClass; - class Value; - struct EventImpl; - - class Event { - public: - Event(const SchemaClass* type); - Event(const Event& from); - ~Event(); - - const SchemaClass* getClass() const; - Value* getValue(const char* key) const; - - private: - friend struct EventImpl; - friend class AgentImpl; - Event(EventImpl* impl); - EventImpl* impl; - }; -} -} - -#endif - diff --git a/qpid/cpp/include/qmf/engine/ObjectId.h b/qpid/cpp/include/qmf/engine/ObjectId.h deleted file mode 100644 index 51eb2bc9e7..0000000000 --- a/qpid/cpp/include/qmf/engine/ObjectId.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef _QmfEngineObjectId_ -#define _QmfEngineObjectId_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/sys/IntegerTypes.h> - -namespace qmf { -namespace engine { - - // TODO: Add to/from string and << operator - - struct ObjectIdImpl; - class ObjectId { - public: - ObjectId(); - ObjectId(const ObjectId& from); - ~ObjectId(); - - uint64_t getObjectNum() const; - uint32_t getObjectNumHi() const; - uint32_t getObjectNumLo() const; - bool isDurable() const; - const char* str() const; - uint8_t getFlags() const; - uint16_t getSequence() const; - uint32_t getBrokerBank() const; - uint32_t getAgentBank() const; - - bool operator==(const ObjectId& other) const; - bool operator<(const ObjectId& other) const; - bool operator>(const ObjectId& other) const; - bool operator<=(const ObjectId& other) const; - bool operator>=(const ObjectId& other) const; - ObjectId& operator=(const ObjectId &other); - - private: - friend struct ObjectIdImpl; - friend struct ObjectImpl; - friend class BrokerProxyImpl; - friend struct QueryImpl; - friend struct ValueImpl; - friend class AgentImpl; - ObjectId(ObjectIdImpl* impl); - ObjectIdImpl* impl; - }; -} -} - -#endif - diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h index 8954a08285..7c22360b03 100644 --- a/qpid/cpp/include/qmf/engine/Query.h +++ b/qpid/cpp/include/qmf/engine/Query.h @@ -25,7 +25,6 @@ namespace qmf { namespace engine { - class Object; class QueryImpl; class Query { @@ -49,11 +48,12 @@ namespace engine { const char* getOrderBy() const; bool getDecreasing() const; - bool matches(const Object& object) const; + bool matches(const qpid::messaging::Variant::Map& data) const; private: friend struct QueryImpl; friend struct BrokerProxyImpl; + Query(QueryImpl*); QueryImpl* impl; }; } diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 80e4f5bc46..3106d0bcb9 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -40,16 +40,13 @@ QMF_API = \ QMF_ENGINE_API = \ ../include/qmf/engine/Agent.h \ ../include/qmf/engine/Console.h \ - ../include/qmf/engine/Event.h \ - ../include/qmf/engine/Object.h \ + ../include/qmf/engine/Data.h \ ../include/qmf/engine/QmfEngineImportExport.h \ ../include/qmf/engine/Query.h \ ../include/qmf/engine/Schema.h \ ../include/qmf/Agent.h \ ../include/qmf/Notifiable.h -# ../include/qmf/engine/ObjectId.h - # Public header files nobase_include_HEADERS += \ $(QMF_API) \ @@ -65,8 +62,8 @@ libqmf_la_SOURCES = \ libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ qmf/engine/Agent.cpp \ - qmf/engine/ObjectImpl.cpp \ - qmf/engine/ObjectImpl.h \ + qmf/engine/DataImpl.cpp \ + qmf/engine/DataImpl.h \ qmf/Protocol.cpp \ qmf/Protocol.h \ qmf/engine/QueryImpl.cpp \ @@ -78,8 +75,6 @@ libqmfengine_la_SOURCES = \ # qmf/engine/BrokerProxyImpl.h # qmf/engine/ConsoleImpl.cpp # qmf/engine/ConsoleImpl.h -# qmf/engine/ObjectIdImpl.cpp -# qmf/engine/ObjectIdImpl.h # qmf/engine/SequenceManager.cpp # qmf/engine/SequenceManager.h diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp index 518d263080..5ab009d6bf 100644 --- a/qpid/cpp/src/qmf/Protocol.cpp +++ b/qpid/cpp/src/qmf/Protocol.cpp @@ -48,6 +48,7 @@ const string Protocol::AMQP_CONTENT_LIST("amqp/list"); const string Protocol::APP_OPCODE("qmf.opcode"); +const string Protocol::OP_EXCEPTION("_exception"); const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request"); const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response"); const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication"); diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index c0c8b69bc8..04308b954a 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -19,10 +19,11 @@ #include "qmf/engine/Agent.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/Data.h" #include "qmf/engine/QueryImpl.h" #include "qmf/Protocol.h" #include <qpid/sys/Mutex.h> +#include <qpid/sys/Condition.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> #include <qpid/sys/Thread.h> @@ -30,8 +31,11 @@ #include <qpid/messaging/Session.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> +#include <qpid/messaging/Address.h> #include <qpid/messaging/Message.h> +#include <qpid/messaging/MapContent.h> #include <qpid/messaging/MapView.h> +#include <qpid/messaging/ListView.h> #include <string> #include <deque> #include <map> @@ -48,6 +52,8 @@ using namespace qpid::messaging; namespace qmf { namespace engine { + class AgentImpl; + struct AgentEventImpl { typedef boost::shared_ptr<AgentEventImpl> Ptr; AgentEvent::EventKind kind; @@ -55,7 +61,7 @@ namespace engine { string authUserId; string authToken; string name; - Object* object; + Data* object; string objectKey; boost::shared_ptr<Query> query; boost::shared_ptr<Variant::Map> arguments; @@ -68,15 +74,31 @@ namespace engine { }; /** - * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method) + * AsyncContext is used to track asynchronous requests (Query, Sync, or Method) * sent up to the application. */ - struct AgentQueryContext { - typedef boost::shared_ptr<AgentQueryContext> Ptr; - uint32_t sequence; - string consoleAddr; + struct AsyncContext { + typedef boost::shared_ptr<AsyncContext> Ptr; + string correlationId; + Address replyTo; + AgentEventImpl::Ptr authorizedEvent; const SchemaMethod* schemaMethod; - AgentQueryContext() : schemaMethod(0) {} + AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {} + }; + + class StoreThread : public boost::noncopyable, public qpid::sys::Runnable { + public: + StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {} + ~StoreThread() { + stop(); + } + void run(); + void stop(); + + private: + AgentImpl& agent; + bool running; + qpid::sys::Thread thread; }; class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable { @@ -92,24 +114,34 @@ namespace engine { bool getEvent(AgentEvent& event) const; void popEvent(); void setConnection(Connection& conn); + void authAllow(uint32_t sequence); + void authDeny(uint32_t sequence, const Data&); + void authDeny(uint32_t sequence, const string&); void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments); - void queryResponse(uint32_t sequence, Object& object); + void queryResponse(uint32_t sequence, Data& object); void queryComplete(uint32_t sequence); void registerClass(SchemaClass* cls); - const char* addObject(Object& obj, const char* key); - void raiseEvent(Event& event); + const char* addObject(Data& obj, const char* key); + void raiseEvent(Data& event); void run(); void stop(); + // This blocking call is used by the internal store thread(s) to get work to do. + AgentEventImpl::Ptr nextInternalEvent(); + void signalInternal() { cond.notify(); } + private: mutable Mutex lock; - Mutex addLock; + Condition cond; const string vendor; const string product; const string name; const string domain; string directAddr; + string directAddrParams; + string topicAddr; + string topicAddrParams; Variant::Map attrMap; string storeDir; string transferDir; @@ -121,13 +153,15 @@ namespace engine { uint32_t nextContextNum; bool running; deque<AgentEventImpl::Ptr> eventQueue; - map<uint32_t, AgentQueryContext::Ptr> contextMap; + deque<AgentEventImpl::Ptr> internalEventQueue; + map<uint32_t, AsyncContext::Ptr> contextMap; Connection connection; Session session; Receiver directReceiver; Receiver topicReceiver; Sender sender; qpid::sys::Thread* thread; + StoreThread* storeThread; struct AgentClassKey { string name; @@ -169,6 +203,8 @@ namespace engine { void handleSubscribeCancel(const Message& message); void handleSubscribeRefresh(const Message& message); void handleMethodRequest(const Message& message); + void sendResponse(const Message& request, const string& opcode, const Data& data); + void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); @@ -207,18 +243,38 @@ AgentEvent AgentEventImpl::copy() return item; } +void StoreThread::run() +{ + while (running) { + AgentEventImpl::Ptr ptr(agent.nextInternalEvent()); + } +} + +void StoreThread::stop() +{ + running = false; + agent.signalInternal(); +} + AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), notifyHandler(0), notifiable(0), bootSequence(1), nextContextNum(1), running(true), thread(0) { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; - if (_d == 0) { - directAddr += " { create:always }"; + topicAddr = "qmf." + domain + ".topic/console.ind.#"; + if (_d != 0) { + directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}"; + topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}"; + } + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + attrMap["_instance"] = name; + attrMap["_name"] = vendor + ":" + product + ":" + name; + + if (internalStore) { + storeThread = new StoreThread(*this); } - attrMap["vendor"] = vendor; - attrMap["product"] = product; - attrMap["name"] = name; } @@ -297,42 +353,93 @@ void AgentImpl::setConnection(Connection& conn) thread = new qpid::sys::Thread(*this); } +void AgentImpl::authAllow(uint32_t sequence) +{ + Mutex::ScopedLock _lock(lock); + + // Find the context associated with the sequence number + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AsyncContext::Ptr context = iter->second; + + // Transform the authorize event into the real event + switch (context->authorizedEvent->kind) { + case AgentEvent::GET_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::GET_QUERY; break; + case AgentEvent::METHOD_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::METHOD_CALL; break; + case AgentEvent::SYNC_AUTHORIZE : context->authorizedEvent->kind = AgentEvent::START_SYNC; break; + default: + contextMap.erase(iter); + return; + } + + // Re-issue the now-authorized action. If this is a data query (get or subscribe), + // and the agent is handling storage internally, redirect to the internal event + // queue for processing by the internal-storage thread. + if (internalStore) { + internalEventQueue.push_back(context->authorizedEvent); + cond.notify(); + } else { + eventQueue.push_back(context->authorizedEvent); + notify(); + } +} + +void AgentImpl::authDeny(uint32_t sequence, const Data& exception) +{ + Mutex::ScopedLock _lock(lock); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AsyncContext::Ptr context = iter->second; + contextMap.erase(iter); + + // Return an exception message to the requestor + sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception); +} + +void AgentImpl::authDeny(uint32_t sequence, const string& error) +{ + Data exception; + exception.getValues()["status"] = "Access to this Operation Denied"; + exception.getValues()["text"] = error; + authDeny(sequence, exception); +} + void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/) { Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter == contextMap.end()) return; - AgentQueryContext::Ptr context = iter->second; + AsyncContext::Ptr context = iter->second; contextMap.erase(iter); // TODO: Encode method response - QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); + QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text); } -void AgentImpl::queryResponse(uint32_t sequence, Object&) +void AgentImpl::queryResponse(uint32_t sequence, Data&) { Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter == contextMap.end()) return; - AgentQueryContext::Ptr context = iter->second; + AsyncContext::Ptr context = iter->second; // TODO: accumulate data records and send response messages when we have "enough" - - QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); } void AgentImpl::queryComplete(uint32_t sequence) { Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter == contextMap.end()) return; // TODO: send a response message if there are any unsent data records - AgentQueryContext::Ptr context = iter->second; + AsyncContext::Ptr context = iter->second; contextMap.erase(iter); //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); } @@ -354,13 +461,13 @@ void AgentImpl::registerClass(SchemaClass* cls) // TODO: Indicate this schema if connected. } -const char* AgentImpl::addObject(Object&, const char*) +const char* AgentImpl::addObject(Data&, const char*) { Mutex::ScopedLock _lock(lock); return 0; } -void AgentImpl::raiseEvent(Event&) +void AgentImpl::raiseEvent(Data&) { Mutex::ScopedLock _lock(lock); } @@ -370,9 +477,14 @@ void AgentImpl::run() qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500; session = connection.newSession(); - directReceiver = session.createReceiver(directAddr); + QPID_LOG(trace, "Creating direct receiver to address: " << directAddr << directAddrParams); + directReceiver = session.createReceiver(directAddr + directAddrParams); directReceiver.setCapacity(10); + QPID_LOG(trace, "Creating topic receiver to address: " << topicAddr << topicAddrParams); + topicReceiver = session.createReceiver(topicAddr + topicAddrParams); + topicReceiver.setCapacity(10); + Mutex::ScopedLock _lock(lock); while (running) { Receiver rcvr; @@ -398,12 +510,29 @@ void AgentImpl::stop() running = false; } +AgentEventImpl::Ptr AgentImpl::nextInternalEvent() +{ + Mutex::ScopedLock _lock(lock); + while (internalEventQueue.empty()) + cond.wait(lock); + + AgentEventImpl::Ptr event(internalEventQueue.front()); + internalEventQueue.pop_front(); + return event; + + // TODO: make sure this function returns with a null pointer when the thread needs to stop. +} + + void AgentImpl::handleRcvMessageLH(const Message& message) { Variant::Map headers(message.getHeaders()); - cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl; + cout << "AgentImpl::handleRcvMessageLH contentType=" << message.getContentType() << + " replyTo=" << message.getReplyTo() << + " headers=" << headers << endl; - if (message.getContentType() != Protocol::AMQP_CONTENT_MAP) + if (message.getContentType() != Protocol::AMQP_CONTENT_MAP && + message.getContentType() != Protocol::AMQP_CONTENT_LIST) return; Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE); @@ -421,16 +550,33 @@ void AgentImpl::handleRcvMessageLH(const Message& message) void AgentImpl::handleAgentLocateLH(const Message& message) { - const MapView predicate(message); - - //if (predicateMatches(predicate, attrMap)) { - // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap); - //} + QPID_LOG(trace, "RCVD AgentLocateRequest replyTo=" << message.getReplyTo()); + auto_ptr<Query> query(QueryImpl::factory(ListView(message))); + if (query->matches(attrMap)) { + Data data(0, attrMap); + sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, data); + QPID_LOG(trace, "SENT AgentLocateResponse"); + } } void AgentImpl::handleQueryRequestLH(const Message& message) { - const MapView map(message); + uint32_t contextNum = nextContextNum++; + AsyncContext::Ptr context(new AsyncContext(message.getCorrelationId(), message.getReplyTo())); + contextMap[contextNum] = context; + + // Build the event for the get request + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_AUTHORIZE)); + event->sequence = contextNum; + event->authUserId = message.getUserId(); + event->query.reset(QueryImpl::factory(MapView(message))); + + // Put the not-yet-authorized event into the context for possible later use + context->authorizedEvent = event; + + // Enqueue the event + eventQueue.push_back(event); + notify(); } void AgentImpl::handleSubscribeRequest(const Message& message) @@ -453,6 +599,22 @@ void AgentImpl::handleMethodRequest(const Message& message) const MapView map(message); } +void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data) +{ + sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data); +} + +void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data) +{ + Message message; + MapContent content(message, data.asMap()); + + message.setCorrelationId(correlationId); + message.getHeaders()[Protocol::APP_OPCODE] = opcode; + content.encode(); + session.createSender(address).send(message); +} + AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); @@ -528,14 +690,8 @@ void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&) Mutex::ScopedLock _lock(lock); } -void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/) +void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t, const string& /*replyTo*/, const string& /*userId*/) { - Mutex::ScopedLock _lock(lock); - QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method="); - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - contextMap[contextNum] = context; } //================================================================== @@ -552,10 +708,13 @@ void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } void Agent::popEvent() { impl->popEvent(); } void Agent::setConnection(Connection& conn) { impl->setConnection(conn); } +void Agent::authAllow(uint32_t sequence) { impl->authAllow(sequence); } +void Agent::authDeny(uint32_t sequence, const Data& ex) { impl->authDeny(sequence, ex); } +void Agent::authDeny(uint32_t sequence, const char* ex) { impl->authDeny(sequence, string(ex)); } void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); } -void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); } +void Agent::queryResponse(uint32_t sequence, Data& object) { impl->queryResponse(sequence, object); } void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); } -const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); } -void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } +const char* Agent::addObject(Data& obj, const char* key) { return impl->addObject(obj, key); } +void Agent::raiseEvent(Data& event) { impl->raiseEvent(event); } diff --git a/qpid/cpp/src/qmf/engine/DataImpl.cpp b/qpid/cpp/src/qmf/engine/DataImpl.cpp new file mode 100644 index 0000000000..7eced7e504 --- /dev/null +++ b/qpid/cpp/src/qmf/engine/DataImpl.cpp @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/Protocol.h" +#include "qmf/engine/DataImpl.h" +#include <qpid/sys/Time.h> + +using namespace std; +using namespace qmf::engine; +using namespace qpid::sys; +using namespace qpid::messaging; + +DataImpl::DataImpl() : + objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) +{ +} + + +DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) : + values(v), objectClass(type), createTime(uint64_t(Duration(now()))), + destroyTime(0), lastUpdatedTime(createTime) +{ +} + + +void DataImpl::touch() +{ + lastUpdatedTime = uint64_t(Duration(now())); +} + + +void DataImpl::destroy() +{ + destroyTime = uint64_t(Duration(now())); +} + +Variant::Map DataImpl::asMap() const +{ + Variant::Map map; + + map[Protocol::VALUES] = values; + if (!subtypes.empty()) + map[Protocol::SUBTYPES] = subtypes; + // TODO: Add key, schema, and lifecycle data + + return map; +} + + +//================================================================== +// Wrappers +//================================================================== + +Data::Data() : impl(new DataImpl()) {} +Data::Data(SchemaClass* type, const Variant::Map& m) : impl(new DataImpl(type, m)) {} +Data::Data(const Data& from) : impl(new DataImpl(*(from.impl))) {} +Data::~Data() { delete impl; } +const Variant::Map& Data::getValues() const { return impl->getValues(); } +Variant::Map& Data::getValues() { return impl->getValues(); } +const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); } +Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); } +const SchemaClass* Data::getSchema() const { return impl->getSchema(); } +void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); } +const char* Data::getKey() const { return impl->getKey(); } +void Data::setKey(const char* key) { impl->setKey(key); } +void Data::touch() { impl->touch(); } +void Data::destroy() { impl->destroy(); } +Variant::Map Data::asMap() const { return impl->asMap(); } diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/DataImpl.h index 8be4ef655f..92559ae634 100644 --- a/qpid/cpp/src/qmf/engine/ObjectImpl.h +++ b/qpid/cpp/src/qmf/engine/DataImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfEngineObjectImpl_ -#define _QmfEngineObjectImpl_ +#ifndef _QmfEngineDataImpl_ +#define _QmfEngineDataImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,7 +20,7 @@ * under the License. */ -#include <qmf/engine/Object.h> +#include <qmf/engine/Data.h> #include <qpid/sys/Mutex.h> #include <qpid/messaging/Variant.h> #include <map> @@ -33,13 +33,14 @@ namespace engine { class SchemaClass; - typedef boost::shared_ptr<Object> ObjectPtr; + typedef boost::shared_ptr<Data> DataPtr; - struct ObjectImpl { + struct DataImpl { /** * Content of the object's data */ qpid::messaging::Variant::Map values; + qpid::messaging::Variant::Map subtypes; /** * Schema reference if this object is "described" @@ -55,13 +56,16 @@ namespace engine { uint64_t destroyTime; uint64_t lastUpdatedTime; - ObjectImpl(); - ObjectImpl(SchemaClass* type); - ~ObjectImpl() {} + DataImpl(); + DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&); + ~DataImpl() {} const qpid::messaging::Variant::Map& getValues() const { return values; } qpid::messaging::Variant::Map& getValues() { return values; } + const qpid::messaging::Variant::Map& getSubtypes() const { return subtypes; } + qpid::messaging::Variant::Map& getSubtypes() { return subtypes; } + const SchemaClass* getSchema() const { return objectClass; } void setSchema(SchemaClass* schema) { objectClass = schema; } @@ -70,6 +74,8 @@ namespace engine { void touch(); void destroy(); + + qpid::messaging::Variant::Map asMap() const; }; } } diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp deleted file mode 100644 index 27501cc396..0000000000 --- a/qpid/cpp/src/qmf/engine/EventImpl.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/EventImpl.h> -#include <qmf/engine/ValueImpl.h> - -#include <sstream> - -using namespace std; -using namespace qmf::engine; -using qpid::framing::Buffer; - -EventImpl::EventImpl(const SchemaEventClass* type) : eventClass(type) -{ - int argCount = eventClass->getPropertyCount(); - int idx; - - for (idx = 0; idx < argCount; idx++) { - const SchemaProperty* arg = eventClass->getProperty(idx); - properties[arg->getName()] = ValuePtr(new Value(arg->getType())); - } -} - - -EventImpl::EventImpl(const SchemaEventClass* type, Buffer&) : - eventClass(type) -{ -} - - -Event* EventImpl::factory(const SchemaEventClass* type, Buffer& buffer) -{ - EventImpl* impl(new EventImpl(type, buffer)); - return new Event(impl); -} - - -Value* EventImpl::getValue(const char* key) const -{ - map<string, ValuePtr>::const_iterator iter; - - iter = properties.find(key); - if (iter != properties.end()) - return iter->second.get(); - - return 0; -} - - -void EventImpl::encodeSchemaKey(Buffer& buffer) const -{ - buffer.putShortString(eventClass->getClassKey()->getPackageName()); - buffer.putShortString(eventClass->getClassKey()->getClassName()); - buffer.putBin128(const_cast<uint8_t*>(eventClass->getClassKey()->getHashData())); -} - - -void EventImpl::encode(Buffer& buffer) const -{ - buffer.putOctet((uint8_t) eventClass->getSeverity()); - - int argCount = eventClass->getPropertyCount(); - for (int idx = 0; idx < argCount; idx++) { - const SchemaProperty* arg = eventClass->getProperty(idx); - ValuePtr value = properties[arg->getName()]; - value->impl->encode(buffer); - } -} - - -string EventImpl::getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const -{ - stringstream key; - - key << "console.event." << brokerBank << "." << agentBank << "." << - eventClass->getClassKey()->getPackageName() << "." << - eventClass->getClassKey()->getClassName(); - return key.str(); -} - - -//================================================================== -// Wrappers -//================================================================== - -Event::Event(const SchemaEventClass* type) : impl(new EventImpl(type)) {} -Event::Event(EventImpl* i) : impl(i) {} -Event::Event(const Event& from) : impl(new EventImpl(*(from.impl))) {} -Event::~Event() { delete impl; } -const SchemaEventClass* Event::getClass() const { return impl->getClass(); } -Value* Event::getValue(const char* key) const { return impl->getValue(key); } - diff --git a/qpid/cpp/src/qmf/engine/EventImpl.h b/qpid/cpp/src/qmf/engine/EventImpl.h deleted file mode 100644 index ab790f08fa..0000000000 --- a/qpid/cpp/src/qmf/engine/EventImpl.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef _QmfEngineEventImpl_ -#define _QmfEngineEventImpl_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/Event.h> -#include <qmf/engine/Schema.h> -#include <qpid/framing/Buffer.h> -#include <boost/shared_ptr.hpp> -#include <map> - -namespace qmf { -namespace engine { - - struct EventImpl { - typedef boost::shared_ptr<Value> ValuePtr; - const SchemaEventClass* eventClass; - mutable std::map<std::string, ValuePtr> properties; - - EventImpl(const SchemaEventClass* type); - EventImpl(const SchemaEventClass* type, qpid::framing::Buffer& buffer); - static Event* factory(const SchemaEventClass* type, qpid::framing::Buffer& buffer); - - const SchemaEventClass* getClass() const { return eventClass; } - Value* getValue(const char* key) const; - - void encodeSchemaKey(qpid::framing::Buffer& buffer) const; - void encode(qpid::framing::Buffer& buffer) const; - std::string getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const; - }; - -} -} - -#endif - diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp deleted file mode 100644 index 9216f7bac0..0000000000 --- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ObjectIdImpl.h" -#include <stdlib.h> -#include <sstream> - -using namespace std; -using namespace qmf::engine; -using qpid::framing::Buffer; - -void AgentAttachment::setBanks(uint32_t broker, uint32_t agent) -{ - first = - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (agent & 0x0fffffff)); -} - -ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0) -{ - decode(buffer); -} - -ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a) -{ - first = - ((uint64_t) (flags & 0x0f)) << 60 | - ((uint64_t) (seq & 0x0fff)) << 48; - second = object; -} - -ObjectId* ObjectIdImpl::factory(Buffer& buffer) -{ - ObjectIdImpl* impl(new ObjectIdImpl(buffer)); - return new ObjectId(impl); -} - -ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object) -{ - ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object)); - return new ObjectId(impl); -} - -void ObjectIdImpl::decode(Buffer& buffer) -{ - first = buffer.getLongLong(); - second = buffer.getLongLong(); -} - -void ObjectIdImpl::encode(Buffer& buffer) const -{ - if (agent == 0) - buffer.putLongLong(first); - else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); -} - -void ObjectIdImpl::fromString(const std::string& repr) -{ -#define FIELDS 5 -#if defined (_WIN32) && !defined (atoll) -# define atoll(X) _atoi64(X) -#endif - - std::string copy(repr.c_str()); - char* cText; - char* field[FIELDS]; - bool atFieldStart = true; - int idx = 0; - - cText = const_cast<char*>(copy.c_str()); - for (char* cursor = cText; *cursor; cursor++) { - if (atFieldStart) { - if (idx >= FIELDS) - return; // TODO error - field[idx++] = cursor; - atFieldStart = false; - } else { - if (*cursor == '-') { - *cursor = '\0'; - atFieldStart = true; - } - } - } - - if (idx != FIELDS) - return; // TODO error - - first = (atoll(field[0]) << 60) + - (atoll(field[1]) << 48) + - (atoll(field[2]) << 28) + - atoll(field[3]); - second = atoll(field[4]); - agent = 0; -} - -const string& ObjectIdImpl::asString() const -{ - stringstream val; - - val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" << - getAgentBank() << "-" << getObjectNum(); - repr = val.str(); - return repr; -} - -#define ACTUAL_FIRST (agent == 0 ? first : first | agent->first) -#define ACTUAL_OTHER (other.agent == 0 ? other.first : other.first | other.agent->first) - -uint8_t ObjectIdImpl::getFlags() const -{ - return (ACTUAL_FIRST & 0xF000000000000000LL) >> 60; -} - -uint16_t ObjectIdImpl::getSequence() const -{ - return (ACTUAL_FIRST & 0x0FFF000000000000LL) >> 48; -} - -uint32_t ObjectIdImpl::getBrokerBank() const -{ - return (ACTUAL_FIRST & 0x0000FFFFF0000000LL) >> 28; -} - -uint32_t ObjectIdImpl::getAgentBank() const -{ - return ACTUAL_FIRST & 0x000000000FFFFFFFLL; -} - -uint64_t ObjectIdImpl::getObjectNum() const -{ - return second; -} - -uint32_t ObjectIdImpl::getObjectNumHi() const -{ - return (uint32_t) (second >> 32); -} - -uint32_t ObjectIdImpl::getObjectNumLo() const -{ - return (uint32_t) (second & 0x00000000FFFFFFFFLL); -} - -bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const -{ - return ACTUAL_FIRST == ACTUAL_OTHER && second == other.second; -} - -bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const -{ - return (ACTUAL_FIRST < ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second < other.second)); -} - -bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const -{ - return (ACTUAL_FIRST > ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second > other.second)); -} - - -//================================================================== -// Wrappers -//================================================================== - -ObjectId::ObjectId() : impl(new ObjectIdImpl()) {} -ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {} -ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {} -ObjectId::~ObjectId() { delete impl; } -uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); } -uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); } -uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); } -bool ObjectId::isDurable() const { return impl->isDurable(); } -const char* ObjectId::str() const { return impl->asString().c_str(); } -uint8_t ObjectId::getFlags() const { return impl->getFlags(); } -uint16_t ObjectId::getSequence() const { return impl->getSequence(); } -uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); } -uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); } -bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; } -bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; } -bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; } -bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); } -bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); } -ObjectId& ObjectId::operator=(const ObjectId& other) { - ObjectIdImpl *old; - if (this != &other) { - old = impl; - impl = new ObjectIdImpl(*(other.impl)); - if (old) - delete old; - } - return *this; -} - diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h deleted file mode 100644 index d70c8efff4..0000000000 --- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef _QmfEngineObjectIdImpl_ -#define _QmfEngineObjectIdImpl_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/ObjectId.h> -#include <qpid/framing/Buffer.h> - -namespace qmf { -namespace engine { - - struct AgentAttachment { - uint64_t first; - - AgentAttachment() : first(0) {} - void setBanks(uint32_t broker, uint32_t bank); - uint64_t getFirst() const { return first; } - }; - - struct ObjectIdImpl { - AgentAttachment* agent; - uint64_t first; - uint64_t second; - mutable std::string repr; - - ObjectIdImpl() : agent(0), first(0), second(0) {} - ObjectIdImpl(qpid::framing::Buffer& buffer); - ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); - - static ObjectId* factory(qpid::framing::Buffer& buffer); - static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); - - void decode(qpid::framing::Buffer& buffer); - void encode(qpid::framing::Buffer& buffer) const; - void fromString(const std::string& repr); - const std::string& asString() const; - uint8_t getFlags() const; - uint16_t getSequence() const; - uint32_t getBrokerBank() const; - uint32_t getAgentBank() const; - uint64_t getObjectNum() const; - uint32_t getObjectNumHi() const; - uint32_t getObjectNumLo() const; - bool isDurable() const { return getSequence() == 0; } - void setValue(uint64_t f, uint64_t s) { first = f; second = s; agent = 0; } - - bool operator==(const ObjectIdImpl& other) const; - bool operator<(const ObjectIdImpl& other) const; - bool operator>(const ObjectIdImpl& other) const; - }; -} -} - -#endif - diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp deleted file mode 100644 index 353b16ee37..0000000000 --- a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ObjectImpl.h" -#include <qpid/sys/Time.h> - -using namespace std; -using namespace qmf::engine; -using namespace qpid::sys; -using namespace qpid::messaging; - -ObjectImpl::ObjectImpl() : - objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) -{ -} - - -ObjectImpl::ObjectImpl(SchemaClass* type) : - objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) -{ -} - - -void ObjectImpl::touch() -{ - lastUpdatedTime = uint64_t(Duration(now())); -} - - -void ObjectImpl::destroy() -{ - destroyTime = uint64_t(Duration(now())); -} - - -//================================================================== -// Wrappers -//================================================================== - -Object::Object() : impl(new ObjectImpl()) {} -Object::Object(SchemaClass* type) : impl(new ObjectImpl(type)) {} -Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {} -Object::~Object() { delete impl; } -const Variant::Map& Object::getValues() const { return impl->getValues(); } -Variant::Map& Object::getValues() { return impl->getValues(); } -const SchemaClass* Object::getSchema() const { return impl->getSchema(); } -void Object::setSchema(SchemaClass* schema) { impl->setSchema(schema); } -const char* Object::getKey() const { return impl->getKey(); } -void Object::setKey(const char* key) { impl->setKey(key); } -void Object::touch() { impl->touch(); } -void Object::destroy() { impl->destroy(); } diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp index 0df49ff646..371f400f7b 100644 --- a/qpid/cpp/src/qmf/engine/QueryImpl.cpp +++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp @@ -23,7 +23,17 @@ using namespace std; using namespace qmf::engine; using namespace qpid::messaging; -bool QueryImpl::matches(const Object&) const +QueryImpl::QueryImpl(const qpid::messaging::MapView&) +{ + // TODO +} + +QueryImpl::QueryImpl(const qpid::messaging::ListView&) +{ + //TODO +} + +bool QueryImpl::matches(const Variant::Map&) const { return true; } @@ -34,6 +44,17 @@ void QueryImpl::parsePredicate(const std::string&) predicate.clear(); } +Query* QueryImpl::factory(const qpid::messaging::MapView& map) +{ + QueryImpl* impl(new QueryImpl(map)); + return new Query(impl); +} + +Query* QueryImpl::factory(const qpid::messaging::ListView& pred) +{ + QueryImpl* impl(new QueryImpl(pred)); + return new Query(impl); +} //================================================================== // Wrappers @@ -43,6 +64,7 @@ Query::Query(const char* target) : impl(new QueryImpl(target)) {} Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {} Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {} Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {} +Query::Query(QueryImpl* i) : impl(i) {} Query::~Query() { delete impl; } void Query::where(const Variant::List& predicate) { impl->where(predicate); } void Query::where(const char* expression) { impl->where(expression); } @@ -55,5 +77,5 @@ const Variant::List& Query::getPredicate() const { return impl->getPredicate(); uint32_t Query::getLimit() const { return impl->getLimit(); } const char* Query::getOrderBy() const { return impl->getOrderBy(); } bool Query::getDecreasing() const { return impl->getDecreasing(); } -bool Query::matches(const Object& object) const { return impl->matches(object); } +bool Query::matches(const Variant::Map& data) const { return impl->matches(data); } diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h index 0ef8711f8e..326bbb7fa6 100644 --- a/qpid/cpp/src/qmf/engine/QueryImpl.h +++ b/qpid/cpp/src/qmf/engine/QueryImpl.h @@ -22,6 +22,8 @@ #include "qmf/engine/Query.h" #include <qpid/messaging/Variant.h> +#include <qpid/messaging/MapView.h> +#include <qpid/messaging/ListView.h> #include <string> #include <boost/shared_ptr.hpp> @@ -34,8 +36,13 @@ namespace engine { target(_target), predicate(_predicate), resultLimit(0) {} QueryImpl(const char* _target, const char* expression) : target(_target), resultLimit(0) { parsePredicate(expression); } + QueryImpl(const qpid::messaging::MapView& map); + QueryImpl(const qpid::messaging::ListView& pred); ~QueryImpl() {} + static Query* factory(const qpid::messaging::MapView& map); + static Query* factory(const qpid::messaging::ListView& pred); + void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; } void where(const char* expression) { parsePredicate(expression); } void limit(uint32_t maxResults) { resultLimit = maxResults; } @@ -48,7 +55,7 @@ namespace engine { uint32_t getLimit() const { return resultLimit; } const char* getOrderBy() const { return sortAttr.c_str(); } bool getDecreasing() const { return orderDecreasing; } - bool matches(const Object& object) const; + bool matches(const qpid::messaging::Variant::Map& data) const; void parsePredicate(const std::string& expression); diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index 71a10559cf..29b327f6cd 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -41,7 +41,7 @@ namespace engine { SchemaException(const std::string& context, const std::string& expected) { text = context + ": Expected item with key " + expected; } - virtual ~SchemaException() throw(); + virtual ~SchemaException() throw() {} virtual const char* what() const throw() { return text.c_str(); } private: |