diff options
Diffstat (limited to 'qpid/cpp/src/qmf/AgentEngine.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/AgentEngine.cpp | 249 |
1 files changed, 74 insertions, 175 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/AgentEngine.cpp index bef8b3d102..9ea3be5907 100644 --- a/qpid/cpp/src/qmf/AgentEngine.cpp +++ b/qpid/cpp/src/qmf/AgentEngine.cpp @@ -25,6 +25,7 @@ #include "qmf/ObjectIdImpl.h" #include "qmf/QueryImpl.h" #include "qmf/ValueImpl.h" +#include "qmf/Protocol.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -56,7 +57,7 @@ namespace qmf { string name; Object* object; boost::shared_ptr<ObjectId> objectId; - Query query; + boost::shared_ptr<Query> query; boost::shared_ptr<Value> arguments; string exchange; string bindingKey; @@ -85,9 +86,9 @@ namespace qmf { void setStoreDir(const char* path); void setTransferDir(const char* path); void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item); + bool getXmtMessage(Message& item) const; void popXmt(); - bool getEvent(AgentEvent& event); + bool getEvent(AgentEvent& event) const; void popEvent(); void newSession(); void startProtocol(); @@ -103,7 +104,7 @@ namespace qmf { void raiseEvent(Event& event); private: - Mutex lock; + mutable Mutex lock; Mutex addLock; string label; string queueName; @@ -134,13 +135,13 @@ namespace qmf { # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; - struct SchemaClassKey { + struct AgentClassKey { string name; uint8_t hash[16]; - SchemaClassKey(const string& n, const uint8_t* h) : name(n) { + AgentClassKey(const string& n, const uint8_t* h) : name(n) { memcpy(hash, h, 16); } - SchemaClassKey(Buffer& buffer) { + AgentClassKey(Buffer& buffer) { buffer.getShortString(name); buffer.getBin128(hash); } @@ -149,8 +150,8 @@ namespace qmf { } }; - struct SchemaClassKeyComp { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + struct AgentClassKeyComp { + bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const { if (lhs.name != rhs.name) return lhs.name < rhs.name; @@ -162,8 +163,8 @@ namespace qmf { } }; - typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap; - typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap; + typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap; + typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap; struct ClassMaps { ObjectClassMap objectClasses; @@ -172,8 +173,6 @@ namespace qmf { map<string, ClassMaps> packages; - bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); - void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); AgentEventImpl::Ptr eventSetupComplete(); @@ -185,7 +184,7 @@ namespace qmf { void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); void sendPackageIndicationLH(const string& packageName); - void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key); + void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); @@ -215,7 +214,7 @@ AgentEvent AgentEventImpl::copy() item.sequence = sequence; item.object = object; item.objectId = objectId.get(); - item.query = &query; + item.query = query.get(); item.arguments = arguments.get(); item.objectClass = objectClass; @@ -268,16 +267,20 @@ void AgentEngineImpl::handleRcvMessage(Message& message) string replyToKey(message.replyKey ? message.replyKey : ""); string userId(message.userId ? message.userId : ""); - if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); + else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); + else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); + else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); + else { + QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode); + break; + } } } -bool AgentEngineImpl::getXmtMessage(Message& item) +bool AgentEngineImpl::getXmtMessage(Message& item) const { Mutex::ScopedLock _lock(lock); if (xmtQueue.empty()) @@ -293,7 +296,7 @@ void AgentEngineImpl::popXmt() xmtQueue.pop_front(); } -bool AgentEngineImpl::getEvent(AgentEvent& event) +bool AgentEngineImpl::getEvent(AgentEvent& event) const { Mutex::ScopedLock _lock(lock); if (eventQueue.empty()) @@ -325,7 +328,7 @@ void AgentEngineImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - encodeHeader(buffer, 'A'); + Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); buffer.putShortString("qmfa"); systemId.encode(buffer); buffer.putLong(requestedBrokerBank); @@ -340,7 +343,7 @@ void AgentEngineImpl::heartbeat() Mutex::ScopedLock _lock(lock); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'h'); + Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); buffer.putLongLong(uint64_t(Duration(now()))); stringstream key; key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; @@ -349,7 +352,7 @@ void AgentEngineImpl::heartbeat() } void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, - const Value& argMap) + const Value& argMap) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -359,7 +362,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t contextMap.erase(iter); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); buffer.putLong(status); buffer.putMediumString(text); if (status == 0) { @@ -378,7 +381,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t } } sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT MethodResponse"); + QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); } void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) @@ -390,7 +393,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop AgentQueryContext::Ptr context = iter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'g', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); object.impl->encodeSchemaKey(buffer); object.impl->encodeManagedObjectData(buffer); @@ -400,7 +403,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop object.impl->encodeStatistics(buffer); sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT ContentIndication"); + QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); } void AgentEngineImpl::queryComplete(uint32_t sequence) @@ -423,11 +426,11 @@ void AgentEngineImpl::registerClass(SchemaObjectClass* cls) map<string, ClassMaps>::iterator iter = packages.find(impl->package); if (iter == packages.end()) { packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); + iter = packages.find(impl->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - SchemaClassKey key(impl->name, impl->getHash()); + AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); iter->second.objectClasses[key] = impl; // TODO: Indicate this schema if connected. @@ -441,11 +444,11 @@ void AgentEngineImpl::registerClass(SchemaEventClass* cls) map<string, ClassMaps>::iterator iter = packages.find(impl->package); if (iter == packages.end()) { packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); + iter = packages.find(impl->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - SchemaClassKey key(impl->name, impl->getHash()); + AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); iter->second.eventClasses[key] = impl; // TODO: Indicate this schema if connected. @@ -477,30 +480,6 @@ void AgentEngineImpl::raiseEvent(Event&) Mutex::ScopedLock _lock(lock); } -void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('3'); - buf.putOctet(opcode); - buf.putLong (seq); -} - -bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - if (buf.getSize() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '3'; -} - AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); @@ -532,9 +511,10 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); event->sequence = num; event->authUserId = userId; - event->query.impl->packageName = package; - event->query.impl->className = cls; - event->query.impl->oid = oid; + if (oid.get()) + event->query.reset(new Query(oid.get())); + else + event->query.reset(new Query(cls.c_str(), package.c_str())); return event; } @@ -570,16 +550,16 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'p'); + Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); buffer.putShortString(packageName); sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); } -void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key) +void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'q'); + Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); buffer.putOctet((int) kind); buffer.putShortString(packageName); buffer.putShortString(key.name); @@ -592,7 +572,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string uint32_t sequence, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'z', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); buffer.putLong(code); buffer.putShortString(text); sendBufferLH(buffer, exchange, replyToKey); @@ -602,7 +582,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); buffer.putLong(code); string fulltext; @@ -690,7 +670,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, string rKey(replyKey); string packageName; inBuffer.getShortString(packageName); - SchemaClassKey key(inBuffer); + AgentClassKey key(inBuffer); if (rExchange.empty()) rExchange = QMF_EXCHANGE; @@ -710,7 +690,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ocIter != cMap.objectClasses.end()) { SchemaObjectClassImpl* oImpl = ocIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); oImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); @@ -721,7 +701,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ecIter != cMap.eventClasses.end()) { SchemaEventClassImpl* eImpl = ecIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); eImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); @@ -744,7 +724,7 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const ft.decode(inBuffer); - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft); value = ft.get("_package"); if (value.get() && value->convertsTo<string>()) { @@ -791,9 +771,11 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); boost::shared_ptr<ObjectId> oid(oidImpl->envelope); buffer.getShortString(pname); - SchemaClassKey classKey(buffer); + AgentClassKey classKey(buffer); buffer.getShortString(method); + QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method); + map<string, ClassMaps>::const_iterator pIter = packages.find(pname); if (pIter == packages.end()) { sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); @@ -851,108 +833,25 @@ void AgentEngineImpl::handleConsoleAddedIndication() // Wrappers //================================================================== -AgentEngine::AgentEngine(char* label, bool internalStore) -{ - impl = new AgentEngineImpl(label, internalStore); -} - -AgentEngine::~AgentEngine() -{ - delete impl; -} - -void AgentEngine::setStoreDir(const char* path) -{ - impl->setStoreDir(path); -} - -void AgentEngine::setTransferDir(const char* path) -{ - impl->setTransferDir(path); -} - -void AgentEngine::handleRcvMessage(Message& message) -{ - impl->handleRcvMessage(message); -} - -bool AgentEngine::getXmtMessage(Message& item) -{ - return impl->getXmtMessage(item); -} - -void AgentEngine::popXmt() -{ - impl->popXmt(); -} - -bool AgentEngine::getEvent(AgentEvent& event) -{ - return impl->getEvent(event); -} - -void AgentEngine::popEvent() -{ - impl->popEvent(); -} - -void AgentEngine::newSession() -{ - impl->newSession(); -} - -void AgentEngine::startProtocol() -{ - impl->startProtocol(); -} - -void AgentEngine::heartbeat() -{ - impl->heartbeat(); -} - -void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) -{ - impl->methodResponse(sequence, status, text, arguments); -} - -void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - impl->queryResponse(sequence, object, prop, stat); -} - -void AgentEngine::queryComplete(uint32_t sequence) -{ - impl->queryComplete(sequence); -} - -void AgentEngine::registerClass(SchemaObjectClass* cls) -{ - impl->registerClass(cls); -} - -void AgentEngine::registerClass(SchemaEventClass* cls) -{ - impl->registerClass(cls); -} - -const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) -{ - return impl->addObject(obj, persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) -{ - return impl->allocObjectId(persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return impl->allocObjectId(persistIdLo, persistIdHi); -} - -void AgentEngine::raiseEvent(Event& event) -{ - impl->raiseEvent(event); -} +AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); } +AgentEngine::~AgentEngine() { delete impl; } +void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); } +void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); } +void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void AgentEngine::popXmt() { impl->popXmt(); } +bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); } +void AgentEngine::popEvent() { impl->popEvent(); } +void AgentEngine::newSession() { impl->newSession(); } +void AgentEngine::startProtocol() { impl->startProtocol(); } +void AgentEngine::heartbeat() { impl->heartbeat(); } +void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } +void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } +void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } +void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } +void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } +const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } +const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } +const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } +void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); } |