summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/Agent.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/Agent.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp131
1 files changed, 103 insertions, 28 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 22d53e93b7..c0c8b69bc8 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -31,6 +31,7 @@
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapView.h>
#include <string>
#include <deque>
#include <map>
@@ -83,6 +84,8 @@ namespace engine {
AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
~AgentImpl();
+ void setNotifyCallback(Agent::notifyCb handler);
+ void setNotifyCallback(Notifiable* handler);
void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
@@ -107,10 +110,12 @@ namespace engine {
const string name;
const string domain;
string directAddr;
- map<string, Variant> attrMap;
+ Variant::Map attrMap;
string storeDir;
string transferDir;
bool internalStore;
+ Agent::notifyCb notifyHandler;
+ Notifiable* notifiable;
Uuid systemId;
uint16_t bootSequence;
uint32_t nextContextNum;
@@ -156,14 +161,20 @@ namespace engine {
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaClass* cls);
- void handleRcvMessageLH(qpid::messaging::Message& message);
+ void notify();
+ void handleRcvMessageLH(const Message& message);
+ void handleAgentLocateLH(const Message& message);
+ void handleQueryRequestLH(const Message& message);
+ void handleSubscribeRequest(const Message& message);
+ void handleSubscribeCancel(const Message& message);
+ void handleSubscribeRefresh(const Message& message);
+ void handleMethodRequest(const Message& message);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Message& msg);
void handlePackageRequest(Message& msg);
void handleClassQuery(Message& msg);
void handleSchemaRequest(Message& msg, uint32_t sequence,
@@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy()
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+ notifyHandler(0), notifiable(0),
bootSequence(1), nextContextNum(1), running(true), thread(0)
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
if (_d == 0) {
directAddr += " { create:always }";
}
+ attrMap["vendor"] = vendor;
+ attrMap["product"] = product;
+ attrMap["name"] = name;
}
@@ -211,9 +226,22 @@ AgentImpl::~AgentImpl()
{
}
+void AgentImpl::setNotifyCallback(Agent::notifyCb handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifyHandler = handler;
+}
+
+void AgentImpl::setNotifyCallback(Notifiable* handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifiable = handler;
+}
+
void AgentImpl::setAttr(const char* key, const Variant& value)
{
- attrMap.insert(pair<string, Variant>(key, value));
+ Mutex::ScopedLock _lock(lock);
+ attrMap[key] = value;
}
void AgentImpl::setStoreDir(const char* path)
@@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path)
transferDir.clear();
}
-/*
-void AgentImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
- string replyToExchange(message.replyExchange ? message.replyExchange : "");
- string replyToKey(message.replyKey ? message.replyKey : "");
- string userId(message.userId ? message.userId : "");
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
- else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
- else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
- else {
- QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
- break;
- }
- }
-}
-*/
bool AgentImpl::getEvent(AgentEvent& event) const
{
@@ -277,9 +282,18 @@ void AgentImpl::popEvent()
void AgentImpl::setConnection(Connection& conn)
{
Mutex::ScopedLock _lock(lock);
+
+ //
+ // Don't permit the overwriting of an existing connection
+ // TODO: return an error or throw an exception if an overwrite is attempted.
+ //
if (connection == 0)
return;
connection = conn;
+
+ //
+ // Start the Agent thread now that we have a connection to work with.
+ //
thread = new qpid::sys::Thread(*this);
}
@@ -384,6 +398,61 @@ void AgentImpl::stop()
running = false;
}
+void AgentImpl::handleRcvMessageLH(const Message& message)
+{
+ Variant::Map headers(message.getHeaders());
+ cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+
+ if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+ return;
+
+ Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
+ if (iter == headers.end())
+ return;
+ string opcode = iter->second.asString();
+
+ if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message);
+ if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message);
+ if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message);
+}
+
+void AgentImpl::handleAgentLocateLH(const Message& message)
+{
+ const MapView predicate(message);
+
+ //if (predicateMatches(predicate, attrMap)) {
+ // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
+ //}
+}
+
+void AgentImpl::handleQueryRequestLH(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeCancel(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRefresh(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleMethodRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c
return event;
}
-void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
+void AgentImpl::notify()
{
+ if (notifyHandler != 0)
+ notifyHandler();
+ if (notifiable != 0)
+ notifiable->notify();
}
void AgentImpl::sendPackageIndicationLH(const string& packageName)
@@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s
Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
Agent::~Agent() { delete impl; }
+void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); }
+void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); }
void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }