diff options
Diffstat (limited to 'qpid/cpp/src/qmf/engine/Agent.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp | 131 |
1 files changed, 103 insertions, 28 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 22d53e93b7..c0c8b69bc8 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -31,6 +31,7 @@ #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Message.h> +#include <qpid/messaging/MapView.h> #include <string> #include <deque> #include <map> @@ -83,6 +84,8 @@ namespace engine { AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore); ~AgentImpl(); + void setNotifyCallback(Agent::notifyCb handler); + void setNotifyCallback(Notifiable* handler); void setAttr(const char* key, const Variant& value); void setStoreDir(const char* path); void setTransferDir(const char* path); @@ -107,10 +110,12 @@ namespace engine { const string name; const string domain; string directAddr; - map<string, Variant> attrMap; + Variant::Map attrMap; string storeDir; string transferDir; bool internalStore; + Agent::notifyCb notifyHandler; + Notifiable* notifiable; Uuid systemId; uint16_t bootSequence; uint32_t nextContextNum; @@ -156,14 +161,20 @@ namespace engine { AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, const string& key, boost::shared_ptr<Variant::Map> argMap, const SchemaClass* cls); - void handleRcvMessageLH(qpid::messaging::Message& message); + void notify(); + void handleRcvMessageLH(const Message& message); + void handleAgentLocateLH(const Message& message); + void handleQueryRequestLH(const Message& message); + void handleSubscribeRequest(const Message& message); + void handleSubscribeCancel(const Message& message); + void handleSubscribeRefresh(const Message& message); + void handleMethodRequest(const Message& message); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Message& msg); void handlePackageRequest(Message& msg); void handleClassQuery(Message& msg); void handleSchemaRequest(Message& msg, uint32_t sequence, @@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy() AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), + notifyHandler(0), notifiable(0), bootSequence(1), nextContextNum(1), running(true), thread(0) { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; if (_d == 0) { directAddr += " { create:always }"; } + attrMap["vendor"] = vendor; + attrMap["product"] = product; + attrMap["name"] = name; } @@ -211,9 +226,22 @@ AgentImpl::~AgentImpl() { } +void AgentImpl::setNotifyCallback(Agent::notifyCb handler) +{ + Mutex::ScopedLock _lock(lock); + notifyHandler = handler; +} + +void AgentImpl::setNotifyCallback(Notifiable* handler) +{ + Mutex::ScopedLock _lock(lock); + notifiable = handler; +} + void AgentImpl::setAttr(const char* key, const Variant& value) { - attrMap.insert(pair<string, Variant>(key, value)); + Mutex::ScopedLock _lock(lock); + attrMap[key] = value; } void AgentImpl::setStoreDir(const char* path) @@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path) transferDir.clear(); } -/* -void AgentImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - string replyToExchange(message.replyExchange ? message.replyExchange : ""); - string replyToKey(message.replyKey ? message.replyKey : ""); - string userId(message.userId ? message.userId : ""); - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); - else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); - else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); - else { - QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); - break; - } - } -} -*/ bool AgentImpl::getEvent(AgentEvent& event) const { @@ -277,9 +282,18 @@ void AgentImpl::popEvent() void AgentImpl::setConnection(Connection& conn) { Mutex::ScopedLock _lock(lock); + + // + // Don't permit the overwriting of an existing connection + // TODO: return an error or throw an exception if an overwrite is attempted. + // if (connection == 0) return; connection = conn; + + // + // Start the Agent thread now that we have a connection to work with. + // thread = new qpid::sys::Thread(*this); } @@ -384,6 +398,61 @@ void AgentImpl::stop() running = false; } +void AgentImpl::handleRcvMessageLH(const Message& message) +{ + Variant::Map headers(message.getHeaders()); + cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl; + + if (message.getContentType() != Protocol::AMQP_CONTENT_MAP) + return; + + Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE); + if (iter == headers.end()) + return; + string opcode = iter->second.asString(); + + if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message); + if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message); + if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message); + if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message); + if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message); + if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message); +} + +void AgentImpl::handleAgentLocateLH(const Message& message) +{ + const MapView predicate(message); + + //if (predicateMatches(predicate, attrMap)) { + // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap); + //} +} + +void AgentImpl::handleQueryRequestLH(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRequest(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeCancel(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRefresh(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleMethodRequest(const Message& message) +{ + const MapView map(message); +} + AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); @@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c return event; } -void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/) +void AgentImpl::notify() { + if (notifyHandler != 0) + notifyHandler(); + if (notifiable != 0) + notifiable->notify(); } void AgentImpl::sendPackageIndicationLH(const string& packageName) @@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); } Agent::~Agent() { delete impl; } +void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); } +void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); } void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); } void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } |