summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/AgentEngine.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-08-31 20:18:48 +0000
committerTed Ross <tross@apache.org>2009-08-31 20:18:48 +0000
commit13b692aac42bc0e896a31c176daf79920a82ea5e (patch)
treea2b1b18ae6e3667afae4ce5f7d3331f7d9188057 /cpp/src/qmf/AgentEngine.cpp
parenta7d34ad1929e3d63e5cab290090d60920dfdd32c (diff)
downloadqpid-python-13b692aac42bc0e896a31c176daf79920a82ea5e.tar.gz
Added protocol module for codepoint definitions and header handling.
Fixed a deadlock case in ResilientConnection. Added more code to the ConsoleEngine implementation. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@809728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentEngine.cpp')
-rw-r--r--cpp/src/qmf/AgentEngine.cpp190
1 files changed, 43 insertions, 147 deletions
diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp
index ec5b117337..d3204042d5 100644
--- a/cpp/src/qmf/AgentEngine.cpp
+++ b/cpp/src/qmf/AgentEngine.cpp
@@ -25,6 +25,7 @@
#include "qmf/ObjectIdImpl.h"
#include "qmf/QueryImpl.h"
#include "qmf/ValueImpl.h"
+#include "qmf/Protocol.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -172,8 +173,6 @@ namespace qmf {
map<string, ClassMaps> packages;
- bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
AgentEventImpl::Ptr eventSetupComplete();
@@ -268,12 +267,16 @@ void AgentEngineImpl::handleRcvMessage(Message& message)
string replyToKey(message.replyKey ? message.replyKey : "");
string userId(message.userId ? message.userId : "");
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+ else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+ else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ else {
+ QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode);
+ break;
+ }
}
}
@@ -325,7 +328,7 @@ void AgentEngineImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- encodeHeader(buffer, 'A');
+ Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
buffer.putShortString("qmfa");
systemId.encode(buffer);
buffer.putLong(requestedBrokerBank);
@@ -340,7 +343,7 @@ void AgentEngineImpl::heartbeat()
Mutex::ScopedLock _lock(lock);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'h');
+ Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
buffer.putLongLong(uint64_t(Duration(now())));
stringstream key;
key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
@@ -349,7 +352,7 @@ void AgentEngineImpl::heartbeat()
}
void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+ const Value& argMap)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -359,7 +362,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
contextMap.erase(iter);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
buffer.putLong(status);
buffer.putMediumString(text);
if (status == 0) {
@@ -390,7 +393,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
AgentQueryContext::Ptr context = iter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'g', context->sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
object.impl->encodeSchemaKey(buffer);
object.impl->encodeManagedObjectData(buffer);
@@ -477,30 +480,6 @@ void AgentEngineImpl::raiseEvent(Event&)
Mutex::ScopedLock _lock(lock);
}
-void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('3');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '3';
-}
-
AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
@@ -570,7 +549,7 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const
void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'p');
+ Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
buffer.putShortString(packageName);
sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
@@ -579,7 +558,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'q');
+ Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
buffer.putOctet((int) kind);
buffer.putShortString(packageName);
buffer.putShortString(key.name);
@@ -592,7 +571,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string
uint32_t sequence, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'z', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
buffer.putLong(code);
buffer.putShortString(text);
sendBufferLH(buffer, exchange, replyToKey);
@@ -602,7 +581,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string
void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
buffer.putLong(code);
string fulltext;
@@ -710,7 +689,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
if (ocIter != cMap.objectClasses.end()) {
SchemaObjectClassImpl* oImpl = ocIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
oImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
@@ -721,7 +700,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
if (ecIter != cMap.eventClasses.end()) {
SchemaEventClassImpl* eImpl = ecIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
eImpl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
@@ -851,108 +830,25 @@ void AgentEngineImpl::handleConsoleAddedIndication()
// Wrappers
//==================================================================
-AgentEngine::AgentEngine(char* label, bool internalStore)
-{
- impl = new AgentEngineImpl(label, internalStore);
-}
-
-AgentEngine::~AgentEngine()
-{
- delete impl;
-}
-
-void AgentEngine::setStoreDir(const char* path)
-{
- impl->setStoreDir(path);
-}
-
-void AgentEngine::setTransferDir(const char* path)
-{
- impl->setTransferDir(path);
-}
-
-void AgentEngine::handleRcvMessage(Message& message)
-{
- impl->handleRcvMessage(message);
-}
-
-bool AgentEngine::getXmtMessage(Message& item) const
-{
- return impl->getXmtMessage(item);
-}
-
-void AgentEngine::popXmt()
-{
- impl->popXmt();
-}
-
-bool AgentEngine::getEvent(AgentEvent& event) const
-{
- return impl->getEvent(event);
-}
-
-void AgentEngine::popEvent()
-{
- impl->popEvent();
-}
-
-void AgentEngine::newSession()
-{
- impl->newSession();
-}
-
-void AgentEngine::startProtocol()
-{
- impl->startProtocol();
-}
-
-void AgentEngine::heartbeat()
-{
- impl->heartbeat();
-}
-
-void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
-{
- impl->methodResponse(sequence, status, text, arguments);
-}
-
-void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- impl->queryResponse(sequence, object, prop, stat);
-}
-
-void AgentEngine::queryComplete(uint32_t sequence)
-{
- impl->queryComplete(sequence);
-}
-
-void AgentEngine::registerClass(SchemaObjectClass* cls)
-{
- impl->registerClass(cls);
-}
-
-void AgentEngine::registerClass(SchemaEventClass* cls)
-{
- impl->registerClass(cls);
-}
-
-const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId)
-{
- return impl->addObject(obj, persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint64_t persistId)
-{
- return impl->allocObjectId(persistId);
-}
-
-const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return impl->allocObjectId(persistIdLo, persistIdHi);
-}
-
-void AgentEngine::raiseEvent(Event& event)
-{
- impl->raiseEvent(event);
-}
+AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); }
+AgentEngine::~AgentEngine() { delete impl; }
+void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void AgentEngine::popXmt() { impl->popXmt(); }
+bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void AgentEngine::popEvent() { impl->popEvent(); }
+void AgentEngine::newSession() { impl->newSession(); }
+void AgentEngine::startProtocol() { impl->startProtocol(); }
+void AgentEngine::heartbeat() { impl->heartbeat(); }
+void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); }