summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine')
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp857
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp763
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h239
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp273
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h63
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp419
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h145
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.cpp43
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.h44
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp168
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.h72
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.cpp232
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.h76
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.cpp52
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.h69
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.cpp103
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.h100
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp489
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp611
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h223
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.cpp96
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.h68
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.cpp266
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.h150
24 files changed, 5621 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
new file mode 100644
index 0000000000..c5d1bff2e0
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -0,0 +1,857 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Agent.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qmf {
+namespace engine {
+
+ struct AgentEventImpl {
+ typedef boost::shared_ptr<AgentEventImpl> Ptr;
+ AgentEvent::EventKind kind;
+ uint32_t sequence;
+ string authUserId;
+ string authToken;
+ string name;
+ Object* object;
+ boost::shared_ptr<ObjectId> objectId;
+ boost::shared_ptr<Query> query;
+ boost::shared_ptr<Value> arguments;
+ string exchange;
+ string bindingKey;
+ const SchemaObjectClass* objectClass;
+
+ AgentEventImpl(AgentEvent::EventKind k) :
+ kind(k), sequence(0), object(0), objectClass(0) {}
+ ~AgentEventImpl() {}
+ AgentEvent copy();
+ };
+
+ struct AgentQueryContext {
+ typedef boost::shared_ptr<AgentQueryContext> Ptr;
+ uint32_t sequence;
+ string exchange;
+ string key;
+ const SchemaMethod* schemaMethod;
+ AgentQueryContext() : schemaMethod(0) {}
+ };
+
+ class AgentImpl : public boost::noncopyable {
+ public:
+ AgentImpl(char* label, bool internalStore);
+ ~AgentImpl();
+
+ void setStoreDir(const char* path);
+ void setTransferDir(const char* path);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+ bool getEvent(AgentEvent& event) const;
+ void popEvent();
+ void newSession();
+ void startProtocol();
+ void heartbeat();
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+ void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+ void queryComplete(uint32_t sequence);
+ void registerClass(SchemaObjectClass* cls);
+ void registerClass(SchemaEventClass* cls);
+ const ObjectId* addObject(Object& obj, uint64_t persistId);
+ const ObjectId* allocObjectId(uint64_t persistId);
+ const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ void raiseEvent(Event& event);
+
+ private:
+ mutable Mutex lock;
+ Mutex addLock;
+ string label;
+ string queueName;
+ string storeDir;
+ string transferDir;
+ bool internalStore;
+ uint64_t nextTransientId;
+ Uuid systemId;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ AgentAttachment attachment;
+ uint16_t bootSequence;
+ uint64_t nextObjectId;
+ uint32_t nextContextNum;
+ deque<AgentEventImpl::Ptr> eventQueue;
+ deque<MessageImpl::Ptr> xmtQueue;
+ map<uint32_t, AgentQueryContext::Ptr> contextMap;
+
+ static const char* QMF_EXCHANGE;
+ static const char* DIR_EXCHANGE;
+ static const char* BROKER_KEY;
+ static const uint32_t MERR_UNKNOWN_METHOD = 2;
+ static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
+ static const uint32_t MERR_UNKNOWN_CLASS = 9;
+ static const uint32_t MERR_INTERNAL_ERROR = 10;
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ struct AgentClassKey {
+ string name;
+ uint8_t hash[16];
+ AgentClassKey(const string& n, const uint8_t* h) : name(n) {
+ memcpy(hash, h, 16);
+ }
+ AgentClassKey(Buffer& buffer) {
+ buffer.getShortString(name);
+ buffer.getBin128(hash);
+ }
+ string repr() {
+ return name;
+ }
+ };
+
+ struct AgentClassKeyComp {
+ bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap;
+ typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap;
+
+ struct ClassMaps {
+ ObjectClassMap objectClasses;
+ EventClassMap eventClasses;
+ };
+
+ map<string, ClassMaps> packages;
+
+ AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
+ AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
+ AgentEventImpl::Ptr eventSetupComplete();
+ AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
+ boost::shared_ptr<ObjectId> oid);
+ AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const SchemaObjectClass* objectClass);
+ void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+
+ void sendPackageIndicationLH(const string& packageName);
+ void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
+ void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
+ uint32_t code = 0, const string& text = "OK");
+ void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
+ void handleAttachResponse(Buffer& inBuffer);
+ void handlePackageRequest(Buffer& inBuffer);
+ void handleClassQuery(Buffer& inBuffer);
+ void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyToExchange, const string& replyToKey);
+ void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleConsoleAddedIndication();
+ };
+}
+}
+
+const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
+const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
+const char* AgentImpl::BROKER_KEY = "broker";
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+AgentEvent AgentEventImpl::copy()
+{
+ AgentEvent item;
+
+ ::memset(&item, 0, sizeof(AgentEvent));
+ item.kind = kind;
+ item.sequence = sequence;
+ item.object = object;
+ item.objectId = objectId.get();
+ item.query = query.get();
+ item.arguments = arguments.get();
+ item.objectClass = objectClass;
+
+ STRING_REF(authUserId);
+ STRING_REF(authToken);
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+
+ return item;
+}
+
+AgentImpl::AgentImpl(char* _label, bool i) :
+ label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
+ requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0),
+ bootSequence(1), nextObjectId(1), nextContextNum(1)
+{
+ queueName += label;
+}
+
+AgentImpl::~AgentImpl()
+{
+}
+
+void AgentImpl::setStoreDir(const char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ storeDir = path;
+ else
+ storeDir.clear();
+}
+
+void AgentImpl::setTransferDir(const char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ transferDir = path;
+ else
+ transferDir.clear();
+}
+
+void AgentImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToExchange(message.replyExchange ? message.replyExchange : "");
+ string replyToKey(message.replyKey ? message.replyKey : "");
+ string userId(message.userId ? message.userId : "");
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
+ else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
+ else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ else {
+ QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
+ break;
+ }
+ }
+}
+
+bool AgentImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool AgentImpl::getEvent(AgentEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void AgentImpl::newSession()
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+}
+
+void AgentImpl::startProtocol()
+{
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
+ buffer.putShortString("qmfa");
+ systemId.encode(buffer);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
+ " reqAgent=" << requestedAgentBank);
+}
+
+void AgentImpl::heartbeat()
+{
+ Mutex::ScopedLock _lock(lock);
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+
+ Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
+ buffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
+ sendBufferLH(buffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT HeartbeatIndication");
+}
+
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
+ const Value& argMap)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
+ buffer.putLong(status);
+ buffer.putMediumString(text);
+ if (status == 0) {
+ for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
+ aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->getName())) {
+ const Value* val = argMap.byKey(schemaArg->getName());
+ val->impl->encode(buffer);
+ } else {
+ Value val(schemaArg->getType());
+ val.impl->encode(buffer);
+ }
+ }
+ }
+ }
+ sendBufferLH(buffer, context->exchange, context->key);
+ QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
+}
+
+void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
+
+ object.impl->encodeSchemaKey(buffer);
+ object.impl->encodeManagedObjectData(buffer);
+ if (prop)
+ object.impl->encodeProperties(buffer);
+ if (stat)
+ object.impl->encodeStatistics(buffer);
+
+ sendBufferLH(buffer, context->exchange, context->key);
+ QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
+}
+
+void AgentImpl::queryComplete(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+ sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+}
+
+void AgentImpl::registerClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
+ if (iter == packages.end()) {
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
+ // TODO: Indicate this package if connected
+ }
+
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.objectClasses[key] = cls;
+
+ // TODO: Indicate this schema if connected.
+}
+
+void AgentImpl::registerClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
+ if (iter == packages.end()) {
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
+ // TODO: Indicate this package if connected
+ }
+
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.eventClasses[key] = cls;
+
+ // TODO: Indicate this schema if connected.
+}
+
+const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+{
+ Mutex::ScopedLock _lock(lock);
+ return 0;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+{
+ Mutex::ScopedLock _lock(lock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
+
+ ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
+ return oid;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+{
+ return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
+}
+
+void AgentImpl::raiseEvent(Event&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
+ event->name = name;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
+ const string& key)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
+ const string& cls, boost::shared_ptr<ObjectId> oid)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
+ event->sequence = num;
+ event->authUserId = userId;
+ if (oid.get())
+ event->query.reset(new Query(oid.get()));
+ else
+ event->query.reset(new Query(cls.c_str(), package.c_str()));
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const SchemaObjectClass* objectClass)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
+ event->sequence = num;
+ event->authUserId = userId;
+ event->name = method;
+ event->objectId = oid;
+ event->arguments = argMap;
+ event->objectClass = objectClass;
+ return event;
+}
+
+void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = "amq.direct";
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void AgentImpl::sendPackageIndicationLH(const string& packageName)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
+ buffer.putShortString(packageName);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
+}
+
+void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
+ buffer.putOctet((int) kind);
+ buffer.putShortString(packageName);
+ buffer.putShortString(key.name);
+ buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
+}
+
+void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
+ uint32_t sequence, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
+ buffer.putLong(code);
+ buffer.putShortString(text);
+ sendBufferLH(buffer, exchange, replyToKey);
+ QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
+}
+
+void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
+ buffer.putLong(code);
+
+ string fulltext;
+ switch (code) {
+ case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
+ case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
+ case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
+ case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
+ default: fulltext = "Unspecified Error"; break;
+ }
+
+ if (!text.empty()) {
+ fulltext += " (";
+ fulltext += text;
+ fulltext += ")";
+ }
+
+ buffer.putMediumString(fulltext);
+ sendBufferLH(buffer, DIR_EXCHANGE, key);
+ QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
+}
+
+void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
+
+ QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0) {
+ QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank);
+ } else {
+ QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank);
+ }
+ //storeData(); // TODO
+ requestedBrokerBank = assignedBrokerBank;
+ requestedAgentBank = assignedAgentBank;
+ }
+
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+
+ // Bind to qpid.management to receive commands
+ stringstream key;
+ key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
+ eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
+
+ // Send package indications for all local packages
+ for (map<string, ClassMaps>::iterator pIter = packages.begin();
+ pIter != packages.end();
+ pIter++) {
+ sendPackageIndicationLH(pIter->first);
+
+ // Send class indications for all local classes
+ ClassMaps cMap = pIter->second;
+ for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
+ cIter != cMap.objectClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
+ for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
+ cIter != cMap.eventClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
+ }
+}
+
+void AgentImpl::handlePackageRequest(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleClassQuery(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyExchange, const string& replyKey)
+{
+ Mutex::ScopedLock _lock(lock);
+ string rExchange(replyExchange);
+ string rKey(replyKey);
+ string packageName;
+ inBuffer.getShortString(packageName);
+ AgentClassKey key(inBuffer);
+
+ if (rExchange.empty())
+ rExchange = QMF_EXCHANGE;
+ if (rKey.empty())
+ rKey = BROKER_KEY;
+
+ QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
+
+ map<string, ClassMaps>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
+ return;
+ }
+
+ ClassMaps cMap = pIter->second;
+ ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
+ if (ocIter != cMap.objectClasses.end()) {
+ SchemaObjectClass* oImpl = ocIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
+ oImpl->impl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
+ if (ecIter != cMap.eventClasses.end()) {
+ SchemaEventClass* eImpl = ecIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
+ eImpl->impl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
+}
+
+void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+ map<string, ClassMaps>::const_iterator pIter = packages.end();
+ string pname;
+ string cname;
+ string oidRepr;
+ boost::shared_ptr<ObjectId> oid;
+
+ ft.decode(inBuffer);
+
+ QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
+
+ value = ft.get("_package");
+ if (value.get() && value->convertsTo<string>()) {
+ pname = value->get<string>();
+ pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
+ return;
+ }
+ }
+
+ value = ft.get("_class");
+ if (value.get() && value->convertsTo<string>()) {
+ cname = value->get<string>();
+ // TODO - check for validity of class (in package or any package)
+ if (pIter == packages.end()) {
+ } else {
+
+ }
+ }
+
+ value = ft.get("_objectid");
+ if (value.get() && value->convertsTo<string>()) {
+ oidRepr = value->get<string>();
+ oid.reset(new ObjectId());
+ oid->impl->fromString(oidRepr);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
+}
+
+void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ string pname;
+ string method;
+ boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
+ buffer.getShortString(pname);
+ AgentClassKey classKey(buffer);
+ buffer.getShortString(method);
+
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
+
+ map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
+ return;
+ }
+
+ ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
+ if (cIter == pIter->second.objectClasses.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
+ return;
+ }
+
+ const SchemaObjectClass* schema = cIter->second;
+ vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
+ for (; mIter != schema->impl->methods.end(); mIter++) {
+ if ((*mIter)->getName() == method)
+ break;
+ }
+
+ if (mIter == schema->impl->methods.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
+ return;
+ }
+
+ const SchemaMethod* schemaMethod = *mIter;
+ boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
+ Value* value;
+ for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
+ aIter != schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
+ value = ValueImpl::factory(schemaArg->getType(), buffer);
+ else
+ value = ValueImpl::factory(schemaArg->getType());
+ argMap->insert(schemaArg->getName(), value);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ context->schemaMethod = schemaMethod;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
+}
+
+void AgentImpl::handleConsoleAddedIndication()
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::~Agent() { delete impl; }
+void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void Agent::popXmt() { impl->popXmt(); }
+bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void Agent::popEvent() { impl->popEvent(); }
+void Agent::newSession() { impl->newSession(); }
+void Agent::startProtocol() { impl->startProtocol(); }
+void Agent::heartbeat() { impl->heartbeat(); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
new file mode 100644
index 0000000000..1a2b3e6555
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/BrokerProxyImpl.h"
+#include "qmf/engine/ConsoleImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qpid/Address.h"
+#include "qpid/sys/SystemInfo.h"
+#include <qpid/log/Statement.h>
+#include <qpid/StringUtils.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectPtr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return iter->get();
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+BrokerEvent BrokerEventImpl::copy()
+{
+ BrokerEvent item;
+
+ ::memset(&item, 0, sizeof(BrokerEvent));
+ item.kind = kind;
+
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get();
+ item.methodResponse = methodResponse.get();
+
+ return item;
+}
+
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console)
+{
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
+}
+
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+
+ // TODO: Store session handle
+}
+
+void BrokerProxyImpl::sessionClosed()
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+}
+
+void BrokerProxyImpl::startProtocol()
+{
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ {
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ agentList[0] = agent;
+
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ }
+
+ console.impl->eventAgentAdded(agent);
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
+}
+
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+uint32_t BrokerProxyImpl::agentCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return iter->second.get();
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, iter->second.get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+ agent->impl->addSequence(sequence);
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->impl->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+}
+
+string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
+{
+ int argCount = schema->getArgumentCount();
+
+ if (argmap == 0 || !argmap->isMap())
+ return string("Arguments must be in a map value");
+
+ for (int aIdx = 0; aIdx < argCount; aIdx++) {
+ const SchemaArgument* arg(schema->getArgument(aIdx));
+ if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
+ if (argmap->keyInMap(arg->getName())) {
+ const Value* argVal(argmap->byKey(arg->getName()));
+ if (argVal->getType() != arg->getType())
+ return string("Argument is the wrong type: ") + arg->getName();
+ argVal->impl->encode(buffer);
+ } else {
+ Value defaultValue(arg->getType());
+ defaultValue.impl->encode(buffer);
+ }
+ }
+ }
+
+ return string();
+}
+
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
+ const string& methodName, const Value* args, void* userContext)
+{
+ int methodCount = cls->getMethodCount();
+ int idx;
+ for (idx = 0; idx < methodCount; idx++) {
+ const SchemaMethod* method = cls->getMethod(idx);
+ if (string(method->getName()) == methodName) {
+ Mutex::ScopedLock _lock(lock);
+ SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(methodContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
+ oid->impl->encode(outBuffer);
+ cls->getClassKey()->impl->encode(outBuffer);
+ outBuffer.putShortString(methodName);
+
+ string argErrorString = encodeMethodArguments(method, args, outBuffer);
+ if (argErrorString.empty()) {
+ key << "agent.1." << oid->impl->getAgentBank();
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
+ } else {
+ MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
+ eventQueue.push_back(eventMethodResponse(userContext, argError));
+ }
+ return;
+ }
+ }
+
+ MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventMethodResponse(userContext, error));
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+ event->name = queueName;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+ QPID_LOG(trace, "Console Link to Broker Stable");
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
+ event->context = context;
+ event->methodResponse = response;
+ return event;
+}
+
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+ console.impl->learnPackage(package);
+
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+ outBuffer.putShortString(package);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
+{
+ uint8_t kind = inBuffer.getOctet();
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
+
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());
+
+ if (!console.impl->haveClass(classKey.get())) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+ classKey->impl->encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str());
+ }
+}
+
+MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
+{
+ MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema));
+
+ QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
+ response->getException()->asString());
+
+ return response;
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
+{
+ vector<string> tokens = qpid::split(routingKey, ".");
+ uint32_t agentBank;
+ uint64_t timestamp;
+
+ if (routingKey.empty() || tokens.size() != 4)
+ agentBank = 0;
+ else
+ agentBank = ::atoi(tokens[3].c_str());
+
+ timestamp = inBuffer.getLongLong();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ console.impl->eventAgentHeartbeat(iter->second, timestamp);
+ }
+ QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+ SchemaObjectClass* oClassPtr;
+ SchemaEventClass* eClassPtr;
+ uint8_t kind = inBuffer.getOctet();
+ const SchemaClassKey* key;
+ if (kind == CLASS_OBJECT) {
+ oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
+ console.impl->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
+ } else if (kind == CLASS_EVENT) {
+ eClassPtr = SchemaEventClassImpl::factory(inBuffer);
+ console.impl->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
+ }
+ else {
+ QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+ }
+}
+
+ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+{
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
+
+ SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+ if (schema == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
+ return ObjectPtr();
+ }
+
+ ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
+ if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) {
+ //
+ // We've intercepted information about a remote agent... update the agent list accordingly
+ //
+ updateAgentList(optr);
+ }
+ return optr;
+}
+
+void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
+{
+ Value* value = obj->getValue("agentBank");
+ Mutex::ScopedLock _lock(lock);
+ if (value != 0 && value->isUint()) {
+ uint32_t agentBank = value->asUint();
+ if (obj->isDeleted()) {
+ map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ AgentProxyPtr agent(iter->second);
+ console.impl->eventAgentDeleted(agent);
+ agentList.erase(agentBank);
+ QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+
+ //
+ // Release all sequence numbers for requests in-flight to this agent.
+ // Since the agent is no longer connected, these requests would not
+ // otherwise complete.
+ //
+ agent->impl->releaseInFlight(seqMgr);
+ }
+ } else {
+ Value* str = obj->getValue("label");
+ string label;
+ if (str != 0 && str->isString())
+ label = str->asString();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter == agentList.end()) {
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ agentList[agentBank] = agent;
+ console.impl->eventAgentAdded(agent);
+ QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
+ }
+ }
+ }
+}
+
+void BrokerProxyImpl::incOutstandingLH()
+{
+ requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ topicBound = true;
+ for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
+ iter != console.impl->bindingList.end(); iter++) {
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ eventQueue.push_back(eventStable());
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
+ status(from.status), schema(from.schema)
+{
+ if (from.exception.get())
+ exception.reset(new Value(*(from.exception)));
+ if (from.arguments.get())
+ arguments.reset(new Value(*(from.arguments)));
+}
+
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
+{
+ string text;
+
+ status = buf.getLong();
+ buf.getMediumString(text);
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+
+ if (status != 0)
+ return;
+
+ arguments.reset(new Value(TYPE_MAP));
+ int argCount(schema->getArgumentCount());
+ for (int idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = schema->getArgument(idx);
+ if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
+ Value* value(ValueImpl::factory(arg->getType(), buf));
+ arguments->insert(arg->getName(), value);
+ }
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0)
+{
+ status = s;
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+}
+
+MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(buf, schema));
+ return new MethodResponse(impl);
+}
+
+MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(status, text));
+ return new MethodResponse(impl);
+}
+
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
+{
+ ObjectPtr object;
+ bool completeContext = false;
+
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence, routingKey);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, false);
+ broker.console.impl->eventObjectUpdate(object, true, false);
+ }
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, false, true);
+ broker.console.impl->eventObjectUpdate(object, false, true);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ broker.console.impl->eventObjectUpdate(object, true, true);
+ }
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ {
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding > 0)
+ return;
+ }
+
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectPtr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+
+ //
+ // Visit each agent and remove the sequence from that agent's in-flight list.
+ // This could be made more efficient because only one agent will have this sequence
+ // in its list.
+ //
+ map<uint32_t, AgentProxyPtr> copy;
+ {
+ Mutex::ScopedLock _block(broker.lock);
+ copy = broker.agentList;
+ }
+ for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
+ iter->second->impl->delSequence(sequence);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->impl->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void MethodContext::release()
+{
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
+}
+
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
+{
+ if (opcode == Protocol::OP_METHOD_RESPONSE)
+ methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
+ else
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+
+ return true;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); }
+uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
+
+BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
+BrokerProxy::~BrokerProxy() { delete impl; }
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
+
+MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() {}
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
new file mode 100644
index 0000000000..798a5fdc76
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -0,0 +1,239 @@
+#ifndef _QmfEngineBrokerProxyImpl_
+#define _QmfEngineBrokerProxyImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Console.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/noncopyable.hpp"
+#include <memory>
+#include <string>
+#include <deque>
+#include <map>
+#include <set>
+#include <vector>
+
+namespace qmf {
+namespace engine {
+
+ typedef boost::shared_ptr<MethodResponse> MethodResponsePtr;
+ struct MethodResponseImpl {
+ uint32_t status;
+ const SchemaMethod* schema;
+ std::auto_ptr<Value> exception;
+ std::auto_ptr<Value> arguments;
+
+ MethodResponseImpl(const MethodResponseImpl& from);
+ MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
+ MethodResponseImpl(uint32_t status, const std::string& text);
+ static MethodResponse* factory(qpid::framing::Buffer& buf, const SchemaMethod* schema);
+ static MethodResponse* factory(uint32_t status, const std::string& text);
+ ~MethodResponseImpl() {}
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ const Value* getArgs() const { return arguments.get(); }
+ };
+
+ typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
+ struct QueryResponseImpl {
+ uint32_t status;
+ std::auto_ptr<Value> exception;
+ std::vector<ObjectPtr> results;
+
+ QueryResponseImpl() : status(0) {}
+ static QueryResponse* factory() {
+ QueryResponseImpl* impl(new QueryResponseImpl());
+ return new QueryResponse(impl);
+ }
+ ~QueryResponseImpl() {}
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
+ struct BrokerEventImpl {
+ typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+ BrokerEvent::EventKind kind;
+ std::string name;
+ std::string exchange;
+ std::string bindingKey;
+ void* context;
+ QueryResponsePtr queryResponse;
+ MethodResponsePtr methodResponse;
+
+ BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
+ ~BrokerEventImpl() {}
+ BrokerEvent copy();
+ };
+
+ typedef boost::shared_ptr<AgentProxy> AgentProxyPtr;
+ struct AgentProxyImpl {
+ Console& console;
+ BrokerProxy& broker;
+ uint32_t agentBank;
+ std::string label;
+ std::set<uint32_t> inFlightSequences;
+
+ AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {}
+ static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) {
+ AgentProxyImpl* impl(new AgentProxyImpl(c, b, ab, l));
+ return new AgentProxy(impl);
+ }
+ ~AgentProxyImpl() {}
+ const std::string& getLabel() const { return label; }
+ uint32_t getBrokerBank() const { return 1; }
+ uint32_t getAgentBank() const { return agentBank; }
+ void addSequence(uint32_t seq) { inFlightSequences.insert(seq); }
+ void delSequence(uint32_t seq) { inFlightSequences.erase(seq); }
+ void releaseInFlight(SequenceManager& seqMgr) {
+ for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++)
+ seqMgr.release(*iter);
+ inFlightSequences.clear();
+ }
+ };
+
+ class BrokerProxyImpl : public boost::noncopyable {
+ public:
+ BrokerProxyImpl(BrokerProxy& pub, Console& _console);
+ ~BrokerProxyImpl() {}
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const;
+ void popEvent();
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
+ std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+
+ void addBinding(const std::string& exchange, const std::string& key);
+ void staticRelease() { decOutstanding(); }
+
+ private:
+ friend struct StaticContext;
+ friend struct QueryContext;
+ friend struct MethodContext;
+ BrokerProxy& publicObject;
+ mutable qpid::sys::Mutex lock;
+ Console& console;
+ std::string queueName;
+ qpid::framing::Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ std::map<uint32_t, AgentProxyPtr> agentList;
+ std::deque<MessageImpl::Ptr> xmtQueue;
+ std::deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName);
+ BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
+ BrokerEventImpl::Ptr eventSetupComplete();
+ BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponsePtr response);
+ BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponsePtr response);
+
+ void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema);
+ void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey);
+ void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void updateAgentList(ObjectPtr obj);
+ void incOutstandingLH();
+ void decOutstanding();
+ };
+
+ //
+ // StaticContext is used to handle:
+ //
+ // 1) Responses to console-level requests (for schema info, etc.)
+ // 2) Unsolicited messages from agents (events, published updates, etc.)
+ //
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ virtual ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
+
+ //
+ // QueryContext is used to track and handle responses associated with a single Get Query
+ //
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(QueryResponseImpl::factory()) {}
+ virtual ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+
+ mutable qpid::sys::Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponsePtr queryResponse;
+ };
+
+ struct MethodContext : public SequenceContext {
+ MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
+ virtual ~MethodContext() {}
+ void reserve() {}
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+
+ BrokerProxyImpl& broker;
+ void* userContext;
+ const SchemaMethod* schema;
+ MethodResponsePtr methodResponse;
+ };
+
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
new file mode 100644
index 0000000000..2cd6af10f8
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ConnectionSettingsImpl.h"
+#include "qmf/engine/Typecode.h"
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid;
+
+const string attrProtocol("protocol");
+const string attrHost("host");
+const string attrPort("port");
+const string attrVirtualhost("virtualhost");
+const string attrUsername("username");
+const string attrPassword("password");
+const string attrMechanism("mechanism");
+const string attrLocale("locale");
+const string attrHeartbeat("heartbeat");
+const string attrMaxChannels("maxChannels");
+const string attrMaxFrameSize("maxFrameSize");
+const string attrBounds("bounds");
+const string attrTcpNoDelay("tcpNoDelay");
+const string attrService("service");
+const string attrMinSsf("minSsf");
+const string attrMaxSsf("maxSsf");
+const string attrRetryDelayMin("retryDelayMin");
+const string attrRetryDelayMax("retryDelayMax");
+const string attrRetryDelayFactor("retryDelayFactor");
+const string attrSendUserId("sendUserId");
+
+ConnectionSettingsImpl::ConnectionSettingsImpl() :
+ retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
+{
+}
+
+ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) :
+ retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
+{
+ // TODO: Parse the URL
+}
+
+bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
+{
+ if (key == attrProtocol) clientSettings.protocol = value.asString();
+ else if (key == attrHost) clientSettings.host = value.asString();
+ else if (key == attrPort) clientSettings.port = value.asUint();
+ else if (key == attrVirtualhost) clientSettings.virtualhost = value.asString();
+ else if (key == attrUsername) clientSettings.username = value.asString();
+ else if (key == attrPassword) clientSettings.password = value.asString();
+ else if (key == attrMechanism) clientSettings.mechanism = value.asString();
+ else if (key == attrLocale) clientSettings.locale = value.asString();
+ else if (key == attrHeartbeat) clientSettings.heartbeat = value.asUint();
+ else if (key == attrMaxChannels) clientSettings.maxChannels = value.asUint();
+ else if (key == attrMaxFrameSize) clientSettings.maxFrameSize = value.asUint();
+ else if (key == attrBounds) clientSettings.bounds = value.asUint();
+ else if (key == attrTcpNoDelay) clientSettings.tcpNoDelay = value.asBool();
+ else if (key == attrService) clientSettings.service = value.asString();
+ else if (key == attrMinSsf) clientSettings.minSsf = value.asUint();
+ else if (key == attrMaxSsf) clientSettings.maxSsf = value.asUint();
+
+ else if (key == attrRetryDelayMin) retryDelayMin = value.asUint();
+ else if (key == attrRetryDelayMax) retryDelayMax = value.asUint();
+ else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint();
+ else if (key == attrSendUserId) sendUserId = value.asBool();
+ else
+ return false;
+ return true;
+}
+
+Value ConnectionSettingsImpl::getAttr(const string& key) const
+{
+ Value strval(TYPE_LSTR);
+ Value intval(TYPE_UINT32);
+ Value boolval(TYPE_BOOL);
+
+ if (key == attrProtocol) {
+ strval.setString(clientSettings.protocol.c_str());
+ return strval;
+ }
+
+ if (key == attrHost) {
+ strval.setString(clientSettings.host.c_str());
+ return strval;
+ }
+
+ if (key == attrPort) {
+ intval.setUint(clientSettings.port);
+ return intval;
+ }
+
+ if (key == attrVirtualhost) {
+ strval.setString(clientSettings.virtualhost.c_str());
+ return strval;
+ }
+
+ if (key == attrUsername) {
+ strval.setString(clientSettings.username.c_str());
+ return strval;
+ }
+
+ if (key == attrPassword) {
+ strval.setString(clientSettings.password.c_str());
+ return strval;
+ }
+
+ if (key == attrMechanism) {
+ strval.setString(clientSettings.mechanism.c_str());
+ return strval;
+ }
+
+ if (key == attrLocale) {
+ strval.setString(clientSettings.locale.c_str());
+ return strval;
+ }
+
+ if (key == attrHeartbeat) {
+ intval.setUint(clientSettings.heartbeat);
+ return intval;
+ }
+
+ if (key == attrMaxChannels) {
+ intval.setUint(clientSettings.maxChannels);
+ return intval;
+ }
+
+ if (key == attrMaxFrameSize) {
+ intval.setUint(clientSettings.maxFrameSize);
+ return intval;
+ }
+
+ if (key == attrBounds) {
+ intval.setUint(clientSettings.bounds);
+ return intval;
+ }
+
+ if (key == attrTcpNoDelay) {
+ boolval.setBool(clientSettings.tcpNoDelay);
+ return boolval;
+ }
+
+ if (key == attrService) {
+ strval.setString(clientSettings.service.c_str());
+ return strval;
+ }
+
+ if (key == attrMinSsf) {
+ intval.setUint(clientSettings.minSsf);
+ return intval;
+ }
+
+ if (key == attrMaxSsf) {
+ intval.setUint(clientSettings.maxSsf);
+ return intval;
+ }
+
+ if (key == attrRetryDelayMin) {
+ intval.setUint(retryDelayMin);
+ return intval;
+ }
+
+ if (key == attrRetryDelayMax) {
+ intval.setUint(retryDelayMax);
+ return intval;
+ }
+
+ if (key == attrRetryDelayFactor) {
+ intval.setUint(retryDelayFactor);
+ return intval;
+ }
+
+ return strval;
+}
+
+const string& ConnectionSettingsImpl::getAttrString() const
+{
+ // TODO: build and return attribute string
+ return attrString;
+}
+
+void ConnectionSettingsImpl::transportTcp(uint16_t port)
+{
+ clientSettings.protocol = "tcp";
+ clientSettings.port = port;
+}
+
+void ConnectionSettingsImpl::transportSsl(uint16_t port)
+{
+ clientSettings.protocol = "ssl";
+ clientSettings.port = port;
+}
+
+void ConnectionSettingsImpl::transportRdma(uint16_t port)
+{
+ clientSettings.protocol = "rdma";
+ clientSettings.port = port;
+}
+
+void ConnectionSettingsImpl::authAnonymous(const string& username)
+{
+ clientSettings.mechanism = "ANONYMOUS";
+ clientSettings.username = username;
+}
+
+void ConnectionSettingsImpl::authPlain(const string& username, const string& password)
+{
+ clientSettings.mechanism = "PLAIN";
+ clientSettings.username = username;
+ clientSettings.password = password;
+}
+
+void ConnectionSettingsImpl::authGssapi(const string& serviceName, uint32_t minSsf, uint32_t maxSsf)
+{
+ clientSettings.mechanism = "GSSAPI";
+ clientSettings.service = serviceName;
+ clientSettings.minSsf = minSsf;
+ clientSettings.maxSsf = maxSsf;
+}
+
+void ConnectionSettingsImpl::setRetry(int delayMin, int delayMax, int delayFactor)
+{
+ retryDelayMin = delayMin;
+ retryDelayMax = delayMax;
+ retryDelayFactor = delayFactor;
+}
+
+const client::ConnectionSettings& ConnectionSettingsImpl::getClientSettings() const
+{
+ return clientSettings;
+}
+
+void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) const
+{
+ *min = retryDelayMin;
+ *max = retryDelayMax;
+ *factor = retryDelayFactor;
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); }
+ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); }
+ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); }
+ConnectionSettings::~ConnectionSettings() { delete impl; }
+bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); }
+Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); }
+const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); }
+void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); }
+void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); }
+void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); }
+void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); }
+void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); }
+void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); }
+void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); }
+
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
new file mode 100644
index 0000000000..98bf87868b
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
@@ -0,0 +1,63 @@
+#ifndef _QmfEngineConnectionSettingsImpl_
+#define _QmfEngineConnectionSettingsImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ConnectionSettings.h"
+#include "qmf/engine/Value.h"
+#include "qpid/client/ConnectionSettings.h"
+#include <string>
+#include <map>
+
+namespace qmf {
+namespace engine {
+
+ class ConnectionSettingsImpl {
+ qpid::client::ConnectionSettings clientSettings;
+ mutable std::string attrString;
+ int retryDelayMin;
+ int retryDelayMax;
+ int retryDelayFactor;
+ bool sendUserId;
+
+ public:
+ ConnectionSettingsImpl();
+ ConnectionSettingsImpl(const std::string& url);
+ ~ConnectionSettingsImpl() {}
+ bool setAttr(const std::string& key, const Value& value);
+ Value getAttr(const std::string& key) const;
+ const std::string& getAttrString() const;
+ void transportTcp(uint16_t port);
+ void transportSsl(uint16_t port);
+ void transportRdma(uint16_t port);
+ void authAnonymous(const std::string& username);
+ void authPlain(const std::string& username, const std::string& password);
+ void authGssapi(const std::string& serviceName, uint32_t minSsf, uint32_t maxSsf);
+ void setRetry(int delayMin, int delayMax, int delayFactor);
+
+ const qpid::client::ConnectionSettings& getClientSettings() const;
+ void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const;
+ bool getSendUserId() const { return sendUserId; }
+ };
+
+}
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
new file mode 100644
index 0000000000..c2d1f51f2b
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ConsoleImpl.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+ConsoleEvent ConsoleEventImpl::copy()
+{
+ ConsoleEvent item;
+
+ ::memset(&item, 0, sizeof(ConsoleEvent));
+ item.kind = kind;
+ item.agent = agent.get();
+ item.classKey = classKey;
+ item.object = object.get();
+ item.context = context;
+ item.event = event;
+ item.timestamp = timestamp;
+ item.hasProps = hasProps;
+ item.hasStats = hasStats;
+
+ STRING_REF(name);
+
+ return item;
+}
+
+ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s)
+{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
+}
+
+ConsoleImpl::~ConsoleImpl()
+{
+ // This function intentionally left blank.
+}
+
+bool ConsoleImpl::getEvent(ConsoleEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void ConsoleImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ brokerList.push_back(broker.impl);
+}
+
+void ConsoleImpl::delConnection(BrokerProxy& broker)
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ if (*iter == broker.impl) {
+ brokerList.erase(iter);
+ break;
+ }
+}
+
+uint32_t ConsoleImpl::packageCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return packages.size();
+}
+
+const string& ConsoleImpl::getPackageName(uint32_t idx) const
+{
+ const static string empty;
+
+ Mutex::ScopedLock _lock(lock);
+ if (idx >= packages.size())
+ return empty;
+
+ PackageList::const_iterator iter = packages.begin();
+ for (uint32_t i = 0; i < idx; i++) iter++;
+ return iter->first;
+}
+
+uint32_t ConsoleImpl::classCount(const char* packageName) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.size() + eList.size();
+}
+
+const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+ uint32_t count = 0;
+
+ for (ObjectClassList::const_iterator oIter = oList.begin();
+ oIter != oList.end(); oIter++) {
+ if (count == idx)
+ return oIter->second->getClassKey();
+ count++;
+ }
+
+ for (EventClassList::const_iterator eIter = eList.begin();
+ eIter != eList.end(); eIter++) {
+ if (count == idx)
+ return eIter->second->getClassKey();
+ count++;
+ }
+
+ return 0;
+}
+
+ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return CLASS_OBJECT;
+
+ const EventClassList& eList = pIter->second.second;
+ if (eList.find(key) != eList.end())
+ return CLASS_EVENT;
+ return CLASS_OBJECT;
+}
+
+const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+ return iter->second;
+}
+
+const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const EventClassList& eList = pIter->second.second;
+ EventClassList::const_iterator iter = eList.find(key);
+ if (iter == eList.end())
+ return 0;
+ return iter->second;
+}
+
+void ConsoleImpl::bindPackage(const char* packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const SchemaClassKey* classKey)
+{
+ stringstream key;
+ key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const char* packageName, const char* className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+/*
+void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::touchSync(SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::endSync(SyncQuery& sync)
+{
+}
+*/
+
+void ConsoleImpl::learnPackage(const string& packageName)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (packages.find(packageName) == packages.end()) {
+ packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
+ (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+ eventNewPackage(packageName);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ ObjectClassList& list = pIter->second.first;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ EventClassList& list = pIter->second.second;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return false;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.find(key) != oList.end() || eList.find(key) != eList.end();
+}
+
+SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+
+ return iter->second;
+}
+
+void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewPackage(const string& packageName)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
+ event->name = packageName;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
+ event->classKey = key;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
+ event->object = object;
+ event->hasProps = prop;
+ event->hasStats = stat;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
+ event->agent = agent;
+ event->timestamp = timestamp;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {}
+Console::~Console() { delete impl; }
+bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
+void Console::popEvent() { impl->popEvent(); }
+void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
+void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
+uint32_t Console::packageCount() const { return impl->packageCount(); }
+const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
+uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); }
+const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
+ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
+void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
+void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
+void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
+//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
+//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
+//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); }
+
+
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
new file mode 100644
index 0000000000..8f99c5e6b9
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
@@ -0,0 +1,145 @@
+#ifndef _QmfEngineConsoleEngineImpl_
+#define _QmfEngineConsoleEngineImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Console.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qmf {
+namespace engine {
+
+ struct ConsoleEventImpl {
+ typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
+ ConsoleEvent::EventKind kind;
+ boost::shared_ptr<AgentProxy> agent;
+ std::string name;
+ const SchemaClassKey* classKey;
+ boost::shared_ptr<Object> object;
+ void* context;
+ Event* event;
+ uint64_t timestamp;
+ bool hasProps;
+ bool hasStats;
+
+ ConsoleEventImpl(ConsoleEvent::EventKind k) :
+ kind(k), classKey(0), context(0), event(0), timestamp(0) {}
+ ~ConsoleEventImpl() {}
+ ConsoleEvent copy();
+ };
+
+ class ConsoleImpl : public boost::noncopyable {
+ public:
+ ConsoleImpl(const ConsoleSettings& settings = ConsoleSettings());
+ ~ConsoleImpl();
+
+ bool getEvent(ConsoleEvent& event) const;
+ void popEvent();
+
+ void addConnection(BrokerProxy& broker, void* context);
+ void delConnection(BrokerProxy& broker);
+
+ uint32_t packageCount() const;
+ const std::string& getPackageName(uint32_t idx) const;
+
+ uint32_t classCount(const char* packageName) const;
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
+
+ ClassKind getClassKind(const SchemaClassKey* key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
+
+ void bindPackage(const char* packageName);
+ void bindClass(const SchemaClassKey* key);
+ void bindClass(const char* packageName, const char* className);
+
+ /*
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
+ */
+
+ private:
+ friend class BrokerProxyImpl;
+ friend struct StaticContext;
+ const ConsoleSettings& settings;
+ mutable qpid::sys::Mutex lock;
+ std::deque<ConsoleEventImpl::Ptr> eventQueue;
+ std::vector<BrokerProxyImpl*> brokerList;
+ std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+ // Declare a compare class for the class maps that compares the dereferenced
+ // class key pointers. The default behavior would be to compare the pointer
+ // addresses themselves.
+ struct KeyCompare {
+ bool operator()(const SchemaClassKey* left, const SchemaClassKey* right) const {
+ return *left < *right;
+ }
+ };
+
+ typedef std::map<const SchemaClassKey*, SchemaObjectClass*, KeyCompare> ObjectClassList;
+ typedef std::map<const SchemaClassKey*, SchemaEventClass*, KeyCompare> EventClassList;
+ typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList;
+
+ PackageList packages;
+
+ void learnPackage(const std::string& packageName);
+ void learnClass(SchemaObjectClass* cls);
+ void learnClass(SchemaEventClass* cls);
+ bool haveClass(const SchemaClassKey* key) const;
+ SchemaObjectClass* getSchema(const SchemaClassKey* key) const;
+
+ void eventAgentAdded(boost::shared_ptr<AgentProxy> agent);
+ void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent);
+ void eventNewPackage(const std::string& packageName);
+ void eventNewClass(const SchemaClassKey* key);
+ void eventObjectUpdate(ObjectPtr object, bool prop, bool stat);
+ void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp);
+ };
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
new file mode 100644
index 0000000000..0047d3eb9d
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/MessageImpl.h"
+#include <string.h>
+
+using namespace std;
+using namespace qmf::engine;
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+Message MessageImpl::copy()
+{
+ Message item;
+
+ ::memset(&item, 0, sizeof(Message));
+ item.body = const_cast<char*>(body.c_str());
+ item.length = body.length();
+ STRING_REF(destination);
+ STRING_REF(routingKey);
+ STRING_REF(replyExchange);
+ STRING_REF(replyKey);
+ STRING_REF(userId);
+
+ return item;
+}
+
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h
new file mode 100644
index 0000000000..b91291d2e4
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/MessageImpl.h
@@ -0,0 +1,44 @@
+#ifndef _QmfEngineMessageImpl_
+#define _QmfEngineMessageImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Message.h"
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+namespace engine {
+
+ struct MessageImpl {
+ typedef boost::shared_ptr<MessageImpl> Ptr;
+ std::string body;
+ std::string destination;
+ std::string routingKey;
+ std::string replyExchange;
+ std::string replyKey;
+ std::string userId;
+
+ Message copy();
+ };
+}
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
new file mode 100644
index 0000000000..b08ae2756c
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ObjectIdImpl.h"
+#include <stdlib.h>
+
+using namespace std;
+using namespace qmf::engine;
+using qpid::framing::Buffer;
+
+
+void AgentAttachment::setBanks(uint32_t broker, uint32_t agent)
+{
+ first =
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (agent & 0x0fffffff));
+}
+
+ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0)
+{
+ decode(buffer);
+}
+
+ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48;
+ second = object;
+}
+
+ObjectId* ObjectIdImpl::factory(Buffer& buffer)
+{
+ ObjectIdImpl* impl(new ObjectIdImpl(buffer));
+ return new ObjectId(impl);
+}
+
+ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object)
+{
+ ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object));
+ return new ObjectId(impl);
+}
+
+void ObjectIdImpl::decode(Buffer& buffer)
+{
+ first = buffer.getLongLong();
+ second = buffer.getLongLong();
+}
+
+void ObjectIdImpl::encode(Buffer& buffer) const
+{
+ if (agent == 0)
+ buffer.putLongLong(first);
+ else
+ buffer.putLongLong(first | agent->first);
+ buffer.putLongLong(second);
+}
+
+void ObjectIdImpl::fromString(const std::string& repr)
+{
+#define FIELDS 5
+#if defined (_WIN32) && !defined (atoll)
+# define atoll(X) _atoi64(X)
+#endif
+
+ std::string copy(repr.c_str());
+ char* cText;
+ char* field[FIELDS];
+ bool atFieldStart = true;
+ int idx = 0;
+
+ cText = const_cast<char*>(copy.c_str());
+ for (char* cursor = cText; *cursor; cursor++) {
+ if (atFieldStart) {
+ if (idx >= FIELDS)
+ return; // TODO error
+ field[idx++] = cursor;
+ atFieldStart = false;
+ } else {
+ if (*cursor == '-') {
+ *cursor = '\0';
+ atFieldStart = true;
+ }
+ }
+ }
+
+ if (idx != FIELDS)
+ return; // TODO error
+
+ first = (atoll(field[0]) << 60) +
+ (atoll(field[1]) << 48) +
+ (atoll(field[2]) << 28) +
+ atoll(field[3]);
+ second = atoll(field[4]);
+ agent = 0;
+}
+
+const string& ObjectIdImpl::asString() const
+{
+ stringstream val;
+
+ val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
+ getAgentBank() << "-" << getObjectNum();
+ repr = val.str();
+ return repr;
+}
+
+bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return first == otherFirst && second == other.second;
+}
+
+bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+}
+
+bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first > otherFirst) || ((first == otherFirst) && (second > other.second));
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ObjectId::ObjectId() : impl(new ObjectIdImpl()) {}
+ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
+ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
+ObjectId::~ObjectId() { delete impl; }
+uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); }
+uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
+uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
+bool ObjectId::isDurable() const { return impl->isDurable(); }
+const char* ObjectId::str() const { return impl->asString().c_str(); }
+uint8_t ObjectId::getFlags() const { return impl->getFlags(); }
+uint16_t ObjectId::getSequence() const { return impl->getSequence(); }
+uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); }
+uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); }
+bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
+bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
+bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }
+bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); }
+bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); }
+
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
new file mode 100644
index 0000000000..d9871ac217
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
@@ -0,0 +1,72 @@
+#ifndef _QmfEngineObjectIdImpl_
+#define _QmfEngineObjectIdImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/engine/ObjectId.h>
+#include <qpid/framing/Buffer.h>
+
+namespace qmf {
+namespace engine {
+
+ struct AgentAttachment {
+ uint64_t first;
+
+ AgentAttachment() : first(0) {}
+ void setBanks(uint32_t broker, uint32_t bank);
+ uint64_t getFirst() const { return first; }
+ };
+
+ struct ObjectIdImpl {
+ AgentAttachment* agent;
+ uint64_t first;
+ uint64_t second;
+ mutable std::string repr;
+
+ ObjectIdImpl() : agent(0), first(0), second(0) {}
+ ObjectIdImpl(qpid::framing::Buffer& buffer);
+ ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
+
+ static ObjectId* factory(qpid::framing::Buffer& buffer);
+ static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
+
+ void decode(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void fromString(const std::string& repr);
+ const std::string& asString() const;
+ uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
+ uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
+ uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
+ uint32_t getAgentBank() const { return first & 0x000000000FFFFFFFLL; }
+ uint64_t getObjectNum() const { return second; }
+ uint32_t getObjectNumHi() const { return (uint32_t) (second >> 32); }
+ uint32_t getObjectNumLo() const { return (uint32_t) (second & 0x00000000FFFFFFFFLL); }
+ bool isDurable() const { return getSequence() == 0; }
+ void setValue(uint64_t f, uint64_t s) { first = f; second = s; }
+
+ bool operator==(const ObjectIdImpl& other) const;
+ bool operator<(const ObjectIdImpl& other) const;
+ bool operator>(const ObjectIdImpl& other) const;
+ };
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
new file mode 100644
index 0000000000..cae0e0da68
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/sys/Time.h>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::sys;
+using qpid::framing::Buffer;
+
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+{
+ int propCount = objectClass->getPropertyCount();
+ int statCount = objectClass->getStatisticCount();
+ int idx;
+
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ }
+
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
+ }
+}
+
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
+ objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
+{
+ int idx;
+
+ if (managed) {
+ lastUpdatedTime = buffer.getLongLong();
+ createTime = buffer.getLongLong();
+ destroyTime = buffer.getLongLong();
+ objectId.reset(ObjectIdImpl::factory(buffer));
+ }
+
+ if (prop) {
+ int propCount = objectClass->getPropertyCount();
+ set<string> excludes;
+ parsePresenceMasks(buffer, excludes);
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (excludes.count(prop->getName()) != 0) {
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ } else {
+ Value* pval = ValueImpl::factory(prop->getType(), buffer);
+ properties[prop->getName()] = ValuePtr(pval);
+ }
+ }
+ }
+
+ if (stat) {
+ int statCount = objectClass->getStatisticCount();
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ Value* sval = ValueImpl::factory(stat->getType(), buffer);
+ statistics[stat->getName()] = ValuePtr(sval);
+ }
+ }
+}
+
+Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
+{
+ ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
+ return new Object(impl);
+}
+
+ObjectImpl::~ObjectImpl()
+{
+}
+
+void ObjectImpl::destroy()
+{
+ destroyTime = uint64_t(Duration(now()));
+ // TODO - flag deletion
+}
+
+Value* ObjectImpl::getValue(const string& key) const
+{
+ map<string, ValuePtr>::const_iterator iter;
+
+ iter = properties.find(key);
+ if (iter != properties.end())
+ return iter->second.get();
+
+ iter = statistics.find(key);
+ if (iter != statistics.end())
+ return iter->second.get();
+
+ return 0;
+}
+
+void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+{
+ if (broker != 0 && objectId.get() != 0)
+ broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
+}
+
+void ObjectImpl::merge(const Object& from)
+{
+ for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
+ piter != from.impl->properties.end(); piter++)
+ properties[piter->first] = piter->second;
+ for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
+ siter != from.impl->statistics.end(); siter++)
+ statistics[siter->first] = siter->second;
+}
+
+void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
+{
+ int propCount = objectClass->getPropertyCount();
+ excludeList.clear();
+ uint8_t bit = 0;
+ uint8_t mask = 0;
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (prop->isOptional()) {
+ if (bit == 0) {
+ mask = buffer.getOctet();
+ bit = 1;
+ }
+ if ((mask & bit) == 0)
+ excludeList.insert(string(prop->getName()));
+ if (bit == 0x80)
+ bit = 0;
+ else
+ bit = bit << 1;
+ }
+ }
+}
+
+void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
+{
+ buffer.putShortString(objectClass->getClassKey()->getPackageName());
+ buffer.putShortString(objectClass->getClassKey()->getClassName());
+ buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
+}
+
+void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
+{
+ buffer.putLongLong(lastUpdatedTime);
+ buffer.putLongLong(createTime);
+ buffer.putLongLong(destroyTime);
+ objectId->impl->encode(buffer);
+}
+
+void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
+{
+ int propCount = objectClass->getPropertyCount();
+ uint8_t bit = 0;
+ uint8_t mask = 0;
+ ValuePtr value;
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (prop->isOptional()) {
+ value = properties[prop->getName()];
+ if (bit == 0)
+ bit = 1;
+ if (!value->isNull())
+ mask |= bit;
+ if (bit == 0x80) {
+ buffer.putOctet(mask);
+ bit = 0;
+ mask = 0;
+ } else
+ bit = bit << 1;
+ }
+ }
+ if (bit != 0) {
+ buffer.putOctet(mask);
+ }
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ value = properties[prop->getName()];
+ if (!prop->isOptional() || !value->isNull()) {
+ value->impl->encode(buffer);
+ }
+ }
+}
+
+void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
+{
+ int statCount = objectClass->getStatisticCount();
+ for (int idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ ValuePtr value = statistics[stat->getName()];
+ value->impl->encode(buffer);
+ }
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
+Object::Object(ObjectImpl* i) : impl(i) {}
+Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
+Object::~Object() { delete impl; }
+void Object::destroy() { impl->destroy(); }
+const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
+void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
+const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
+Value* Object::getValue(const char* key) const { return impl->getValue(key); }
+void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
+bool Object::isDeleted() const { return impl->isDeleted(); }
+void Object::merge(const Object& from) { impl->merge(from); }
+
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h
new file mode 100644
index 0000000000..ddd20bfea2
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.h
@@ -0,0 +1,76 @@
+#ifndef _QmfEngineObjectImpl_
+#define _QmfEngineObjectImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/engine/Object.h>
+#include <qmf/engine/ObjectIdImpl.h>
+#include <map>
+#include <set>
+#include <string>
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <qpid/sys/Mutex.h>
+
+namespace qmf {
+namespace engine {
+
+ class BrokerProxyImpl;
+
+ typedef boost::shared_ptr<Object> ObjectPtr;
+
+ struct ObjectImpl {
+ typedef boost::shared_ptr<Value> ValuePtr;
+ const SchemaObjectClass* objectClass;
+ BrokerProxyImpl* broker;
+ boost::shared_ptr<ObjectId> objectId;
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t lastUpdatedTime;
+ mutable std::map<std::string, ValuePtr> properties;
+ mutable std::map<std::string, ValuePtr> statistics;
+
+ ObjectImpl(const SchemaObjectClass* type);
+ ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
+ static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
+ ~ObjectImpl();
+
+ void destroy();
+ const ObjectId* getObjectId() const { return objectId.get(); }
+ void setObjectId(ObjectId* oid) { objectId.reset(oid); }
+ const SchemaObjectClass* getClass() const { return objectClass; }
+ Value* getValue(const std::string& key) const;
+ void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
+ bool isDeleted() const { return destroyTime != 0; }
+ void merge(const Object& from);
+
+ void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
+ void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
+ void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
+ void encodeProperties(qpid::framing::Buffer& buffer) const;
+ void encodeStatistics(qpid::framing::Buffer& buffer) const;
+ };
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp
new file mode 100644
index 0000000000..6061b70a8d
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/Protocol.cpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Protocol.h"
+#include "qpid/framing/Buffer.h"
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+
+
+bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.available() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '3';
+}
+
+void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('3');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+
diff --git a/qpid/cpp/src/qmf/engine/Protocol.h b/qpid/cpp/src/qmf/engine/Protocol.h
new file mode 100644
index 0000000000..1cdfa60c84
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/Protocol.h
@@ -0,0 +1,69 @@
+#ifndef _QmfEngineProtocol_
+#define _QmfEngineProtocol_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
+namespace qmf {
+namespace engine {
+
+ class Protocol {
+ public:
+ static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+
+ const static uint8_t OP_ATTACH_REQUEST = 'A';
+ const static uint8_t OP_ATTACH_RESPONSE = 'a';
+
+ const static uint8_t OP_BROKER_REQUEST = 'B';
+ const static uint8_t OP_BROKER_RESPONSE = 'b';
+
+ const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
+ const static uint8_t OP_COMMAND_COMPLETE = 'z';
+ const static uint8_t OP_HEARTBEAT_INDICATION = 'h';
+
+ const static uint8_t OP_PACKAGE_REQUEST = 'P';
+ const static uint8_t OP_PACKAGE_INDICATION = 'p';
+ const static uint8_t OP_CLASS_QUERY = 'Q';
+ const static uint8_t OP_CLASS_INDICATION = 'q';
+ const static uint8_t OP_SCHEMA_REQUEST = 'S';
+ const static uint8_t OP_SCHEMA_RESPONSE = 's';
+
+ const static uint8_t OP_METHOD_REQUEST = 'M';
+ const static uint8_t OP_METHOD_RESPONSE = 'm';
+ const static uint8_t OP_GET_QUERY = 'G';
+ const static uint8_t OP_OBJECT_INDICATION = 'g';
+ const static uint8_t OP_PROPERTY_INDICATION = 'c';
+ const static uint8_t OP_STATISTIC_INDICATION = 'i';
+ const static uint8_t OP_EVENT_INDICATION = 'e';
+ };
+
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
new file mode 100644
index 0000000000..6f2beeee87
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/FieldTable.h"
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+
+bool QueryElementImpl::evaluate(const Object* /*object*/) const
+{
+ // TODO: Implement this
+ return false;
+}
+
+bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
+{
+ // TODO: Implement this
+ return false;
+}
+
+QueryImpl::QueryImpl(Buffer& buffer)
+{
+ FieldTable ft;
+ ft.decode(buffer);
+ // TODO
+}
+
+Query* QueryImpl::factory(Buffer& buffer)
+{
+ QueryImpl* impl(new QueryImpl(buffer));
+ return new Query(impl);
+}
+
+void QueryImpl::encode(Buffer& buffer) const
+{
+ FieldTable ft;
+
+ if (oid.get() != 0) {
+ ft.setString("_objectid", oid->impl->asString());
+ } else {
+ if (!packageName.empty())
+ ft.setString("_package", packageName);
+ ft.setString("_class", className);
+ }
+
+ ft.encode(buffer);
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
+QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
+QueryElement::~QueryElement() { delete impl; }
+bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
+
+QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
+QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
+QueryExpression::~QueryExpression() { delete impl; }
+bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
+
+Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
+Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
+Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
+Query::Query(QueryImpl* i) : impl(i) {}
+Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
+Query::~Query() { delete impl; }
+void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
+void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
+void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
+const char* Query::getPackage() const { return impl->getPackage().c_str(); }
+const char* Query::getClass() const { return impl->getClass().c_str(); }
+const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
+bool Query::haveSelect() const { return impl->haveSelect(); }
+bool Query::haveLimit() const { return impl->haveLimit(); }
+bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
+const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+uint32_t Query::getLimit() const { return impl->getLimit(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+bool Query::getDecreasing() const { return impl->getDecreasing(); }
+
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h
new file mode 100644
index 0000000000..2c64c6739c
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.h
@@ -0,0 +1,100 @@
+#ifndef _QmfEngineQueryImpl_
+#define _QmfEngineQueryImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Query.h"
+#include "qmf/engine/Schema.h"
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
+namespace qmf {
+namespace engine {
+
+ struct QueryElementImpl {
+ QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {}
+ ~QueryElementImpl() {}
+ bool evaluate(const Object* object) const;
+
+ std::string attrName;
+ const Value* value;
+ ValueOper oper;
+ };
+
+ struct QueryExpressionImpl {
+ QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {}
+ ~QueryExpressionImpl() {}
+ bool evaluate(const Object* object) const;
+
+ ExprOper oper;
+ const QueryOperand* left;
+ const QueryOperand* right;
+ };
+
+ struct QueryImpl {
+ // Constructors mapped to public
+ QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {}
+ QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {}
+ QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {}
+
+ // Factory constructors
+ QueryImpl(qpid::framing::Buffer& buffer);
+
+ ~QueryImpl() {};
+ static Query* factory(qpid::framing::Buffer& buffer);
+
+ void setSelect(const QueryOperand* criterion) { select = criterion; }
+ void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
+ void setOrderBy(const std::string& attrName, bool decreasing) {
+ orderBy = attrName; orderDecreasing = decreasing;
+ }
+
+ const std::string& getPackage() const { return packageName; }
+ const std::string& getClass() const { return className; }
+ const ObjectId* getObjectId() const { return oid.get(); }
+
+ bool haveSelect() const { return select != 0; }
+ bool haveLimit() const { return resultLimit > 0; }
+ bool haveOrderBy() const { return !orderBy.empty(); }
+ const QueryOperand* getSelect() const { return select; }
+ uint32_t getLimit() const { return resultLimit; }
+ const std::string& getOrderBy() const { return orderBy; }
+ bool getDecreasing() const { return orderDecreasing; }
+
+ void encode(qpid::framing::Buffer& buffer) const;
+
+ std::string packageName;
+ std::string className;
+ boost::shared_ptr<ObjectId> oid;
+ const QueryOperand* select;
+ uint32_t resultLimit;
+ std::string orderBy;
+ bool orderDecreasing;
+ };
+}
+}
+
+#endif
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
new file mode 100644
index 0000000000..9502130288
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ResilientConnection.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/ConnectionSettingsImpl.h"
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/Message.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
+#include <qpid/sys/Time.h>
+#include <qpid/log/Statement.h>
+#include <qpid/RefCounted.h>
+#include <boost/bind.hpp>
+#include <string>
+#include <deque>
+#include <vector>
+#include <set>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid;
+using qpid::sys::Mutex;
+
+namespace qmf {
+namespace engine {
+ struct ResilientConnectionEventImpl {
+ ResilientConnectionEvent::EventKind kind;
+ void* sessionContext;
+ string errorText;
+ MessageImpl message;
+
+ ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
+ const MessageImpl& m = MessageImpl()) :
+ kind(k), sessionContext(0), message(m) {}
+ ResilientConnectionEvent copy();
+ };
+
+ struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
+ typedef boost::intrusive_ptr<RCSession> Ptr;
+ ResilientConnectionImpl& connImpl;
+ string name;
+ client::Connection& connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ string userId;
+ void* userContext;
+ vector<string> dests;
+ qpid::sys::Thread thread;
+
+ RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
+ ~RCSession();
+ void received(client::Message& msg);
+ void run();
+ void stop();
+ };
+
+ class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
+ public:
+ ResilientConnectionImpl(const ConnectionSettings& settings);
+ ~ResilientConnectionImpl();
+
+ bool isConnected() const;
+ bool getEvent(ResilientConnectionEvent& event);
+ void popEvent();
+ bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
+ void destroySession(SessionHandle handle);
+ void sendMessage(SessionHandle handle, qmf::engine::Message& message);
+ void declareQueue(SessionHandle handle, char* queue);
+ void deleteQueue(SessionHandle handle, char* queue);
+ void bind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void setNotifyFd(int fd);
+
+ void run();
+ void failure();
+ void sessionClosed(RCSession* sess);
+
+ void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext = 0,
+ const MessageImpl& message = MessageImpl(),
+ const string& errorText = "");
+
+ private:
+ int notifyFd;
+ bool connected;
+ bool shutdown;
+ string lastError;
+ const ConnectionSettings settings;
+ client::Connection connection;
+ mutable qpid::sys::Mutex lock;
+ int delayMin;
+ int delayMax;
+ int delayFactor;
+ qpid::sys::Condition cond;
+ qpid::sys::Thread connThread;
+ deque<ResilientConnectionEventImpl> eventQueue;
+ set<RCSession::Ptr> sessions;
+ };
+}
+}
+
+ResilientConnectionEvent ResilientConnectionEventImpl::copy()
+{
+ ResilientConnectionEvent item;
+
+ ::memset(&item, 0, sizeof(ResilientConnectionEvent));
+ item.kind = kind;
+ item.sessionContext = sessionContext;
+ item.message = message.copy();
+ item.errorText = const_cast<char*>(errorText.c_str());
+
+ return item;
+}
+
+RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
+ connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
+ subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
+{
+ const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
+ userId = operSettings.username;
+}
+
+RCSession::~RCSession()
+{
+ subscriptions->stop();
+ thread.join();
+ session.close();
+ delete subscriptions;
+}
+
+void RCSession::run()
+{
+ try {
+ subscriptions->run();
+ } catch (exception& /*e*/) {
+ connImpl.sessionClosed(this);
+ }
+}
+
+void RCSession::stop()
+{
+ subscriptions->stop();
+}
+
+void RCSession::received(client::Message& msg)
+{
+ MessageImpl qmsg;
+ qmsg.body = msg.getData();
+
+ qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
+ if (dp.hasRoutingKey()) {
+ qmsg.routingKey = dp.getRoutingKey();
+ }
+
+ qpid::framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const qpid::framing::ReplyTo& rt = mp.getReplyTo();
+ qmsg.replyExchange = rt.getExchange();
+ qmsg.replyKey = rt.getRoutingKey();
+ }
+
+ if (mp.hasUserId()) {
+ qmsg.userId = mp.getUserId();
+ }
+
+ connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
+}
+
+ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
+ notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
+{
+ connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
+ settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
+}
+
+ResilientConnectionImpl::~ResilientConnectionImpl()
+{
+ shutdown = true;
+ connected = false;
+ cond.notify();
+ connThread.join();
+ connection.close();
+}
+
+bool ResilientConnectionImpl::isConnected() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return connected;
+}
+
+bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front().copy();
+ return true;
+}
+
+void ResilientConnectionImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
+ SessionHandle& handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!connected)
+ return false;
+
+ RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
+
+ handle.impl = (void*) sess.get();
+ sessions.insert(sess);
+
+ return true;
+}
+
+void ResilientConnectionImpl::destroySession(SessionHandle handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ if (iter != sessions.end()) {
+ for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
+ sess->subscriptions->cancel(dIter->c_str());
+ sess->subscriptions->stop();
+ sess->subscriptions->wait();
+
+ sessions.erase(iter);
+ return;
+ }
+}
+
+void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ qpid::client::Message msg;
+ string data(message.body, message.length);
+ msg.getDeliveryProperties().setRoutingKey(message.routingKey);
+ msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
+ if (settings.impl->getSendUserId())
+ msg.getMessageProperties().setUserId(sess->userId);
+ msg.setData(data);
+
+ try {
+ sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
+ } catch(exception& e) {
+ QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
+ }
+}
+
+void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
+ sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
+ sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
+ sess->subscriptions->subscribe(*sess, queue, queue);
+ sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
+ sess->dests.push_back(string(queue));
+}
+
+void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.queueDelete(client::arg::queue=queue);
+ for (vector<string>::iterator iter = sess->dests.begin();
+ iter != sess->dests.end(); iter++)
+ if (*iter == queue) {
+ sess->subscriptions->cancel(queue);
+ sess->dests.erase(iter);
+ break;
+ }
+}
+
+void ResilientConnectionImpl::bind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::unbind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::setNotifyFd(int fd)
+{
+ notifyFd = fd;
+}
+
+void ResilientConnectionImpl::run()
+{
+ int delay(delayMin);
+
+ while (true) {
+ try {
+ QPID_LOG(trace, "Trying to open connection...");
+ connection.open(settings.impl->getClientSettings());
+ {
+ Mutex::ScopedLock _lock(lock);
+ connected = true;
+ EnqueueEvent(ResilientConnectionEvent::CONNECTED);
+
+ while (connected)
+ cond.wait(lock);
+ delay = delayMin;
+
+ while (!sessions.empty()) {
+ set<RCSession::Ptr>::iterator iter = sessions.begin();
+ RCSession::Ptr sess = *iter;
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
+ Mutex::ScopedUnlock _u(lock);
+ sess->stop();
+
+ // Nullify the intrusive pointer within the scoped unlock, otherwise,
+ // the reference is held until overwritted above (under lock) which causes
+ // the session destructor to be called with the lock held.
+ sess = 0;
+ }
+
+ EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
+
+ if (shutdown)
+ return;
+ }
+ connection.close();
+ } catch (exception &e) {
+ QPID_LOG(debug, "connection.open exception: " << e.what());
+ Mutex::ScopedLock _lock(lock);
+ lastError = e.what();
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::qpid::sys::sleep(delay);
+ }
+}
+
+void ResilientConnectionImpl::failure()
+{
+ Mutex::ScopedLock _lock(lock);
+
+ connected = false;
+ lastError = "Closed by Peer";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::sessionClosed(RCSession*)
+{
+ Mutex::ScopedLock _lock(lock);
+ connected = false;
+ lastError = "Closed due to Session failure";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext,
+ const MessageImpl& message,
+ const string& errorText)
+{
+ Mutex::ScopedLock _lock(lock);
+ ResilientConnectionEventImpl event(kind, message);
+
+ event.sessionContext = sessionContext;
+ event.errorText = errorText;
+
+ eventQueue.push_back(event);
+ if (notifyFd != -1)
+ {
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
+ }
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
+{
+ impl = new ResilientConnectionImpl(settings);
+}
+
+ResilientConnection::~ResilientConnection()
+{
+ delete impl;
+}
+
+bool ResilientConnection::isConnected() const
+{
+ return impl->isConnected();
+}
+
+bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
+{
+ return impl->getEvent(event);
+}
+
+void ResilientConnection::popEvent()
+{
+ impl->popEvent();
+}
+
+bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
+{
+ return impl->createSession(name, sessionContext, handle);
+}
+
+void ResilientConnection::destroySession(SessionHandle handle)
+{
+ impl->destroySession(handle);
+}
+
+void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
+{
+ impl->sendMessage(handle, message);
+}
+
+void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
+{
+ impl->declareQueue(handle, queue);
+}
+
+void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
+{
+ impl->deleteQueue(handle, queue);
+}
+
+void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->bind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->unbind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::setNotifyFd(int fd)
+{
+ impl->setNotifyFd(fd);
+}
+
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
new file mode 100644
index 0000000000..e366a66826
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/SchemaImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/Uuid.h>
+#include <string.h>
+#include <string>
+#include <vector>
+
+using namespace std;
+using namespace qmf::engine;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+using qpid::framing::Uuid;
+
+SchemaHash::SchemaHash()
+{
+ for (int idx = 0; idx < 16; idx++)
+ hash[idx] = 0x5A;
+}
+
+void SchemaHash::encode(Buffer& buffer) const
+{
+ buffer.putBin128(hash);
+}
+
+void SchemaHash::decode(Buffer& buffer)
+{
+ buffer.getBin128(hash);
+}
+
+void SchemaHash::update(uint8_t data)
+{
+ update((char*) &data, 1);
+}
+
+void SchemaHash::update(const char* data, uint32_t len)
+{
+ uint64_t* first = (uint64_t*) hash;
+ uint64_t* second = (uint64_t*) hash + 1;
+
+ for (uint32_t idx = 0; idx < len; idx++) {
+ *first = *first ^ (uint64_t) data[idx];
+ *second = *second << 1;
+ *second |= ((*first & 0x8000000000000000LL) >> 63);
+ *first = *first << 1;
+ *first = *first ^ *second;
+ }
+}
+
+bool SchemaHash::operator==(const SchemaHash& other) const
+{
+ return ::memcmp(&hash, &other.hash, 16) == 0;
+}
+
+bool SchemaHash::operator<(const SchemaHash& other) const
+{
+ return ::memcmp(&hash, &other.hash, 16) < 0;
+}
+
+bool SchemaHash::operator>(const SchemaHash& other) const
+{
+ return ::memcmp(&hash, &other.hash, 16) > 0;
+}
+
+SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer)
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+
+ dir = DIR_IN;
+ string dstr(map.getAsString("dir"));
+ if (dstr == "O")
+ dir = DIR_OUT;
+ else if (dstr == "IO")
+ dir = DIR_IN_OUT;
+}
+
+SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer)
+{
+ SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer));
+ return new SchemaArgument(impl);
+}
+
+void SchemaArgumentImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ if (dir == DIR_IN)
+ map.setString("dir", "I");
+ else if (dir == DIR_OUT)
+ map.setString("dir", "O");
+ else
+ map.setString("dir", "IO");
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(dir);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer)
+{
+ FieldTable map;
+ int argCount;
+
+ map.decode(buffer);
+ name = map.getAsString("name");
+ argCount = map.getAsInt("argCount");
+ description = map.getAsString("desc");
+
+ for (int idx = 0; idx < argCount; idx++) {
+ SchemaArgument* arg = SchemaArgumentImpl::factory(buffer);
+ addArgument(arg);
+ }
+}
+
+SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer)
+{
+ SchemaMethodImpl* impl(new SchemaMethodImpl(buffer));
+ return new SchemaMethod(impl);
+}
+
+void SchemaMethodImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("argCount", arguments.size());
+ if (!description.empty())
+ map.setString("desc", description);
+ map.encode(buffer);
+
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->impl->encode(buffer);
+}
+
+void SchemaMethodImpl::addArgument(const SchemaArgument* argument)
+{
+ arguments.push_back(argument);
+}
+
+const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const
+{
+ int count = 0;
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++, count++)
+ if (idx == count)
+ return (*iter);
+ return 0;
+}
+
+void SchemaMethodImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(description);
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->impl->updateHash(hash);
+}
+
+SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer)
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ access = (Access) map.getAsInt("access");
+ index = map.getAsInt("index") != 0;
+ optional = map.getAsInt("optional") != 0;
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+}
+
+SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer)
+{
+ SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer));
+ return new SchemaProperty(impl);
+}
+
+void SchemaPropertyImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ map.setInt("access", (int) access);
+ map.setInt("index", index ? 1 : 0);
+ map.setInt("optional", optional ? 1 : 0);
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(access);
+ hash.update(index);
+ hash.update(optional);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer)
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+}
+
+SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer)
+{
+ SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer));
+ return new SchemaStatistic(impl);
+}
+
+void SchemaStatisticImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {}
+
+SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer)
+{
+ buffer.getShortString(packageContainer);
+ buffer.getShortString(nameContainer);
+ hashContainer.decode(buffer);
+}
+
+SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash)
+{
+ SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(package, name, hash));
+ return new SchemaClassKey(impl);
+}
+
+SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer)
+{
+ SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer));
+ return new SchemaClassKey(impl);
+}
+
+void SchemaClassKeyImpl::encode(Buffer& buffer) const
+{
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ hash.encode(buffer);
+}
+
+bool SchemaClassKeyImpl::operator==(const SchemaClassKeyImpl& other) const
+{
+ return package == other.package &&
+ name == other.name &&
+ hash == other.hash;
+}
+
+bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const
+{
+ if (package < other.package) return true;
+ if (package > other.package) return false;
+ if (name < other.name) return true;
+ if (name > other.name) return false;
+ return hash < other.hash;
+}
+
+const string& SchemaClassKeyImpl::str() const
+{
+ Uuid printableHash(hash.get());
+ stringstream str;
+ str << package << ":" << name << "(" << printableHash << ")";
+ repr = str.str();
+ return repr;
+}
+
+SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
+{
+ buffer.getShortString(package);
+ buffer.getShortString(name);
+ hash.decode(buffer);
+
+ /*uint8_t hasParentClass =*/ buffer.getOctet(); // TODO: Parse parent-class indicator
+ uint16_t propCount = buffer.getShort();
+ uint16_t statCount = buffer.getShort();
+ uint16_t methodCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* property = SchemaPropertyImpl::factory(buffer);
+ addProperty(property);
+ }
+
+ for (uint16_t idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer);
+ addStatistic(statistic);
+ }
+
+ for (uint16_t idx = 0; idx < methodCount; idx++) {
+ SchemaMethod* method = SchemaMethodImpl::factory(buffer);
+ addMethod(method);
+ }
+}
+
+SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer)
+{
+ SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer));
+ return new SchemaObjectClass(impl);
+}
+
+void SchemaObjectClassImpl::encode(Buffer& buffer) const
+{
+ buffer.putOctet((uint8_t) CLASS_OBJECT);
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ hash.encode(buffer);
+ buffer.putOctet(0); // No parent class
+ buffer.putShort((uint16_t) properties.size());
+ buffer.putShort((uint16_t) statistics.size());
+ buffer.putShort((uint16_t) methods.size());
+
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++)
+ (*iter)->impl->encode(buffer);
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++)
+ (*iter)->impl->encode(buffer);
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++)
+ (*iter)->impl->encode(buffer);
+}
+
+const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const
+{
+ if (!hasHash) {
+ hasHash = true;
+ hash.update(package);
+ hash.update(name);
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++)
+ (*iter)->impl->updateHash(hash);
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++)
+ (*iter)->impl->updateHash(hash);
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++)
+ (*iter)->impl->updateHash(hash);
+ }
+
+ return classKey.get();
+}
+
+void SchemaObjectClassImpl::addProperty(const SchemaProperty* property)
+{
+ properties.push_back(property);
+}
+
+void SchemaObjectClassImpl::addStatistic(const SchemaStatistic* statistic)
+{
+ statistics.push_back(statistic);
+}
+
+void SchemaObjectClassImpl::addMethod(const SchemaMethod* method)
+{
+ methods.push_back(method);
+}
+
+const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const
+{
+ int count = 0;
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++, count++)
+ if (idx == count)
+ return *iter;
+ return 0;
+}
+
+const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const
+{
+ int count = 0;
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++, count++)
+ if (idx == count)
+ return *iter;
+ return 0;
+}
+
+const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
+{
+ int count = 0;
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++, count++)
+ if (idx == count)
+ return *iter;
+ return 0;
+}
+
+SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
+{
+ buffer.getShortString(package);
+ buffer.getShortString(name);
+ hash.decode(buffer);
+ buffer.putOctet(0); // No parent class
+
+ uint16_t argCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ SchemaArgument* argument = SchemaArgumentImpl::factory(buffer);
+ addArgument(argument);
+ }
+}
+
+SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer)
+{
+ SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer));
+ return new SchemaEventClass(impl);
+}
+
+void SchemaEventClassImpl::encode(Buffer& buffer) const
+{
+ buffer.putOctet((uint8_t) CLASS_EVENT);
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ hash.encode(buffer);
+ buffer.putShort((uint16_t) arguments.size());
+
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->impl->encode(buffer);
+}
+
+const SchemaClassKey* SchemaEventClassImpl::getClassKey() const
+{
+ if (!hasHash) {
+ hasHash = true;
+ hash.update(package);
+ hash.update(name);
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->impl->updateHash(hash);
+ }
+ return classKey.get();
+}
+
+void SchemaEventClassImpl::addArgument(const SchemaArgument* argument)
+{
+ arguments.push_back(argument);
+}
+
+const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
+{
+ int count = 0;
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++, count++)
+ if (idx == count)
+ return (*iter);
+ return 0;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(name, typecode); }
+SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {}
+SchemaArgument::SchemaArgument(const SchemaArgument& from) : impl(new SchemaArgumentImpl(*(from.impl))) {}
+SchemaArgument::~SchemaArgument() { delete impl; }
+void SchemaArgument::setDirection(Direction dir) { impl->setDirection(dir); }
+void SchemaArgument::setUnit(const char* val) { impl->setUnit(val); }
+void SchemaArgument::setDesc(const char* desc) { impl->setDesc(desc); }
+const char* SchemaArgument::getName() const { return impl->getName().c_str(); }
+Typecode SchemaArgument::getType() const { return impl->getType(); }
+Direction SchemaArgument::getDirection() const { return impl->getDirection(); }
+const char* SchemaArgument::getUnit() const { return impl->getUnit().c_str(); }
+const char* SchemaArgument::getDesc() const { return impl->getDesc().c_str(); }
+
+SchemaMethod::SchemaMethod(const char* name) : impl(new SchemaMethodImpl(name)) {}
+SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {}
+SchemaMethod::SchemaMethod(const SchemaMethod& from) : impl(new SchemaMethodImpl(*(from.impl))) {}
+SchemaMethod::~SchemaMethod() { delete impl; }
+void SchemaMethod::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
+void SchemaMethod::setDesc(const char* desc) { impl->setDesc(desc); }
+const char* SchemaMethod::getName() const { return impl->getName().c_str(); }
+const char* SchemaMethod::getDesc() const { return impl->getDesc().c_str(); }
+int SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); }
+const SchemaArgument* SchemaMethod::getArgument(int idx) const { return impl->getArgument(idx); }
+
+SchemaProperty::SchemaProperty(const char* name, Typecode typecode) : impl(new SchemaPropertyImpl(name, typecode)) {}
+SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {}
+SchemaProperty::SchemaProperty(const SchemaProperty& from) : impl(new SchemaPropertyImpl(*(from.impl))) {}
+SchemaProperty::~SchemaProperty() { delete impl; }
+void SchemaProperty::setAccess(Access access) { impl->setAccess(access); }
+void SchemaProperty::setIndex(bool val) { impl->setIndex(val); }
+void SchemaProperty::setOptional(bool val) { impl->setOptional(val); }
+void SchemaProperty::setUnit(const char* val) { impl->setUnit(val); }
+void SchemaProperty::setDesc(const char* desc) { impl->setDesc(desc); }
+const char* SchemaProperty::getName() const { return impl->getName().c_str(); }
+Typecode SchemaProperty::getType() const { return impl->getType(); }
+Access SchemaProperty::getAccess() const { return impl->getAccess(); }
+bool SchemaProperty::isIndex() const { return impl->isIndex(); }
+bool SchemaProperty::isOptional() const { return impl->isOptional(); }
+const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); }
+const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); }
+
+SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {}
+SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {}
+SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {}
+SchemaStatistic::~SchemaStatistic() { delete impl; }
+void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); }
+void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); }
+const char* SchemaStatistic::getName() const { return impl->getName().c_str(); }
+Typecode SchemaStatistic::getType() const { return impl->getType(); }
+const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); }
+const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); }
+
+SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {}
+SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {}
+SchemaClassKey::~SchemaClassKey() { delete impl; }
+const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); }
+const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); }
+const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); }
+const char* SchemaClassKey::asString() const { return impl->str().c_str(); }
+bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); }
+bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); }
+
+SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) : impl(new SchemaObjectClassImpl(package, name)) {}
+SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {}
+SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {}
+SchemaObjectClass::~SchemaObjectClass() { delete impl; }
+void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); }
+void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); }
+void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); }
+const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); }
+int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); }
+int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); }
+int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); }
+const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); }
+const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); }
+const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); }
+
+SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {}
+SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
+SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {}
+SchemaEventClass::~SchemaEventClass() { delete impl; }
+void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
+void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); }
+const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); }
+int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); }
+const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); }
+
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
new file mode 100644
index 0000000000..af3a1d98e4
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -0,0 +1,223 @@
+#ifndef _QmfEngineSchemaImpl_
+#define _QmfEngineSchemaImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Schema.h"
+#include <string>
+#include <vector>
+#include <qpid/framing/Buffer.h>
+
+namespace qmf {
+namespace engine {
+
+ // TODO: Destructors for schema classes
+ // TODO: Add "frozen" attribute for schema classes so they can't be modified after
+ // they've been registered.
+
+ class SchemaHash {
+ uint8_t hash[16];
+ public:
+ SchemaHash();
+ void encode(qpid::framing::Buffer& buffer) const;
+ void decode(qpid::framing::Buffer& buffer);
+ void update(const char* data, uint32_t len);
+ void update(uint8_t data);
+ void update(const std::string& data) { update(data.c_str(), data.size()); }
+ void update(Typecode t) { update((uint8_t) t); }
+ void update(Direction d) { update((uint8_t) d); }
+ void update(Access a) { update((uint8_t) a); }
+ void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
+ const uint8_t* get() const { return hash; }
+ bool operator==(const SchemaHash& other) const;
+ bool operator<(const SchemaHash& other) const;
+ bool operator>(const SchemaHash& other) const;
+ };
+
+ struct SchemaArgumentImpl {
+ std::string name;
+ Typecode typecode;
+ Direction dir;
+ std::string unit;
+ std::string description;
+
+ SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {}
+ SchemaArgumentImpl(qpid::framing::Buffer& buffer);
+ static SchemaArgument* factory(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setDirection(Direction d) { dir = d; }
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ Direction getDirection() const { return dir; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaMethodImpl {
+ std::string name;
+ std::string description;
+ std::vector<const SchemaArgument*> arguments;
+
+ SchemaMethodImpl(const char* n) : name(n) {}
+ SchemaMethodImpl(qpid::framing::Buffer& buffer);
+ static SchemaMethod* factory(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addArgument(const SchemaArgument* argument);
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ const std::string& getDesc() const { return description; }
+ int getArgumentCount() const { return arguments.size(); }
+ const SchemaArgument* getArgument(int idx) const;
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaPropertyImpl {
+ std::string name;
+ Typecode typecode;
+ Access access;
+ bool index;
+ bool optional;
+ std::string unit;
+ std::string description;
+
+ SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {}
+ SchemaPropertyImpl(qpid::framing::Buffer& buffer);
+ static SchemaProperty* factory(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setAccess(Access a) { access = a; }
+ void setIndex(bool val) { index = val; }
+ void setOptional(bool val) { optional = val; }
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ Access getAccess() const { return access; }
+ bool isIndex() const { return index; }
+ bool isOptional() const { return optional; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaStatisticImpl {
+ std::string name;
+ Typecode typecode;
+ std::string unit;
+ std::string description;
+
+ SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {}
+ SchemaStatisticImpl(qpid::framing::Buffer& buffer);
+ static SchemaStatistic* factory(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaClassKeyImpl {
+ const std::string& package;
+ const std::string& name;
+ const SchemaHash& hash;
+ mutable std::string repr;
+
+ // The *Container elements are only used if there isn't an external place to
+ // store these values.
+ std::string packageContainer;
+ std::string nameContainer;
+ SchemaHash hashContainer;
+
+ SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
+ SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
+ static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash);
+ static SchemaClassKey* factory(qpid::framing::Buffer& buffer);
+
+ const std::string& getPackageName() const { return package; }
+ const std::string& getClassName() const { return name; }
+ const uint8_t* getHash() const { return hash.get(); }
+
+ void encode(qpid::framing::Buffer& buffer) const;
+ bool operator==(const SchemaClassKeyImpl& other) const;
+ bool operator<(const SchemaClassKeyImpl& other) const;
+ const std::string& str() const;
+ };
+
+ struct SchemaObjectClassImpl {
+ std::string package;
+ std::string name;
+ mutable SchemaHash hash;
+ mutable bool hasHash;
+ std::auto_ptr<SchemaClassKey> classKey;
+ std::vector<const SchemaProperty*> properties;
+ std::vector<const SchemaStatistic*> statistics;
+ std::vector<const SchemaMethod*> methods;
+
+ SchemaObjectClassImpl(const char* p, const char* n) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
+ SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
+ static SchemaObjectClass* factory(qpid::framing::Buffer& buffer);
+
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addProperty(const SchemaProperty* property);
+ void addStatistic(const SchemaStatistic* statistic);
+ void addMethod(const SchemaMethod* method);
+
+ const SchemaClassKey* getClassKey() const;
+ int getPropertyCount() const { return properties.size(); }
+ int getStatisticCount() const { return statistics.size(); }
+ int getMethodCount() const { return methods.size(); }
+ const SchemaProperty* getProperty(int idx) const;
+ const SchemaStatistic* getStatistic(int idx) const;
+ const SchemaMethod* getMethod(int idx) const;
+ };
+
+ struct SchemaEventClassImpl {
+ std::string package;
+ std::string name;
+ mutable SchemaHash hash;
+ mutable bool hasHash;
+ std::auto_ptr<SchemaClassKey> classKey;
+ std::string description;
+ std::vector<const SchemaArgument*> arguments;
+
+ SchemaEventClassImpl(const char* p, const char* n) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
+ SchemaEventClassImpl(qpid::framing::Buffer& buffer);
+ static SchemaEventClass* factory(qpid::framing::Buffer& buffer);
+
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addArgument(const SchemaArgument* argument);
+ void setDesc(const char* desc) { description = desc; }
+
+ const SchemaClassKey* getClassKey() const;
+ int getArgumentCount() const { return arguments.size(); }
+ const SchemaArgument* getArgument(int idx) const;
+ };
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/SequenceManager.cpp b/qpid/cpp/src/qmf/engine/SequenceManager.cpp
new file mode 100644
index 0000000000..4a4644a8b9
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/SequenceManager.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/SequenceManager.h"
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::sys;
+
+SequenceManager::SequenceManager() : nextSequence(1) {}
+
+void SequenceManager::setUnsolicitedContext(SequenceContext::Ptr ctx)
+{
+ unsolicitedContext = ctx;
+}
+
+uint32_t SequenceManager::reserve(SequenceContext::Ptr ctx)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (ctx.get() == 0)
+ ctx = unsolicitedContext;
+ uint32_t seq = nextSequence;
+ while (contextMap.find(seq) != contextMap.end())
+ seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
+ nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
+ contextMap[seq] = ctx;
+ ctx->reserve();
+ return seq;
+}
+
+void SequenceManager::release(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0)
+ unsolicitedContext->release();
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter != contextMap.end()) {
+ if (iter->second != 0)
+ iter->second->release();
+ contextMap.erase(iter);
+ }
+}
+
+void SequenceManager::releaseAll()
+{
+ Mutex::ScopedLock _lock(lock);
+ contextMap.clear();
+}
+
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer)
+{
+ Mutex::ScopedLock _lock(lock);
+ bool done;
+
+ if (sequence == 0) {
+ if (unsolicitedContext.get() != 0) {
+ done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer);
+ if (done)
+ unsolicitedContext->release();
+ }
+ return;
+ }
+
+ map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter != contextMap.end()) {
+ if (iter->second != 0) {
+ done = iter->second->handleMessage(opcode, sequence, routingKey, buffer);
+ if (done) {
+ iter->second->release();
+ contextMap.erase(iter);
+ }
+ }
+ }
+}
+
diff --git a/qpid/cpp/src/qmf/engine/SequenceManager.h b/qpid/cpp/src/qmf/engine/SequenceManager.h
new file mode 100644
index 0000000000..9e47e38610
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/SequenceManager.h
@@ -0,0 +1,68 @@
+#ifndef _QmfEngineSequenceManager_
+#define _QmfEngineSequenceManager_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qpid {
+ namespace framing {
+ class Buffer;
+ }
+}
+
+namespace qmf {
+namespace engine {
+
+ class SequenceContext {
+ public:
+ typedef boost::shared_ptr<SequenceContext> Ptr;
+ SequenceContext() {}
+ virtual ~SequenceContext() {}
+
+ virtual void reserve() = 0;
+ virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0;
+ virtual void release() = 0;
+ };
+
+ class SequenceManager {
+ public:
+ SequenceManager();
+
+ void setUnsolicitedContext(SequenceContext::Ptr ctx);
+ uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
+ void release(uint32_t sequence);
+ void releaseAll();
+ void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+
+ private:
+ mutable qpid::sys::Mutex lock;
+ uint32_t nextSequence;
+ SequenceContext::Ptr unsolicitedContext;
+ std::map<uint32_t, SequenceContext::Ptr> contextMap;
+ };
+
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
new file mode 100644
index 0000000000..f80bdab866
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ValueImpl.h"
+#include <qpid/framing/FieldTable.h>
+
+using namespace std;
+using namespace qmf::engine;
+using qpid::framing::Buffer;
+
+ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t)
+{
+ uint64_t first;
+ uint64_t second;
+ qpid::framing::FieldTable ft;
+
+ switch (typecode) {
+ case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break;
+ case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break;
+ case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break;
+ case TYPE_UINT64 : value.u64 = buf.getLongLong(); break;
+ case TYPE_SSTR : buf.getShortString(stringVal); break;
+ case TYPE_LSTR : buf.getMediumString(stringVal); break;
+ case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break;
+ case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break;
+ case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break;
+ case TYPE_FLOAT : value.floatVal = buf.getFloat(); break;
+ case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break;
+ case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break;
+ case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break;
+ case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break;
+ case TYPE_INT64 : value.s64 = buf.getLongLong(); break;
+ case TYPE_UUID : buf.getBin128(value.uuidVal); break;
+ case TYPE_REF:
+ first = buf.getLongLong();
+ second = buf.getLongLong();
+ refVal.impl->setValue(first, second);
+ break;
+
+ case TYPE_MAP:
+ ft.decode(buf);
+ // TODO: either update to recursively use QMF types or reduce to int/string/...
+ // (maybe use another ctor with a FieldValue argument)
+ break;
+
+ case TYPE_LIST:
+ case TYPE_ARRAY:
+ case TYPE_OBJECT:
+ default:
+ break;
+ }
+}
+
+ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at)
+{
+}
+
+ValueImpl::ValueImpl(Typecode t) : typecode(t)
+{
+ ::memset(&value, 0, sizeof(value));
+}
+
+Value* ValueImpl::factory(Typecode t, Buffer& b)
+{
+ ValueImpl* impl(new ValueImpl(t, b));
+ return new Value(impl);
+}
+
+Value* ValueImpl::factory(Typecode t)
+{
+ ValueImpl* impl(new ValueImpl(t));
+ return new Value(impl);
+}
+
+ValueImpl::~ValueImpl()
+{
+}
+
+void ValueImpl::encode(Buffer& buf) const
+{
+ switch (typecode) {
+ case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break;
+ case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break;
+ case TYPE_UINT32 : buf.putLong(value.u32); break;
+ case TYPE_UINT64 : buf.putLongLong(value.u64); break;
+ case TYPE_SSTR : buf.putShortString(stringVal); break;
+ case TYPE_LSTR : buf.putMediumString(stringVal); break;
+ case TYPE_ABSTIME : buf.putLongLong(value.s64); break;
+ case TYPE_DELTATIME : buf.putLongLong(value.u64); break;
+ case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break;
+ case TYPE_FLOAT : buf.putFloat(value.floatVal); break;
+ case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break;
+ case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break;
+ case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break;
+ case TYPE_INT32 : buf.putLong(value.s32); break;
+ case TYPE_INT64 : buf.putLongLong(value.s64); break;
+ case TYPE_UUID : buf.putBin128(value.uuidVal); break;
+ case TYPE_REF : refVal.impl->encode(buf); break;
+ case TYPE_MAP: // TODO
+ case TYPE_LIST:
+ case TYPE_ARRAY:
+ case TYPE_OBJECT:
+ default:
+ break;
+ }
+}
+
+bool ValueImpl::keyInMap(const char* key) const
+{
+ return typecode == TYPE_MAP && mapVal.count(key) > 0;
+}
+
+Value* ValueImpl::byKey(const char* key)
+{
+ if (keyInMap(key)) {
+ map<std::string, Value>::iterator iter = mapVal.find(key);
+ if (iter != mapVal.end())
+ return &iter->second;
+ }
+ return 0;
+}
+
+const Value* ValueImpl::byKey(const char* key) const
+{
+ if (keyInMap(key)) {
+ map<std::string, Value>::const_iterator iter = mapVal.find(key);
+ if (iter != mapVal.end())
+ return &iter->second;
+ }
+ return 0;
+}
+
+void ValueImpl::deleteKey(const char* key)
+{
+ mapVal.erase(key);
+}
+
+void ValueImpl::insert(const char* key, Value* val)
+{
+ pair<string, Value> entry(key, *val);
+ mapVal.insert(entry);
+}
+
+const char* ValueImpl::key(uint32_t idx) const
+{
+ map<std::string, Value>::const_iterator iter = mapVal.begin();
+ for (uint32_t i = 0; i < idx; i++) {
+ if (iter == mapVal.end())
+ break;
+ iter++;
+ }
+
+ if (iter == mapVal.end())
+ return 0;
+ else
+ return iter->first.c_str();
+}
+
+Value* ValueImpl::listItem(uint32_t)
+{
+ return 0;
+}
+
+void ValueImpl::appendToList(Value*)
+{
+}
+
+void ValueImpl::deleteListItem(uint32_t)
+{
+}
+
+Value* ValueImpl::arrayItem(uint32_t)
+{
+ return 0;
+}
+
+void ValueImpl::appendToArray(Value*)
+{
+}
+
+void ValueImpl::deleteArrayItem(uint32_t)
+{
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
+Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {}
+Value::Value(ValueImpl* i) : impl(i) {}
+Value::~Value() { delete impl;}
+
+Typecode Value::getType() const { return impl->getType(); }
+bool Value::isNull() const { return impl->isNull(); }
+void Value::setNull() { impl->setNull(); }
+bool Value::isObjectId() const { return impl->isObjectId(); }
+const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
+void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
+bool Value::isUint() const { return impl->isUint(); }
+uint32_t Value::asUint() const { return impl->asUint(); }
+void Value::setUint(uint32_t val) { impl->setUint(val); }
+bool Value::isInt() const { return impl->isInt(); }
+int32_t Value::asInt() const { return impl->asInt(); }
+void Value::setInt(int32_t val) { impl->setInt(val); }
+bool Value::isUint64() const { return impl->isUint64(); }
+uint64_t Value::asUint64() const { return impl->asUint64(); }
+void Value::setUint64(uint64_t val) { impl->setUint64(val); }
+bool Value::isInt64() const { return impl->isInt64(); }
+int64_t Value::asInt64() const { return impl->asInt64(); }
+void Value::setInt64(int64_t val) { impl->setInt64(val); }
+bool Value::isString() const { return impl->isString(); }
+const char* Value::asString() const { return impl->asString(); }
+void Value::setString(const char* val) { impl->setString(val); }
+bool Value::isBool() const { return impl->isBool(); }
+bool Value::asBool() const { return impl->asBool(); }
+void Value::setBool(bool val) { impl->setBool(val); }
+bool Value::isFloat() const { return impl->isFloat(); }
+float Value::asFloat() const { return impl->asFloat(); }
+void Value::setFloat(float val) { impl->setFloat(val); }
+bool Value::isDouble() const { return impl->isDouble(); }
+double Value::asDouble() const { return impl->asDouble(); }
+void Value::setDouble(double val) { impl->setDouble(val); }
+bool Value::isUuid() const { return impl->isUuid(); }
+const uint8_t* Value::asUuid() const { return impl->asUuid(); }
+void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
+bool Value::isObject() const { return impl->isObject(); }
+const Object* Value::asObject() const { return impl->asObject(); }
+void Value::setObject(Object* val) { impl->setObject(val); }
+bool Value::isMap() const { return impl->isMap(); }
+bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
+Value* Value::byKey(const char* key) { return impl->byKey(key); }
+const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
+void Value::deleteKey(const char* key) { impl->deleteKey(key); }
+void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
+uint32_t Value::keyCount() const { return impl->keyCount(); }
+const char* Value::key(uint32_t idx) const { return impl->key(idx); }
+bool Value::isList() const { return impl->isList(); }
+uint32_t Value::listItemCount() const { return impl->listItemCount(); }
+Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
+void Value::appendToList(Value* val) { impl->appendToList(val); }
+void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
+bool Value::isArray() const { return impl->isArray(); }
+Typecode Value::arrayType() const { return impl->arrayType(); }
+uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
+Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
+void Value::appendToArray(Value* val) { impl->appendToArray(val); }
+void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
+
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h
new file mode 100644
index 0000000000..b6adae5d93
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ValueImpl.h
@@ -0,0 +1,150 @@
+#ifndef _QmfEngineValueImpl_
+#define _QmfEngineValueImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/engine/Value.h>
+#include <qmf/engine/ObjectIdImpl.h>
+#include <qmf/engine/Object.h>
+#include <qpid/framing/Buffer.h>
+#include <string>
+#include <string.h>
+#include <map>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+namespace engine {
+
+ // TODO: set valid flag on all value settors
+ // TODO: add a modified flag and accessors
+
+ struct ValueImpl {
+ const Typecode typecode;
+ bool valid;
+
+ ObjectId refVal;
+ std::string stringVal;
+ std::auto_ptr<Object> objectVal;
+ std::map<std::string, Value> mapVal;
+ std::vector<Value> vectorVal;
+ Typecode arrayTypecode;
+
+ union {
+ uint32_t u32;
+ uint64_t u64;
+ int32_t s32;
+ int64_t s64;
+ bool boolVal;
+ float floatVal;
+ double doubleVal;
+ uint8_t uuidVal[16];
+ } value;
+
+ ValueImpl(const ValueImpl& from) :
+ typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal),
+ objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0),
+ mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode),
+ value(from.value) {}
+
+ ValueImpl(Typecode t, Typecode at);
+ ValueImpl(Typecode t, qpid::framing::Buffer& b);
+ ValueImpl(Typecode t);
+ static Value* factory(Typecode t, qpid::framing::Buffer& b);
+ static Value* factory(Typecode t);
+ ~ValueImpl();
+
+ void encode(qpid::framing::Buffer& b) const;
+
+ Typecode getType() const { return typecode; }
+ bool isNull() const { return !valid; }
+ void setNull() { valid = false; }
+
+ bool isObjectId() const { return typecode == TYPE_REF; }
+ const ObjectId& asObjectId() const { return refVal; }
+ void setObjectId(const ObjectId& o) { refVal = o; } // TODO
+
+ bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; }
+ uint32_t asUint() const { return value.u32; }
+ void setUint(uint32_t val) { value.u32 = val; }
+
+ bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; }
+ int32_t asInt() const { return value.s32; }
+ void setInt(int32_t val) { value.s32 = val; }
+
+ bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; }
+ uint64_t asUint64() const { return value.u64; }
+ void setUint64(uint64_t val) { value.u64 = val; }
+
+ bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; }
+ int64_t asInt64() const { return value.s64; }
+ void setInt64(int64_t val) { value.s64 = val; }
+
+ bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; }
+ const char* asString() const { return stringVal.c_str(); }
+ void setString(const char* val) { stringVal = val; }
+
+ bool isBool() const { return typecode == TYPE_BOOL; }
+ bool asBool() const { return value.boolVal; }
+ void setBool(bool val) { value.boolVal = val; }
+
+ bool isFloat() const { return typecode == TYPE_FLOAT; }
+ float asFloat() const { return value.floatVal; }
+ void setFloat(float val) { value.floatVal = val; }
+
+ bool isDouble() const { return typecode == TYPE_DOUBLE; }
+ double asDouble() const { return value.doubleVal; }
+ void setDouble(double val) { value.doubleVal = val; }
+
+ bool isUuid() const { return typecode == TYPE_UUID; }
+ const uint8_t* asUuid() const { return value.uuidVal; }
+ void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); }
+
+ bool isObject() const { return typecode == TYPE_OBJECT; }
+ Object* asObject() const { return objectVal.get(); }
+ void setObject(Object* val) { objectVal.reset(val); }
+
+ bool isMap() const { return typecode == TYPE_MAP; }
+ bool keyInMap(const char* key) const;
+ Value* byKey(const char* key);
+ const Value* byKey(const char* key) const;
+ void deleteKey(const char* key);
+ void insert(const char* key, Value* val);
+ uint32_t keyCount() const { return mapVal.size(); }
+ const char* key(uint32_t idx) const;
+
+ bool isList() const { return typecode == TYPE_LIST; }
+ uint32_t listItemCount() const { return vectorVal.size(); }
+ Value* listItem(uint32_t idx);
+ void appendToList(Value* val);
+ void deleteListItem(uint32_t idx);
+
+ bool isArray() const { return typecode == TYPE_ARRAY; }
+ Typecode arrayType() const { return arrayTypecode; }
+ uint32_t arrayItemCount() const { return vectorVal.size(); }
+ Value* arrayItem(uint32_t idx);
+ void appendToArray(Value* val);
+ void deleteArrayItem(uint32_t idx);
+ };
+}
+}
+
+#endif
+