summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/Agent.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/Agent.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp915
1 files changed, 915 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
new file mode 100644
index 0000000000..1f08dded94
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Agent.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/EventImpl.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qmf {
+namespace engine {
+
+ struct AgentEventImpl {
+ typedef boost::shared_ptr<AgentEventImpl> Ptr;
+ AgentEvent::EventKind kind;
+ uint32_t sequence;
+ string authUserId;
+ string authToken;
+ string name;
+ Object* object;
+ boost::shared_ptr<ObjectId> objectId;
+ boost::shared_ptr<Query> query;
+ boost::shared_ptr<Value> arguments;
+ string exchange;
+ string bindingKey;
+ const SchemaObjectClass* objectClass;
+
+ AgentEventImpl(AgentEvent::EventKind k) :
+ kind(k), sequence(0), object(0), objectClass(0) {}
+ ~AgentEventImpl() {}
+ AgentEvent copy();
+ };
+
+ struct AgentQueryContext {
+ typedef boost::shared_ptr<AgentQueryContext> Ptr;
+ uint32_t sequence;
+ string exchange;
+ string key;
+ const SchemaMethod* schemaMethod;
+ AgentQueryContext() : schemaMethod(0) {}
+ };
+
+ class AgentImpl : public boost::noncopyable {
+ public:
+ AgentImpl(char* label, bool internalStore);
+ ~AgentImpl();
+
+ void setStoreDir(const char* path);
+ void setTransferDir(const char* path);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+ bool getEvent(AgentEvent& event) const;
+ void popEvent();
+ void newSession();
+ void startProtocol();
+ void heartbeat();
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+ void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+ void queryComplete(uint32_t sequence);
+ void registerClass(SchemaObjectClass* cls);
+ void registerClass(SchemaEventClass* cls);
+ const ObjectId* addObject(Object& obj, uint64_t persistId);
+ const ObjectId* allocObjectId(uint64_t persistId);
+ const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ void raiseEvent(Event& event);
+
+ private:
+ mutable Mutex lock;
+ Mutex addLock;
+ string label;
+ string queueName;
+ string storeDir;
+ string transferDir;
+ bool internalStore;
+ uint64_t nextTransientId;
+ Uuid systemId;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ AgentAttachment attachment;
+ uint16_t bootSequence;
+ uint64_t nextObjectId;
+ uint32_t nextContextNum;
+ deque<AgentEventImpl::Ptr> eventQueue;
+ deque<MessageImpl::Ptr> xmtQueue;
+ map<uint32_t, AgentQueryContext::Ptr> contextMap;
+ bool attachComplete;
+
+ static const char* QMF_EXCHANGE;
+ static const char* DIR_EXCHANGE;
+ static const char* BROKER_KEY;
+ static const uint32_t MERR_UNKNOWN_METHOD = 2;
+ static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
+ static const uint32_t MERR_UNKNOWN_CLASS = 9;
+ static const uint32_t MERR_INTERNAL_ERROR = 10;
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ struct AgentClassKey {
+ string name;
+ uint8_t hash[16];
+ AgentClassKey(const string& n, const uint8_t* h) : name(n) {
+ memcpy(hash, h, 16);
+ }
+ AgentClassKey(Buffer& buffer) {
+ buffer.getShortString(name);
+ buffer.getBin128(hash);
+ }
+ string repr() {
+ return name;
+ }
+ };
+
+ struct AgentClassKeyComp {
+ bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap;
+ typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap;
+
+ struct ClassMaps {
+ ObjectClassMap objectClasses;
+ EventClassMap eventClasses;
+ };
+
+ map<string, ClassMaps> packages;
+
+ AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
+ AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
+ AgentEventImpl::Ptr eventSetupComplete();
+ AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
+ boost::shared_ptr<ObjectId> oid);
+ AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const SchemaObjectClass* objectClass);
+ void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+
+ void sendPackageIndicationLH(const string& packageName);
+ void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
+ void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
+ uint32_t code = 0, const string& text = "OK");
+ void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
+ void handleAttachResponse(Buffer& inBuffer);
+ void handlePackageRequest(Buffer& inBuffer);
+ void handleClassQuery(Buffer& inBuffer);
+ void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyToExchange, const string& replyToKey);
+ void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleConsoleAddedIndication();
+ };
+}
+}
+
+const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
+const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
+const char* AgentImpl::BROKER_KEY = "broker";
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+AgentEvent AgentEventImpl::copy()
+{
+ AgentEvent item;
+
+ ::memset(&item, 0, sizeof(AgentEvent));
+ item.kind = kind;
+ item.sequence = sequence;
+ item.object = object;
+ item.objectId = objectId.get();
+ item.query = query.get();
+ item.arguments = arguments.get();
+ item.objectClass = objectClass;
+
+ STRING_REF(authUserId);
+ STRING_REF(authToken);
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+
+ return item;
+}
+
+AgentImpl::AgentImpl(char* _label, bool i) :
+ label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
+ requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0),
+ bootSequence(1), nextObjectId(1), nextContextNum(1), attachComplete(false)
+{
+ queueName += Uuid(true).str();
+}
+
+AgentImpl::~AgentImpl()
+{
+}
+
+void AgentImpl::setStoreDir(const char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ storeDir = path;
+ else
+ storeDir.clear();
+}
+
+void AgentImpl::setTransferDir(const char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ transferDir = path;
+ else
+ transferDir.clear();
+}
+
+void AgentImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToExchange(message.replyExchange ? message.replyExchange : "");
+ string replyToKey(message.replyKey ? message.replyKey : "");
+ string userId(message.userId ? message.userId : "");
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+ else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+ else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ else {
+ QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
+ break;
+ }
+ }
+}
+
+bool AgentImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool AgentImpl::getEvent(AgentEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void AgentImpl::newSession()
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+}
+
+void AgentImpl::startProtocol()
+{
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
+ buffer.putShortString(label);
+ systemId.encode(buffer);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
+ " reqAgent=" << requestedAgentBank);
+}
+
+void AgentImpl::heartbeat()
+{
+ Mutex::ScopedLock _lock(lock);
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+
+ Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
+ buffer.putLongLong(uint64_t(Duration(EPOCH, now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
+ sendBufferLH(buffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT HeartbeatIndication");
+}
+
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& argMap)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+
+ char* buf(outputBuffer);
+ uint32_t bufLen(114 + strlen(text)); // header(8) + status(4) + mstring(2 + size) + margin(100)
+ bool allocated(false);
+
+ if (status == 0) {
+ for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
+ aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->getName())) {
+ const Value* val = argMap.byKey(schemaArg->getName());
+ bufLen += val->impl->encodedSize();
+ } else {
+ Value val(schemaArg->getType());
+ bufLen += val.impl->encodedSize();
+ }
+ }
+ }
+ }
+
+ if (bufLen > MA_BUFFER_SIZE) {
+ buf = (char*) malloc(bufLen);
+ allocated = true;
+ }
+
+ Buffer buffer(buf, bufLen);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
+ buffer.putLong(status);
+ buffer.putMediumString(text);
+ if (status == 0) {
+ for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
+ aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->getName())) {
+ const Value* val = argMap.byKey(schemaArg->getName());
+ val->impl->encode(buffer);
+ } else {
+ Value val(schemaArg->getType());
+ val.impl->encode(buffer);
+ }
+ }
+ }
+ }
+ sendBufferLH(buffer, context->exchange, context->key);
+ if (allocated)
+ free(buf);
+ QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
+}
+
+void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
+
+ object.impl->encodeSchemaKey(buffer);
+ object.impl->encodeManagedObjectData(buffer);
+ if (prop)
+ object.impl->encodeProperties(buffer);
+ if (stat)
+ object.impl->encodeStatistics(buffer);
+
+ sendBufferLH(buffer, context->exchange, context->key);
+ QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
+}
+
+void AgentImpl::queryComplete(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+ sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+}
+
+void AgentImpl::registerClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ bool newPackage = false;
+
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
+ if (iter == packages.end()) {
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
+ newPackage = true;
+ }
+
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.objectClasses[key] = cls;
+
+ // Indicate this new schema if connected.
+
+ if (attachComplete) {
+
+ if (newPackage) {
+ sendPackageIndicationLH(iter->first);
+ }
+ sendClassIndicationLH(CLASS_OBJECT, iter->first, key);
+ }
+}
+
+void AgentImpl::registerClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ bool newPackage = false;
+
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
+ if (iter == packages.end()) {
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
+ newPackage = true;
+ }
+
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.eventClasses[key] = cls;
+
+ // Indicate this new schema if connected.
+
+ if (attachComplete) {
+
+ if (newPackage) {
+ sendPackageIndicationLH(iter->first);
+ }
+ sendClassIndicationLH(CLASS_EVENT, iter->first, key);
+ }
+}
+
+const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+{
+ Mutex::ScopedLock _lock(lock);
+ return 0;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+{
+ Mutex::ScopedLock _lock(lock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
+
+ ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
+ return oid;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+{
+ return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
+}
+
+void AgentImpl::raiseEvent(Event& event)
+{
+ Mutex::ScopedLock _lock(lock);
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
+
+ event.impl->encodeSchemaKey(buffer);
+ buffer.putLongLong(uint64_t(Duration(EPOCH, now())));
+ event.impl->encode(buffer);
+ string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
+
+ sendBufferLH(buffer, QMF_EXCHANGE, key);
+ QPID_LOG(trace, "SENT EventIndication");
+}
+
+AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
+ event->name = name;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
+ const string& key)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
+ const string& cls, boost::shared_ptr<ObjectId> oid)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
+ event->sequence = num;
+ event->authUserId = userId;
+ if (oid.get())
+ event->query.reset(new Query(oid.get()));
+ else
+ event->query.reset(new Query(cls.c_str(), package.c_str()));
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const SchemaObjectClass* objectClass)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
+ event->sequence = num;
+ event->authUserId = userId;
+ event->name = method;
+ event->objectId = oid;
+ event->arguments = argMap;
+ event->objectClass = objectClass;
+ return event;
+}
+
+void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = "amq.direct";
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void AgentImpl::sendPackageIndicationLH(const string& packageName)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
+ buffer.putShortString(packageName);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
+}
+
+void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
+ buffer.putOctet((int) kind);
+ buffer.putShortString(packageName);
+ buffer.putShortString(key.name);
+ buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
+}
+
+void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
+ uint32_t sequence, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
+ buffer.putLong(code);
+ buffer.putShortString(text);
+ sendBufferLH(buffer, exchange, replyToKey);
+ QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
+}
+
+void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
+ buffer.putLong(code);
+
+ string fulltext;
+ switch (code) {
+ case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
+ case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
+ case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
+ case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
+ default: fulltext = "Unspecified Error"; break;
+ }
+
+ if (!text.empty()) {
+ fulltext += " (";
+ fulltext += text;
+ fulltext += ")";
+ }
+
+ buffer.putMediumString(fulltext);
+ sendBufferLH(buffer, DIR_EXCHANGE, key);
+ QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
+}
+
+void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
+
+ QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0) {
+ QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank);
+ } else {
+ QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank);
+ }
+ //storeData(); // TODO
+ requestedBrokerBank = assignedBrokerBank;
+ requestedAgentBank = assignedAgentBank;
+ }
+
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+
+ // Bind to qpid.management to receive commands
+ stringstream key;
+ key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
+ eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
+
+ // Send package indications for all local packages
+ for (map<string, ClassMaps>::iterator pIter = packages.begin();
+ pIter != packages.end();
+ pIter++) {
+ sendPackageIndicationLH(pIter->first);
+
+ // Send class indications for all local classes
+ ClassMaps cMap = pIter->second;
+ for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
+ cIter != cMap.objectClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
+ for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
+ cIter != cMap.eventClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
+ }
+
+ attachComplete = true;
+}
+
+void AgentImpl::handlePackageRequest(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleClassQuery(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyExchange, const string& replyKey)
+{
+ Mutex::ScopedLock _lock(lock);
+ string rExchange(replyExchange);
+ string rKey(replyKey);
+ string packageName;
+ inBuffer.getShortString(packageName);
+ AgentClassKey key(inBuffer);
+
+ if (rExchange.empty())
+ rExchange = QMF_EXCHANGE;
+ if (rKey.empty())
+ rKey = BROKER_KEY;
+
+ QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
+
+ map<string, ClassMaps>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
+ return;
+ }
+
+ ClassMaps cMap = pIter->second;
+ ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
+ if (ocIter != cMap.objectClasses.end()) {
+ SchemaObjectClass* oImpl = ocIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
+ oImpl->impl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
+ if (ecIter != cMap.eventClasses.end()) {
+ SchemaEventClass* eImpl = ecIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
+ eImpl->impl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
+}
+
+void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+ map<string, ClassMaps>::const_iterator pIter = packages.end();
+ string pname;
+ string cname;
+ string oidRepr;
+ boost::shared_ptr<ObjectId> oid;
+
+ ft.decode(inBuffer);
+
+ QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
+
+ value = ft.get("_package");
+ if (value.get() && value->convertsTo<string>()) {
+ pname = value->get<string>();
+ pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
+ return;
+ }
+ }
+
+ value = ft.get("_class");
+ if (value.get() && value->convertsTo<string>()) {
+ cname = value->get<string>();
+ // TODO - check for validity of class (in package or any package)
+ if (pIter == packages.end()) {
+ } else {
+
+ }
+ }
+
+ value = ft.get("_objectid");
+ if (value.get() && value->convertsTo<string>()) {
+ oidRepr = value->get<string>();
+ oid.reset(new ObjectId());
+ oid->impl->fromString(oidRepr);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
+}
+
+void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ string pname;
+ string method;
+ boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
+ buffer.getShortString(pname);
+ AgentClassKey classKey(buffer);
+ buffer.getShortString(method);
+
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
+
+ map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
+ return;
+ }
+
+ ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
+ if (cIter == pIter->second.objectClasses.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
+ return;
+ }
+
+ const SchemaObjectClass* schema = cIter->second;
+ vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
+ for (; mIter != schema->impl->methods.end(); mIter++) {
+ if ((*mIter)->getName() == method)
+ break;
+ }
+
+ if (mIter == schema->impl->methods.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
+ return;
+ }
+
+ const SchemaMethod* schemaMethod = *mIter;
+ boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
+ Value* value;
+ for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
+ aIter != schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
+ value = ValueImpl::factory(schemaArg->getType(), buffer);
+ else
+ value = ValueImpl::factory(schemaArg->getType());
+ argMap->insert(schemaArg->getName(), value);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ context->schemaMethod = schemaMethod;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
+}
+
+void AgentImpl::handleConsoleAddedIndication()
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::~Agent() { delete impl; }
+void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void Agent::popXmt() { impl->popXmt(); }
+bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void Agent::popEvent() { impl->popEvent(); }
+void Agent::newSession() { impl->newSession(); }
+void Agent::startProtocol() { impl->startProtocol(); }
+void Agent::heartbeat() { impl->heartbeat(); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+