From 26883f5dcdd17e31fad56d89bda169eb1e5a281f Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 2 Mar 2010 03:25:56 +0000 Subject: Update branch with new Agent engine implementation: - Data hooks to allow batched and partial updates from internal storage. - Capability for immediate updates for deletion and changing of discrete values. - Implementation of query and event-raise. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@917854 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/include/qmf/Protocol.h | 3 + qpid/cpp/include/qmf/engine/Data.h | 8 +- qpid/cpp/src/qmf/Protocol.cpp | 3 + qpid/cpp/src/qmf/engine/Agent.cpp | 145 +++++++++++++++++++++++++++-------- qpid/cpp/src/qmf/engine/DataImpl.cpp | 40 ++++++++-- qpid/cpp/src/qmf/engine/DataImpl.h | 24 +++++- 6 files changed, 180 insertions(+), 43 deletions(-) diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h index e2ec287c00..64ba4b69ff 100644 --- a/qpid/cpp/include/qmf/Protocol.h +++ b/qpid/cpp/include/qmf/Protocol.h @@ -67,6 +67,8 @@ namespace qmf { * Application Header Keys */ const static std::string APP_OPCODE; + const static std::string APP_PARTIAL; + const static std::string APP_CONTENT; /** * QMF Op Codes @@ -88,6 +90,7 @@ namespace qmf { /** * Content type definitions */ + const static std::string CONTENT_NONE; const static std::string CONTENT_PACKAGE; const static std::string CONTENT_SCHEMA_ID; const static std::string CONTENT_SCHEMA_CLASS; diff --git a/qpid/cpp/include/qmf/engine/Data.h b/qpid/cpp/include/qmf/engine/Data.h index 30f2093df7..33a0289ea5 100644 --- a/qpid/cpp/include/qmf/engine/Data.h +++ b/qpid/cpp/include/qmf/engine/Data.h @@ -20,12 +20,12 @@ * under the License. */ -#include #include namespace qmf { namespace engine { + class SchemaClass; struct DataImpl; class Data { public: @@ -41,16 +41,14 @@ namespace engine { qpid::messaging::Variant::Map& getSubtypes(); const SchemaClass* getSchema() const; - void setSchema(SchemaClass* schema); const char* getKey() const; void setKey(const char* key); - void touch(); + void modifyStart(); + void modifyDone(); void destroy(); - qpid::messaging::Variant::Map asMap() const; - private: friend struct DataImpl; friend class AgentImpl; diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp index 5ab009d6bf..774ca709af 100644 --- a/qpid/cpp/src/qmf/Protocol.cpp +++ b/qpid/cpp/src/qmf/Protocol.cpp @@ -47,6 +47,8 @@ const string Protocol::AMQP_CONTENT_MAP("amqp/map"); const string Protocol::AMQP_CONTENT_LIST("amqp/list"); const string Protocol::APP_OPCODE("qmf.opcode"); +const string Protocol::APP_PARTIAL("partial"); +const string Protocol::APP_CONTENT("qmf.content"); const string Protocol::OP_EXCEPTION("_exception"); const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request"); @@ -62,6 +64,7 @@ const string Protocol::OP_DATA_INDICATION("_data_indication"); const string Protocol::OP_METHOD_REQUEST("_method_request"); const string Protocol::OP_METHOD_RESPONSE("_method_response"); +const string Protocol::CONTENT_NONE(""); const string Protocol::CONTENT_PACKAGE("_schema_package"); const string Protocol::CONTENT_SCHEMA_ID("_schema_id"); const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class"); diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 04308b954a..453c6bd27b 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -19,7 +19,7 @@ #include "qmf/engine/Agent.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Data.h" +#include "qmf/engine/DataImpl.h" #include "qmf/engine/QueryImpl.h" #include "qmf/Protocol.h" #include @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -86,15 +87,27 @@ namespace engine { AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {} }; - class StoreThread : public boost::noncopyable, public qpid::sys::Runnable { + /** + * StoreThread is used only when the Agent runs in internal-store mode. + * This class keeps track of stored objects and can perform queries and + * subscription queries on the data. + */ + class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager { public: StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {} - ~StoreThread() { - stop(); - } + ~StoreThread() { stop(); } + + void addObject(const Data& data); + + // Methods from Runnable void run(); void stop(); + // Methods from DataManager + void modifyStart(DataPtr data); + void modifyDone(DataPtr data); + void destroy(DataPtr data); + private: AgentImpl& agent; bool running; @@ -142,6 +155,7 @@ namespace engine { string directAddrParams; string topicAddr; string topicAddrParams; + string eventSendAddr; Variant::Map attrMap; string storeDir; string transferDir; @@ -204,7 +218,10 @@ namespace engine { void handleSubscribeRefresh(const Message& message); void handleMethodRequest(const Message& message); void sendResponse(const Message& request, const string& opcode, const Data& data); - void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data); + void send(const Address& address, const string& correlationId, const string& opcode, + const string& cType, const Data& data); + void send(const Address& address, const string& correlationId, const string& opcode, + const string& cType, const Variant::List& list, bool partial=false); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); @@ -243,6 +260,11 @@ AgentEvent AgentEventImpl::copy() return item; } +void StoreThread::addObject(const Data& data) +{ + DataPtr stored(new Data(data)); +} + void StoreThread::run() { while (running) { @@ -256,6 +278,30 @@ void StoreThread::stop() agent.signalInternal(); } +void StoreThread::modifyStart(DataPtr) +{ + // Algorithm: + // Make a copy of the indicated object as a delta base if there + // isn't already one in place. If there is, do nothing. +} + +void StoreThread::modifyDone(DataPtr) +{ + // Algorithm: + // If any deltas between the current and the stored base are discrete, + // send an immediate update. Otherwise, mark the object as modified. + // + // If an update is sent, delete the base copy. If not, leave the base copy + // in place for the later periodic update. +} + +void StoreThread::destroy(DataPtr) +{ + // Algorithm: + // Send an immediate full-update for this object with the delete time set. + // Remove the object and any copies from the data store. +} + AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), notifyHandler(0), notifiable(0), @@ -263,6 +309,7 @@ AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; topicAddr = "qmf." + domain + ".topic/console.ind.#"; + eventSendAddr = "qmf." + domain + ".topic/agent.event"; if (_d != 0) { directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}"; topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}"; @@ -376,7 +423,7 @@ void AgentImpl::authAllow(uint32_t sequence) // Re-issue the now-authorized action. If this is a data query (get or subscribe), // and the agent is handling storage internally, redirect to the internal event // queue for processing by the internal-storage thread. - if (internalStore) { + if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) { internalEventQueue.push_back(context->authorizedEvent); cond.notify(); } else { @@ -395,7 +442,7 @@ void AgentImpl::authDeny(uint32_t sequence, const Data& exception) contextMap.erase(iter); // Return an exception message to the requestor - sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception); + send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception); } void AgentImpl::authDeny(uint32_t sequence, const string& error) @@ -419,29 +466,37 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, c QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text); } -void AgentImpl::queryResponse(uint32_t sequence, Data&) +void AgentImpl::queryResponse(uint32_t sequence, Data& data) { - Mutex::ScopedLock _lock(lock); - map::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AsyncContext::Ptr context = iter->second; + AsyncContext::Ptr context; + { + Mutex::ScopedLock _lock(lock); + map::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + context = iter->second; + } - // TODO: accumulate data records and send response messages when we have "enough" + Variant::List list; + list.push_back(data.impl->asMap()); + send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true); + QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo); } void AgentImpl::queryComplete(uint32_t sequence) { - Mutex::ScopedLock _lock(lock); - map::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - - // TODO: send a response message if there are any unsent data records + AsyncContext::Ptr context; + { + Mutex::ScopedLock _lock(lock); + map::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + context = iter->second; + contextMap.erase(iter); + } - AsyncContext::Ptr context = iter->second; - contextMap.erase(iter); - //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); + send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List()); + QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message"); } void AgentImpl::registerClass(SchemaClass* cls) @@ -463,13 +518,24 @@ void AgentImpl::registerClass(SchemaClass* cls) const char* AgentImpl::addObject(Data&, const char*) { + // TODO: Implement + // + // Determine a key for this object: + // if supplied, use the supplied key + // else: + // if the data is described (has a schema), use the schema primary-key to generate a key + // else make something up (a guid) + // + Mutex::ScopedLock _lock(lock); return 0; } -void AgentImpl::raiseEvent(Data&) +void AgentImpl::raiseEvent(Data& data) { - Mutex::ScopedLock _lock(lock); + Variant::List list; + list.push_back(data.impl->asMap()); + send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list); } void AgentImpl::run() @@ -601,16 +667,35 @@ void AgentImpl::handleMethodRequest(const Message& message) void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data) { - sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data); + send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data); +} + +void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data) +{ + Message message; + MapContent content(message, data.impl->asMap()); + + if (!correlationId.empty()) + message.setCorrelationId(correlationId); + if (!cType.empty()) + message.getHeaders()[Protocol::APP_CONTENT] = cType; + message.getHeaders()[Protocol::APP_OPCODE] = opcode; + content.encode(); + session.createSender(address).send(message); } -void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data) +void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial) { Message message; - MapContent content(message, data.asMap()); + ListContent content(message, list); - message.setCorrelationId(correlationId); + if (!correlationId.empty()) + message.setCorrelationId(correlationId); + if (!cType.empty()) + message.getHeaders()[Protocol::APP_CONTENT] = cType; message.getHeaders()[Protocol::APP_OPCODE] = opcode; + if (partial) + message.getHeaders()[Protocol::APP_PARTIAL] = Variant(); content.encode(); session.createSender(address).send(message); } diff --git a/qpid/cpp/src/qmf/engine/DataImpl.cpp b/qpid/cpp/src/qmf/engine/DataImpl.cpp index 7eced7e504..ab7d5f2178 100644 --- a/qpid/cpp/src/qmf/engine/DataImpl.cpp +++ b/qpid/cpp/src/qmf/engine/DataImpl.cpp @@ -27,27 +27,44 @@ using namespace qpid::sys; using namespace qpid::messaging; DataImpl::DataImpl() : - objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) + objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime), + manager(0) { } DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) : values(v), objectClass(type), createTime(uint64_t(Duration(now()))), - destroyTime(0), lastUpdatedTime(createTime) + destroyTime(0), lastUpdatedTime(createTime), manager(0) { } -void DataImpl::touch() +void DataImpl::modifyStart() { + Mutex::ScopedLock _lock(lock); lastUpdatedTime = uint64_t(Duration(now())); + if (manager != 0) + manager->modifyStart(parent); +} + + +void DataImpl::modifyDone() +{ + Mutex::ScopedLock _lock(lock); + if (manager != 0) + manager->modifyDone(parent); } void DataImpl::destroy() { + Mutex::ScopedLock _lock(lock); destroyTime = uint64_t(Duration(now())); + if (manager != 0) + manager->destroy(parent); + parent.reset(); + manager = 0; } Variant::Map DataImpl::asMap() const @@ -62,6 +79,18 @@ Variant::Map DataImpl::asMap() const return map; } +Variant::Map DataImpl::asMapDelta(Data&) const +{ + Variant::Map map; + return map; +} + +void DataImpl::registerManager(DataManager* m, DataPtr d) +{ + Mutex::ScopedLock _lock(lock); + manager = m; + parent = d; +} //================================================================== // Wrappers @@ -76,9 +105,8 @@ Variant::Map& Data::getValues() { return impl->getValues(); } const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); } Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); } const SchemaClass* Data::getSchema() const { return impl->getSchema(); } -void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); } const char* Data::getKey() const { return impl->getKey(); } void Data::setKey(const char* key) { impl->setKey(key); } -void Data::touch() { impl->touch(); } +void Data::modifyStart() { impl->modifyStart(); } +void Data::modifyDone() { impl->modifyDone(); } void Data::destroy() { impl->destroy(); } -Variant::Map Data::asMap() const { return impl->asMap(); } diff --git a/qpid/cpp/src/qmf/engine/DataImpl.h b/qpid/cpp/src/qmf/engine/DataImpl.h index 92559ae634..3974aafde7 100644 --- a/qpid/cpp/src/qmf/engine/DataImpl.h +++ b/qpid/cpp/src/qmf/engine/DataImpl.h @@ -35,7 +35,18 @@ namespace engine { typedef boost::shared_ptr DataPtr; + class DataManager { + public: + virtual ~DataManager() {} + virtual void modifyStart(DataPtr data) = 0; + virtual void modifyDone(DataPtr data) = 0; + virtual void destroy(DataPtr data) = 0; + + }; + struct DataImpl { + qpid::sys::Mutex lock; + /** * Content of the object's data */ @@ -56,8 +67,15 @@ namespace engine { uint64_t destroyTime; uint64_t lastUpdatedTime; + DataManager* manager; + DataPtr parent; + DataImpl(); DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&); + DataImpl(const DataImpl& from) : + values(from.values), subtypes(from.subtypes), objectClass(from.objectClass), + key(from.key), createTime(from.createTime), destroyTime(from.destroyTime), + lastUpdatedTime(from.lastUpdatedTime), manager(0) {} ~DataImpl() {} const qpid::messaging::Variant::Map& getValues() const { return values; } @@ -67,15 +85,17 @@ namespace engine { qpid::messaging::Variant::Map& getSubtypes() { return subtypes; } const SchemaClass* getSchema() const { return objectClass; } - void setSchema(SchemaClass* schema) { objectClass = schema; } const char* getKey() const { return key.c_str(); } void setKey(const char* _key) { key = _key; } - void touch(); + void modifyStart(); + void modifyDone(); void destroy(); qpid::messaging::Variant::Map asMap() const; + qpid::messaging::Variant::Map asMapDelta(Data& base) const; + void registerManager(DataManager* manager, DataPtr data); }; } } -- cgit v1.2.1