diff options
author | Ted Ross <tross@apache.org> | 2009-08-31 20:18:48 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-08-31 20:18:48 +0000 |
commit | 13b692aac42bc0e896a31c176daf79920a82ea5e (patch) | |
tree | a2b1b18ae6e3667afae4ce5f7d3331f7d9188057 /cpp/src/qmf/AgentEngine.cpp | |
parent | a7d34ad1929e3d63e5cab290090d60920dfdd32c (diff) | |
download | qpid-python-13b692aac42bc0e896a31c176daf79920a82ea5e.tar.gz |
Added protocol module for codepoint definitions and header handling.
Fixed a deadlock case in ResilientConnection.
Added more code to the ConsoleEngine implementation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@809728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentEngine.cpp')
-rw-r--r-- | cpp/src/qmf/AgentEngine.cpp | 190 |
1 files changed, 43 insertions, 147 deletions
diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp index ec5b117337..d3204042d5 100644 --- a/cpp/src/qmf/AgentEngine.cpp +++ b/cpp/src/qmf/AgentEngine.cpp @@ -25,6 +25,7 @@ #include "qmf/ObjectIdImpl.h" #include "qmf/QueryImpl.h" #include "qmf/ValueImpl.h" +#include "qmf/Protocol.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -172,8 +173,6 @@ namespace qmf { map<string, ClassMaps> packages; - bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); - void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); AgentEventImpl::Ptr eventSetupComplete(); @@ -268,12 +267,16 @@ void AgentEngineImpl::handleRcvMessage(Message& message) string replyToKey(message.replyKey ? message.replyKey : ""); string userId(message.userId ? message.userId : ""); - if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); + else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); + else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); + else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); + else { + QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode); + break; + } } } @@ -325,7 +328,7 @@ void AgentEngineImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - encodeHeader(buffer, 'A'); + Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); buffer.putShortString("qmfa"); systemId.encode(buffer); buffer.putLong(requestedBrokerBank); @@ -340,7 +343,7 @@ void AgentEngineImpl::heartbeat() Mutex::ScopedLock _lock(lock); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'h'); + Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); buffer.putLongLong(uint64_t(Duration(now()))); stringstream key; key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; @@ -349,7 +352,7 @@ void AgentEngineImpl::heartbeat() } void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, - const Value& argMap) + const Value& argMap) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -359,7 +362,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t contextMap.erase(iter); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); buffer.putLong(status); buffer.putMediumString(text); if (status == 0) { @@ -390,7 +393,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop AgentQueryContext::Ptr context = iter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'g', context->sequence); + Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); object.impl->encodeSchemaKey(buffer); object.impl->encodeManagedObjectData(buffer); @@ -477,30 +480,6 @@ void AgentEngineImpl::raiseEvent(Event&) Mutex::ScopedLock _lock(lock); } -void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('3'); - buf.putOctet(opcode); - buf.putLong (seq); -} - -bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - if (buf.getSize() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '3'; -} - AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); @@ -570,7 +549,7 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'p'); + Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); buffer.putShortString(packageName); sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); @@ -579,7 +558,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'q'); + Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); buffer.putOctet((int) kind); buffer.putShortString(packageName); buffer.putShortString(key.name); @@ -592,7 +571,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string uint32_t sequence, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'z', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); buffer.putLong(code); buffer.putShortString(text); sendBufferLH(buffer, exchange, replyToKey); @@ -602,7 +581,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); buffer.putLong(code); string fulltext; @@ -710,7 +689,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ocIter != cMap.objectClasses.end()) { SchemaObjectClassImpl* oImpl = ocIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); oImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); @@ -721,7 +700,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, if (ecIter != cMap.eventClasses.end()) { SchemaEventClassImpl* eImpl = ecIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); + Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); eImpl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); @@ -851,108 +830,25 @@ void AgentEngineImpl::handleConsoleAddedIndication() // Wrappers //================================================================== -AgentEngine::AgentEngine(char* label, bool internalStore) -{ - impl = new AgentEngineImpl(label, internalStore); -} - -AgentEngine::~AgentEngine() -{ - delete impl; -} - -void AgentEngine::setStoreDir(const char* path) -{ - impl->setStoreDir(path); -} - -void AgentEngine::setTransferDir(const char* path) -{ - impl->setTransferDir(path); -} - -void AgentEngine::handleRcvMessage(Message& message) -{ - impl->handleRcvMessage(message); -} - -bool AgentEngine::getXmtMessage(Message& item) const -{ - return impl->getXmtMessage(item); -} - -void AgentEngine::popXmt() -{ - impl->popXmt(); -} - -bool AgentEngine::getEvent(AgentEvent& event) const -{ - return impl->getEvent(event); -} - -void AgentEngine::popEvent() -{ - impl->popEvent(); -} - -void AgentEngine::newSession() -{ - impl->newSession(); -} - -void AgentEngine::startProtocol() -{ - impl->startProtocol(); -} - -void AgentEngine::heartbeat() -{ - impl->heartbeat(); -} - -void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) -{ - impl->methodResponse(sequence, status, text, arguments); -} - -void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - impl->queryResponse(sequence, object, prop, stat); -} - -void AgentEngine::queryComplete(uint32_t sequence) -{ - impl->queryComplete(sequence); -} - -void AgentEngine::registerClass(SchemaObjectClass* cls) -{ - impl->registerClass(cls); -} - -void AgentEngine::registerClass(SchemaEventClass* cls) -{ - impl->registerClass(cls); -} - -const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) -{ - return impl->addObject(obj, persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) -{ - return impl->allocObjectId(persistId); -} - -const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return impl->allocObjectId(persistIdLo, persistIdHi); -} - -void AgentEngine::raiseEvent(Event& event) -{ - impl->raiseEvent(event); -} +AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); } +AgentEngine::~AgentEngine() { delete impl; } +void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); } +void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); } +void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void AgentEngine::popXmt() { impl->popXmt(); } +bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); } +void AgentEngine::popEvent() { impl->popEvent(); } +void AgentEngine::newSession() { impl->newSession(); } +void AgentEngine::startProtocol() { impl->startProtocol(); } +void AgentEngine::heartbeat() { impl->heartbeat(); } +void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } +void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } +void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } +void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } +void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } +const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } +const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } +const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } +void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); } |