summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-02 03:25:56 +0000
committerTed Ross <tross@apache.org>2010-03-02 03:25:56 +0000
commit26883f5dcdd17e31fad56d89bda169eb1e5a281f (patch)
tree9a6706120771ed48990db8b416d35f949d060a34
parent4133cb8e05f8ea6bac29e1e5cf81afaa3d3a591a (diff)
downloadqpid-python-qmf-devel0.7.tar.gz
Update branch with new Agent engine implementation:qmf-devel0.7
- Data hooks to allow batched and partial updates from internal storage. - Capability for immediate updates for deletion and changing of discrete values. - Implementation of query and event-raise. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@917854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qmf/Protocol.h3
-rw-r--r--qpid/cpp/include/qmf/engine/Data.h8
-rw-r--r--qpid/cpp/src/qmf/Protocol.cpp3
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp145
-rw-r--r--qpid/cpp/src/qmf/engine/DataImpl.cpp40
-rw-r--r--qpid/cpp/src/qmf/engine/DataImpl.h24
6 files changed, 180 insertions, 43 deletions
diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h
index e2ec287c00..64ba4b69ff 100644
--- a/qpid/cpp/include/qmf/Protocol.h
+++ b/qpid/cpp/include/qmf/Protocol.h
@@ -67,6 +67,8 @@ namespace qmf {
* Application Header Keys
*/
const static std::string APP_OPCODE;
+ const static std::string APP_PARTIAL;
+ const static std::string APP_CONTENT;
/**
* QMF Op Codes
@@ -88,6 +90,7 @@ namespace qmf {
/**
* Content type definitions
*/
+ const static std::string CONTENT_NONE;
const static std::string CONTENT_PACKAGE;
const static std::string CONTENT_SCHEMA_ID;
const static std::string CONTENT_SCHEMA_CLASS;
diff --git a/qpid/cpp/include/qmf/engine/Data.h b/qpid/cpp/include/qmf/engine/Data.h
index 30f2093df7..33a0289ea5 100644
--- a/qpid/cpp/include/qmf/engine/Data.h
+++ b/qpid/cpp/include/qmf/engine/Data.h
@@ -20,12 +20,12 @@
* under the License.
*/
-#include <qmf/engine/Schema.h>
#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
+ class SchemaClass;
struct DataImpl;
class Data {
public:
@@ -41,16 +41,14 @@ namespace engine {
qpid::messaging::Variant::Map& getSubtypes();
const SchemaClass* getSchema() const;
- void setSchema(SchemaClass* schema);
const char* getKey() const;
void setKey(const char* key);
- void touch();
+ void modifyStart();
+ void modifyDone();
void destroy();
- qpid::messaging::Variant::Map asMap() const;
-
private:
friend struct DataImpl;
friend class AgentImpl;
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp
index 5ab009d6bf..774ca709af 100644
--- a/qpid/cpp/src/qmf/Protocol.cpp
+++ b/qpid/cpp/src/qmf/Protocol.cpp
@@ -47,6 +47,8 @@ const string Protocol::AMQP_CONTENT_MAP("amqp/map");
const string Protocol::AMQP_CONTENT_LIST("amqp/list");
const string Protocol::APP_OPCODE("qmf.opcode");
+const string Protocol::APP_PARTIAL("partial");
+const string Protocol::APP_CONTENT("qmf.content");
const string Protocol::OP_EXCEPTION("_exception");
const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
@@ -62,6 +64,7 @@ const string Protocol::OP_DATA_INDICATION("_data_indication");
const string Protocol::OP_METHOD_REQUEST("_method_request");
const string Protocol::OP_METHOD_RESPONSE("_method_response");
+const string Protocol::CONTENT_NONE("");
const string Protocol::CONTENT_PACKAGE("_schema_package");
const string Protocol::CONTENT_SCHEMA_ID("_schema_id");
const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class");
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 04308b954a..453c6bd27b 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -19,7 +19,7 @@
#include "qmf/engine/Agent.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Data.h"
+#include "qmf/engine/DataImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
@@ -34,6 +34,7 @@
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/ListContent.h>
#include <qpid/messaging/MapView.h>
#include <qpid/messaging/ListView.h>
#include <string>
@@ -86,15 +87,27 @@ namespace engine {
AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
};
- class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+ /**
+ * StoreThread is used only when the Agent runs in internal-store mode.
+ * This class keeps track of stored objects and can perform queries and
+ * subscription queries on the data.
+ */
+ class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager {
public:
StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
- ~StoreThread() {
- stop();
- }
+ ~StoreThread() { stop(); }
+
+ void addObject(const Data& data);
+
+ // Methods from Runnable
void run();
void stop();
+ // Methods from DataManager
+ void modifyStart(DataPtr data);
+ void modifyDone(DataPtr data);
+ void destroy(DataPtr data);
+
private:
AgentImpl& agent;
bool running;
@@ -142,6 +155,7 @@ namespace engine {
string directAddrParams;
string topicAddr;
string topicAddrParams;
+ string eventSendAddr;
Variant::Map attrMap;
string storeDir;
string transferDir;
@@ -204,7 +218,10 @@ namespace engine {
void handleSubscribeRefresh(const Message& message);
void handleMethodRequest(const Message& message);
void sendResponse(const Message& request, const string& opcode, const Data& data);
- void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
+ void send(const Address& address, const string& correlationId, const string& opcode,
+ const string& cType, const Data& data);
+ void send(const Address& address, const string& correlationId, const string& opcode,
+ const string& cType, const Variant::List& list, bool partial=false);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -243,6 +260,11 @@ AgentEvent AgentEventImpl::copy()
return item;
}
+void StoreThread::addObject(const Data& data)
+{
+ DataPtr stored(new Data(data));
+}
+
void StoreThread::run()
{
while (running) {
@@ -256,6 +278,30 @@ void StoreThread::stop()
agent.signalInternal();
}
+void StoreThread::modifyStart(DataPtr)
+{
+ // Algorithm:
+ // Make a copy of the indicated object as a delta base if there
+ // isn't already one in place. If there is, do nothing.
+}
+
+void StoreThread::modifyDone(DataPtr)
+{
+ // Algorithm:
+ // If any deltas between the current and the stored base are discrete,
+ // send an immediate update. Otherwise, mark the object as modified.
+ //
+ // If an update is sent, delete the base copy. If not, leave the base copy
+ // in place for the later periodic update.
+}
+
+void StoreThread::destroy(DataPtr)
+{
+ // Algorithm:
+ // Send an immediate full-update for this object with the delete time set.
+ // Remove the object and any copies from the data store.
+}
+
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
notifyHandler(0), notifiable(0),
@@ -263,6 +309,7 @@ AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char*
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
topicAddr = "qmf." + domain + ".topic/console.ind.#";
+ eventSendAddr = "qmf." + domain + ".topic/agent.event";
if (_d != 0) {
directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
@@ -376,7 +423,7 @@ void AgentImpl::authAllow(uint32_t sequence)
// Re-issue the now-authorized action. If this is a data query (get or subscribe),
// and the agent is handling storage internally, redirect to the internal event
// queue for processing by the internal-storage thread.
- if (internalStore) {
+ if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) {
internalEventQueue.push_back(context->authorizedEvent);
cond.notify();
} else {
@@ -395,7 +442,7 @@ void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
contextMap.erase(iter);
// Return an exception message to the requestor
- sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+ send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception);
}
void AgentImpl::authDeny(uint32_t sequence, const string& error)
@@ -419,29 +466,37 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, c
QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Data&)
+void AgentImpl::queryResponse(uint32_t sequence, Data& data)
{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AsyncContext::Ptr context = iter->second;
+ AsyncContext::Ptr context;
+ {
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ context = iter->second;
+ }
- // TODO: accumulate data records and send response messages when we have "enough"
+ Variant::List list;
+ list.push_back(data.impl->asMap());
+ send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true);
+ QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo);
}
void AgentImpl::queryComplete(uint32_t sequence)
{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
-
- // TODO: send a response message if there are any unsent data records
+ AsyncContext::Ptr context;
+ {
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ context = iter->second;
+ contextMap.erase(iter);
+ }
- AsyncContext::Ptr context = iter->second;
- contextMap.erase(iter);
- //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+ send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List());
+ QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message");
}
void AgentImpl::registerClass(SchemaClass* cls)
@@ -463,13 +518,24 @@ void AgentImpl::registerClass(SchemaClass* cls)
const char* AgentImpl::addObject(Data&, const char*)
{
+ // TODO: Implement
+ //
+ // Determine a key for this object:
+ // if supplied, use the supplied key
+ // else:
+ // if the data is described (has a schema), use the schema primary-key to generate a key
+ // else make something up (a guid)
+ //
+
Mutex::ScopedLock _lock(lock);
return 0;
}
-void AgentImpl::raiseEvent(Data&)
+void AgentImpl::raiseEvent(Data& data)
{
- Mutex::ScopedLock _lock(lock);
+ Variant::List list;
+ list.push_back(data.impl->asMap());
+ send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list);
}
void AgentImpl::run()
@@ -601,16 +667,35 @@ void AgentImpl::handleMethodRequest(const Message& message)
void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
{
- sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+ send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data);
+}
+
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data)
+{
+ Message message;
+ MapContent content(message, data.impl->asMap());
+
+ if (!correlationId.empty())
+ message.setCorrelationId(correlationId);
+ if (!cType.empty())
+ message.getHeaders()[Protocol::APP_CONTENT] = cType;
+ message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ content.encode();
+ session.createSender(address).send(message);
}
-void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial)
{
Message message;
- MapContent content(message, data.asMap());
+ ListContent content(message, list);
- message.setCorrelationId(correlationId);
+ if (!correlationId.empty())
+ message.setCorrelationId(correlationId);
+ if (!cType.empty())
+ message.getHeaders()[Protocol::APP_CONTENT] = cType;
message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ if (partial)
+ message.getHeaders()[Protocol::APP_PARTIAL] = Variant();
content.encode();
session.createSender(address).send(message);
}
diff --git a/qpid/cpp/src/qmf/engine/DataImpl.cpp b/qpid/cpp/src/qmf/engine/DataImpl.cpp
index 7eced7e504..ab7d5f2178 100644
--- a/qpid/cpp/src/qmf/engine/DataImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/DataImpl.cpp
@@ -27,27 +27,44 @@ using namespace qpid::sys;
using namespace qpid::messaging;
DataImpl::DataImpl() :
- objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+ objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime),
+ manager(0)
{
}
DataImpl::DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map& v) :
values(v), objectClass(type), createTime(uint64_t(Duration(now()))),
- destroyTime(0), lastUpdatedTime(createTime)
+ destroyTime(0), lastUpdatedTime(createTime), manager(0)
{
}
-void DataImpl::touch()
+void DataImpl::modifyStart()
{
+ Mutex::ScopedLock _lock(lock);
lastUpdatedTime = uint64_t(Duration(now()));
+ if (manager != 0)
+ manager->modifyStart(parent);
+}
+
+
+void DataImpl::modifyDone()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (manager != 0)
+ manager->modifyDone(parent);
}
void DataImpl::destroy()
{
+ Mutex::ScopedLock _lock(lock);
destroyTime = uint64_t(Duration(now()));
+ if (manager != 0)
+ manager->destroy(parent);
+ parent.reset();
+ manager = 0;
}
Variant::Map DataImpl::asMap() const
@@ -62,6 +79,18 @@ Variant::Map DataImpl::asMap() const
return map;
}
+Variant::Map DataImpl::asMapDelta(Data&) const
+{
+ Variant::Map map;
+ return map;
+}
+
+void DataImpl::registerManager(DataManager* m, DataPtr d)
+{
+ Mutex::ScopedLock _lock(lock);
+ manager = m;
+ parent = d;
+}
//==================================================================
// Wrappers
@@ -76,9 +105,8 @@ Variant::Map& Data::getValues() { return impl->getValues(); }
const Variant::Map& Data::getSubtypes() const { return impl->getSubtypes(); }
Variant::Map& Data::getSubtypes() { return impl->getSubtypes(); }
const SchemaClass* Data::getSchema() const { return impl->getSchema(); }
-void Data::setSchema(SchemaClass* schema) { impl->setSchema(schema); }
const char* Data::getKey() const { return impl->getKey(); }
void Data::setKey(const char* key) { impl->setKey(key); }
-void Data::touch() { impl->touch(); }
+void Data::modifyStart() { impl->modifyStart(); }
+void Data::modifyDone() { impl->modifyDone(); }
void Data::destroy() { impl->destroy(); }
-Variant::Map Data::asMap() const { return impl->asMap(); }
diff --git a/qpid/cpp/src/qmf/engine/DataImpl.h b/qpid/cpp/src/qmf/engine/DataImpl.h
index 92559ae634..3974aafde7 100644
--- a/qpid/cpp/src/qmf/engine/DataImpl.h
+++ b/qpid/cpp/src/qmf/engine/DataImpl.h
@@ -35,7 +35,18 @@ namespace engine {
typedef boost::shared_ptr<Data> DataPtr;
+ class DataManager {
+ public:
+ virtual ~DataManager() {}
+ virtual void modifyStart(DataPtr data) = 0;
+ virtual void modifyDone(DataPtr data) = 0;
+ virtual void destroy(DataPtr data) = 0;
+
+ };
+
struct DataImpl {
+ qpid::sys::Mutex lock;
+
/**
* Content of the object's data
*/
@@ -56,8 +67,15 @@ namespace engine {
uint64_t destroyTime;
uint64_t lastUpdatedTime;
+ DataManager* manager;
+ DataPtr parent;
+
DataImpl();
DataImpl(SchemaClass* type, const qpid::messaging::Variant::Map&);
+ DataImpl(const DataImpl& from) :
+ values(from.values), subtypes(from.subtypes), objectClass(from.objectClass),
+ key(from.key), createTime(from.createTime), destroyTime(from.destroyTime),
+ lastUpdatedTime(from.lastUpdatedTime), manager(0) {}
~DataImpl() {}
const qpid::messaging::Variant::Map& getValues() const { return values; }
@@ -67,15 +85,17 @@ namespace engine {
qpid::messaging::Variant::Map& getSubtypes() { return subtypes; }
const SchemaClass* getSchema() const { return objectClass; }
- void setSchema(SchemaClass* schema) { objectClass = schema; }
const char* getKey() const { return key.c_str(); }
void setKey(const char* _key) { key = _key; }
- void touch();
+ void modifyStart();
+ void modifyDone();
void destroy();
qpid::messaging::Variant::Map asMap() const;
+ qpid::messaging::Variant::Map asMapDelta(Data& base) const;
+ void registerManager(DataManager* manager, DataPtr data);
};
}
}