summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/AgentEngine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/AgentEngine.cpp')
-rw-r--r--qpid/cpp/src/qmf/AgentEngine.cpp249
1 files changed, 74 insertions, 175 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/AgentEngine.cpp
index bef8b3d102..9ea3be5907 100644
--- a/qpid/cpp/src/qmf/AgentEngine.cpp
+++ b/qpid/cpp/src/qmf/AgentEngine.cpp
@@ -25,6 +25,7 @@
#include "qmf/ObjectIdImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -56,7 +57,7 @@ namespace qmf {
string name;
Object* object;
boost::shared_ptr<ObjectId> objectId;
- Query query;
+ boost::shared_ptr<Query> query;
boost::shared_ptr<Value> arguments;
string exchange;
string bindingKey;
@@ -85,9 +86,9 @@ namespace qmf {
void setStoreDir(const char* path);
void setTransferDir(const char* path);
void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item);
+ bool getXmtMessage(Message& item) const;
void popXmt();
- bool getEvent(AgentEvent& event);
+ bool getEvent(AgentEvent& event) const;
void popEvent();
void newSession();
void startProtocol();
@@ -103,7 +104,7 @@ namespace qmf {
void raiseEvent(Event& event);
private:
- Mutex lock;
+ mutable Mutex lock;
Mutex addLock;
string label;
string queueName;
@@ -134,13 +135,13 @@ namespace qmf {
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
- struct SchemaClassKey {
+ struct AgentClassKey {
string name;
uint8_t hash[16];
- SchemaClassKey(const string& n, const uint8_t* h) : name(n) {
+ AgentClassKey(const string& n, const uint8_t* h) : name(n) {
memcpy(hash, h, 16);
}
- SchemaClassKey(Buffer& buffer) {
+ AgentClassKey(Buffer& buffer) {
buffer.getShortString(name);
buffer.getBin128(hash);
}
@@ -149,8 +150,8 @@ namespace qmf {
}
};
- struct SchemaClassKeyComp {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ struct AgentClassKeyComp {
+ bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
{
if (lhs.name != rhs.name)
return lhs.name < rhs.name;
@@ -162,8 +163,8 @@ namespace qmf {
}
};
- typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap;
- typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap;
+ typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap;
+ typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap;
struct ClassMaps {
ObjectClassMap objectClasses;
@@ -172,8 +173,6 @@ namespace qmf {
map<string, ClassMaps> packages;
- bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
AgentEventImpl::Ptr eventSetupComplete();
@@ -185,7 +184,7 @@ namespace qmf {
void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
void sendPackageIndicationLH(const string& packageName);
- void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key);
+ void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
@@ -215,7 +214,7 @@ AgentEvent AgentEventImpl::copy()
item.sequence = sequence;
item.object = object;
item.objectId = objectId.get();
- item.query = &query;
+ item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
@@ -268,16 +267,20 @@ void AgentEngineImpl::handleRcvMessage(Message& message)
string replyToKey(message.replyKey ? message.replyKey : "");
string userId(message.userId ? message.userId : "");
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+ else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+ else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ else {
+ QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode);
+ break;
+ }
}
}
-bool AgentEngineImpl::getXmtMessage(Message& item)
+bool AgentEngineImpl::getXmtMessage(Message& item) const
{
Mutex::ScopedLock _lock(lock);
if (xmtQueue.empty())
@@ -293,7 +296,7 @@ void AgentEngineImpl::popXmt()
xmtQueue.pop_front();
}
-bool AgentEngineImpl::getEvent(AgentEvent& event)
+bool AgentEngineImpl::getEvent(AgentEvent& event) const
{
Mutex::ScopedLock _lock(lock);
if (eventQueue.empty())
@@ -325,7 +328,7 @@ void AgentEngineImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- encodeHeader(buffer, 'A');
+ Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
buffer.putShortString("qmfa");
systemId.encode(buffer);
buffer.putLong(requestedBrokerBank);
@@ -340,7 +343,7 @@ void AgentEngineImpl::heartbeat()
Mutex::ScopedLock _lock(lock);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'h');
+ Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
buffer.putLongLong(uint64_t(Duration(now())));
stringstream key;
key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
@@ -349,7 +352,7 @@ void AgentEngineImpl::heartbeat()
}
void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+ const Value& argMap)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -359,7 +362,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
contextMap.erase(iter);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
buffer.putLong(status);
buffer.putMediumString(text);
if (status == 0) {
@@ -378,7 +381,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
}
}
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT MethodResponse");
+ QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
@@ -390,7 +393,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
AgentQueryContext::Ptr context = iter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'g', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
object.impl->encodeSchemaKey(buffer);
object.impl->encodeManagedObjectData(buffer);
@@ -400,7 +403,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
object.impl->encodeStatistics(buffer);
sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT ContentIndication");
+ QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
void AgentEngineImpl::queryComplete(uint32_t sequence)
@@ -423,11 +426,11 @@ void AgentEngineImpl::registerClass(SchemaObjectClass* cls)
map<string, ClassMaps>::iterator iter = packages.find(impl->package);
if (iter == packages.end()) {
packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
+ iter = packages.find(impl->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- SchemaClassKey key(impl->name, impl->getHash());
+ AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
iter->second.objectClasses[key] = impl;
// TODO: Indicate this schema if connected.
@@ -441,11 +444,11 @@ void AgentEngineImpl::registerClass(SchemaEventClass* cls)
map<string, ClassMaps>::iterator iter = packages.find(impl->package);
if (iter == packages.end()) {
packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
+ iter = packages.find(impl->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- SchemaClassKey key(impl->name, impl->getHash());
+ AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
iter->second.eventClasses[key] = impl;
// TODO: Indicate this schema if connected.
@@ -477,30 +480,6 @@ void AgentEngineImpl::raiseEvent(Event&)
Mutex::ScopedLock _lock(lock);
}
-void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('3');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '3';
-}
-
AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
@@ -532,9 +511,10 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
- event->query.impl->packageName = package;
- event->query.impl->className = cls;
- event->query.impl->oid = oid;
+ if (oid.get())
+ event->query.reset(new Query(oid.get()));
+ else
+ event->query.reset(new Query(cls.c_str(), package.c_str()));
return event;
}
@@ -570,16 +550,16 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const
void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'p');
+ Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
buffer.putShortString(packageName);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
}
-void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key)
+void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'q');
+ Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
buffer.putOctet((int) kind);
buffer.putShortString(packageName);
buffer.putShortString(key.name);
@@ -592,7 +572,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string
uint32_t sequence, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'z', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
buffer.putLong(code);
buffer.putShortString(text);
sendBufferLH(buffer, exchange, replyToKey);
@@ -602,7 +582,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string
void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
buffer.putLong(code);
string fulltext;
@@ -690,7 +670,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
string rKey(replyKey);
string packageName;
inBuffer.getShortString(packageName);
- SchemaClassKey key(inBuffer);
+ AgentClassKey key(inBuffer);
if (rExchange.empty())
rExchange = QMF_EXCHANGE;
@@ -710,7 +690,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
if (ocIter != cMap.objectClasses.end()) {
SchemaObjectClassImpl* oImpl = ocIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
oImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
@@ -721,7 +701,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
if (ecIter != cMap.eventClasses.end()) {
SchemaEventClassImpl* eImpl = ecIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
eImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
@@ -744,7 +724,7 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const
ft.decode(inBuffer);
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
value = ft.get("_package");
if (value.get() && value->convertsTo<string>()) {
@@ -791,9 +771,11 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer);
boost::shared_ptr<ObjectId> oid(oidImpl->envelope);
buffer.getShortString(pname);
- SchemaClassKey classKey(buffer);
+ AgentClassKey classKey(buffer);
buffer.getShortString(method);
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
+
map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
if (pIter == packages.end()) {
sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
@@ -851,108 +833,25 @@ void AgentEngineImpl::handleConsoleAddedIndication()
// Wrappers
//==================================================================
-AgentEngine::AgentEngine(char* label, bool internalStore)
-{
- impl = new AgentEngineImpl(label, internalStore);
-}
-
-AgentEngine::~AgentEngine()
-{
- delete impl;
-}
-
-void AgentEngine::setStoreDir(const char* path)
-{
- impl->setStoreDir(path);
-}
-
-void AgentEngine::setTransferDir(const char* path)
-{
- impl->setTransferDir(path);
-}
-
-void AgentEngine::handleRcvMessage(Message& message)
-{
- impl->handleRcvMessage(message);
-}
-
-bool AgentEngine::getXmtMessage(Message& item)
-{
- return impl->getXmtMessage(item);
-}
-
-void AgentEngine::popXmt()
-{
- impl->popXmt();
-}
-
-bool AgentEngine::getEvent(AgentEvent& event)
-{
- return impl->getEvent(event);
-}
-
-void AgentEngine::popEvent()
-{
- impl->popEvent();
-}
-
-void AgentEngine::newSession()
-{
- impl->newSession();
-}
-
-void AgentEngine::startProtocol()
-{
- impl->startProtocol();
-}
-
-void AgentEngine::heartbeat()
-{
- impl->heartbeat();
-}
-
-void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
-{
- impl->methodResponse(sequence, status, text, arguments);
-}
-
-void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- impl->queryResponse(sequence, object, prop, stat);
-}
-
-void AgentEngine::queryComplete(uint32_t sequence)
-{
- impl->queryComplete(sequence);
-}
-
-void AgentEngine::registerClass(SchemaObjectClass* cls)
-{
- impl->registerClass(cls);
-}
-
-void AgentEngine::registerClass(SchemaEventClass* cls)
-{
- impl->registerClass(cls);
-}
-
-const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId)
-{
- return impl->addObject(obj, persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint64_t persistId)
-{
- return impl->allocObjectId(persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return impl->allocObjectId(persistIdLo, persistIdHi);
-}
-
-void AgentEngine::raiseEvent(Event& event)
-{
- impl->raiseEvent(event);
-}
+AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); }
+AgentEngine::~AgentEngine() { delete impl; }
+void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void AgentEngine::popXmt() { impl->popXmt(); }
+bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void AgentEngine::popEvent() { impl->popEvent(); }
+void AgentEngine::newSession() { impl->newSession(); }
+void AgentEngine::startProtocol() { impl->startProtocol(); }
+void AgentEngine::heartbeat() { impl->heartbeat(); }
+void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); }