summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-05-22 21:40:57 +0000
committerTed Ross <tross@apache.org>2009-05-22 21:40:57 +0000
commit36319d26120c163c0c36598038859dad716ac358 (patch)
tree5e3087d14842f7ac87aaa89513ff204e0f1d40de /cpp/src
parent2cd113fc9e5c810ca8045dc1d70bcd6efd685f47 (diff)
downloadqpid-python-36319d26120c163c0c36598038859dad716ac358.tar.gz
QPID-1874 - First drop of the second-generation QMF libraries.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@777720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt18
-rw-r--r--cpp/src/qmf.mk38
-rw-r--r--cpp/src/qmf/Agent.cpp962
-rw-r--r--cpp/src/qmf/Agent.h206
-rw-r--r--cpp/src/qmf/Console.h82
-rw-r--r--cpp/src/qmf/Event.h30
-rw-r--r--cpp/src/qmf/Message.h39
-rw-r--r--cpp/src/qmf/MessageImpl.cpp43
-rw-r--r--cpp/src/qmf/MessageImpl.h42
-rw-r--r--cpp/src/qmf/Object.h47
-rw-r--r--cpp/src/qmf/ObjectId.h53
-rw-r--r--cpp/src/qmf/ObjectIdImpl.cpp192
-rw-r--r--cpp/src/qmf/ObjectIdImpl.h66
-rw-r--r--cpp/src/qmf/ObjectImpl.cpp222
-rw-r--r--cpp/src/qmf/ObjectImpl.h62
-rw-r--r--cpp/src/qmf/Query.h54
-rw-r--r--cpp/src/qmf/QueryImpl.cpp71
-rw-r--r--cpp/src/qmf/QueryImpl.h48
-rw-r--r--cpp/src/qmf/ResilientConnection.cpp460
-rw-r--r--cpp/src/qmf/ResilientConnection.h166
-rw-r--r--cpp/src/qmf/Schema.h160
-rw-r--r--cpp/src/qmf/SchemaImpl.cpp740
-rw-r--r--cpp/src/qmf/SchemaImpl.h195
-rw-r--r--cpp/src/qmf/Typecode.h51
-rw-r--r--cpp/src/qmf/Value.h113
-rw-r--r--cpp/src/qmf/ValueImpl.cpp478
-rw-r--r--cpp/src/qmf/ValueImpl.h144
27 files changed, 4778 insertions, 4 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index a4affcf010..222ea91c63 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -578,12 +578,28 @@ set (qmfagent_SOURCES
qpid/agent/ManagementAgent.h
qpid/agent/ManagementAgentImpl.cpp
qpid/agent/ManagementAgentImpl.h
+ qmf/Agent.cpp
)
add_library (qmfagent SHARED ${qmfagent_SOURCES})
-target_link_libraries (qmfagent qpidclient)
+target_link_libraries (qmfagent qmfcommon)
set_target_properties (qmfagent PROPERTIES
VERSION ${qpidc_version})
+set (qmfcommon_SOURCES
+ qmf/Agent.cpp
+ qmf/ResilientConnection.cpp
+ qmf/MessageImpl.cpp
+ qmf/SchemaImpl.cpp
+ qmf/ValueImpl.cpp
+ qmf/ObjectIdImpl.cpp
+ qmf/ObjectImpl.cpp
+ qmf/QueryImpl.cpp
+ )
+add_library (qmfcommon SHARED ${qmfcommon_SOURCES})
+target_link_libraries (qmfcommon qpidclient)
+set_target_properties (qmfcommon PROPERTIES
+ VERSION ${qpidc_version})
+
# QMF console library
#module_hdr += \
# qpid/console/Agent.h \
diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk
index ebd11e04f7..d5ae0e721c 100644
--- a/cpp/src/qmf.mk
+++ b/cpp/src/qmf.mk
@@ -20,17 +20,49 @@
#
# qmf agent library makefile fragment, to be included in Makefile.am
#
-lib_LTLIBRARIES += libqmfagent.la
+lib_LTLIBRARIES += \
+ libqmfcommon.la \
+ libqmfagent.la
module_hdr += \
qpid/agent/ManagementAgent.h \
qpid/agent/ManagementAgentImpl.h \
- qpid/agent/QmfAgentImportExport.h
+ qpid/agent/QmfAgentImportExport.h \
+ qmf/Agent.h \
+ qmf/Console.h \
+ qmf/Event.h \
+ qmf/Message.h \
+ qmf/MessageImpl.h \
+ qmf/Object.h \
+ qmf/ObjectId.h \
+ qmf/ObjectIdImpl.h \
+ qmf/ObjectImpl.h \
+ qmf/Query.h \
+ qmf/QueryImpl.h \
+ qmf/ResilientConnection.h \
+ qmf/Schema.h \
+ qmf/SchemaImpl.h \
+ qmf/Typecode.h \
+ qmf/Value.h \
+ qmf/ValueImpl.h
+
+libqmfcommon_la_SOURCES = \
+ qmf/Agent.cpp \
+ qmf/ResilientConnection.cpp \
+ qmf/MessageImpl.cpp \
+ qmf/SchemaImpl.cpp \
+ qmf/ValueImpl.cpp \
+ qmf/ObjectIdImpl.cpp \
+ qmf/ObjectImpl.cpp \
+ qmf/QueryImpl.cpp
libqmfagent_la_SOURCES = \
qpid/agent/ManagementAgent.h \
qpid/agent/ManagementAgentImpl.cpp \
- qpid/agent/ManagementAgentImpl.h
+ qpid/agent/ManagementAgentImpl.h \
+ qmf/Agent.cpp
libqmfagent_la_LIBADD = libqpidclient.la
+libqmfagent_ladir = $(includedir)/qmf
+libqmfagent_la_HEADERS = $(module_hdr)
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp
new file mode 100644
index 0000000000..1071e445c8
--- /dev/null
+++ b/cpp/src/qmf/Agent.cpp
@@ -0,0 +1,962 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Agent.h"
+#include "MessageImpl.h"
+#include "SchemaImpl.h"
+#include "Typecode.h"
+#include "ObjectImpl.h"
+#include "ObjectIdImpl.h"
+#include "QueryImpl.h"
+#include "ValueImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <unistd.h>
+#include <fcntl.h>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qmf {
+
+ struct AgentEventImpl {
+ typedef boost::shared_ptr<AgentEventImpl> Ptr;
+ AgentEvent::EventKind kind;
+ uint32_t sequence;
+ string authUserId;
+ string authToken;
+ string name;
+ Object* object;
+ boost::shared_ptr<ObjectId> objectId;
+ Query query;
+ boost::shared_ptr<Value> arguments;
+ string exchange;
+ string bindingKey;
+ SchemaObjectClass* objectClass;
+
+ AgentEventImpl(AgentEvent::EventKind k) :
+ kind(k), sequence(0), object(0), objectClass(0) {}
+ ~AgentEventImpl() {}
+ AgentEvent copy();
+ };
+
+ struct AgentQueryContext {
+ typedef boost::shared_ptr<AgentQueryContext> Ptr;
+ uint32_t sequence;
+ string exchange;
+ string key;
+ SchemaMethodImpl* schemaMethod;
+ AgentQueryContext() : schemaMethod(0) {}
+ };
+
+ class AgentImpl {
+ public:
+ AgentImpl(char* label, bool internalStore);
+ ~AgentImpl();
+
+ void setStoreDir(char* path);
+ void setTransferDir(char* path);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item);
+ void popXmt();
+ bool getEvent(AgentEvent& event);
+ void popEvent();
+ void newSession();
+ void startProtocol();
+ void heartbeat();
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+ void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+ void queryComplete(uint32_t sequence);
+ void registerClass(SchemaObjectClass* cls);
+ void registerClass(SchemaEventClass* cls);
+ const ObjectId* addObject(Object& obj, uint64_t persistId);
+ const ObjectId* allocObjectId(uint64_t persistId);
+ const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ void raiseEvent(Event& event);
+
+ private:
+ Mutex lock;
+ Mutex addLock;
+ string label;
+ string queueName;
+ string storeDir;
+ string transferDir;
+ bool internalStore;
+ uint64_t nextTransientId;
+ Uuid systemId;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ AgentAttachment attachment;
+ uint16_t bootSequence;
+ uint64_t nextObjectId;
+ uint32_t nextContextNum;
+ deque<AgentEventImpl::Ptr> eventQueue;
+ deque<MessageImpl::Ptr> xmtQueue;
+ map<uint32_t, AgentQueryContext::Ptr> contextMap;
+
+ static const char* QMF_EXCHANGE;
+ static const char* DIR_EXCHANGE;
+ static const char* BROKER_KEY;
+ static const uint32_t MERR_UNKNOWN_METHOD = 2;
+ static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
+ static const uint32_t MERR_UNKNOWN_CLASS = 9;
+ static const uint32_t MERR_INTERNAL_ERROR = 10;
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ struct SchemaClassKey {
+ string name;
+ uint8_t hash[16];
+ SchemaClassKey(const string& n, const uint8_t* h) : name(n) {
+ memcpy(hash, h, 16);
+ }
+ SchemaClassKey(Buffer& buffer) {
+ buffer.getShortString(name);
+ buffer.getBin128(hash);
+ }
+ string repr() {
+ return name;
+ }
+ };
+
+ struct SchemaClassKeyComp {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap;
+ typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap;
+
+ struct ClassMaps {
+ ObjectClassMap objectClasses;
+ EventClassMap eventClasses;
+ };
+
+ map<string, ClassMaps> packages;
+
+ bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
+ AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
+ AgentEventImpl::Ptr eventSetupComplete();
+ AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
+ boost::shared_ptr<ObjectId> oid);
+ AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ SchemaObjectClass* objectClass);
+ void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+
+ void sendPackageIndicationLH(const string& packageName);
+ void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key);
+ void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
+ uint32_t code = 0, const string& text = "OK");
+ void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
+ void handleAttachResponse(Buffer& inBuffer);
+ void handlePackageRequest(Buffer& inBuffer);
+ void handleClassQuery(Buffer& inBuffer);
+ void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyToExchange, const string& replyToKey);
+ void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleConsoleAddedIndication();
+ };
+}
+
+const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
+const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
+const char* AgentImpl::BROKER_KEY = "broker";
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+AgentEvent AgentEventImpl::copy()
+{
+ AgentEvent item;
+
+ ::memset(&item, 0, sizeof(AgentEvent));
+ item.kind = kind;
+ item.sequence = sequence;
+ item.object = object;
+ item.objectId = objectId.get();
+ item.query = &query;
+ item.arguments = arguments.get();
+ item.objectClass = objectClass;
+
+ STRING_REF(authUserId);
+ STRING_REF(authToken);
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+
+ return item;
+}
+
+AgentImpl::AgentImpl(char* _label, bool i) :
+ label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
+ requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0),
+ bootSequence(1), nextObjectId(1), nextContextNum(1)
+{
+ queueName += label;
+}
+
+AgentImpl::~AgentImpl()
+{
+ cout << "AgentImpl::~AgentImpl" << endl;
+}
+
+void AgentImpl::setStoreDir(char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ storeDir = path;
+ else
+ storeDir.clear();
+}
+
+void AgentImpl::setTransferDir(char* path)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (path)
+ transferDir = path;
+ else
+ transferDir.clear();
+}
+
+void AgentImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToExchange(message.replyExchange ? message.replyExchange : "");
+ string replyToKey(message.replyKey ? message.replyKey : "");
+ string userId(message.userId ? message.userId : "");
+
+ if (checkHeader(inBuffer, &opcode, &sequence)) {
+ if (opcode == 'a') handleAttachResponse(inBuffer);
+ else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
+ else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
+ }
+}
+
+bool AgentImpl::getXmtMessage(Message& item)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool AgentImpl::getEvent(AgentEvent& event)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void AgentImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void AgentImpl::newSession()
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+}
+
+void AgentImpl::startProtocol()
+{
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ encodeHeader(buffer, 'A');
+ buffer.putShortString("qmfa");
+ systemId.encode(buffer);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
+ " reqAgent=" << requestedAgentBank);
+}
+
+void AgentImpl::heartbeat()
+{
+ Mutex::ScopedLock _lock(lock);
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+
+ encodeHeader(buffer, 'h');
+ buffer.putLongLong(uint64_t(Duration(now())));
+ stringstream key;
+ key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
+ sendBufferLH(buffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT HeartbeatIndication");
+}
+
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
+ const Value& argMap)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'm', context->sequence);
+ buffer.putLong(status);
+ buffer.putMediumString(text);
+ if (status == 0) {
+ for (vector<SchemaArgumentImpl*>::const_iterator aIter = context->schemaMethod->arguments.begin();
+ aIter != context->schemaMethod->arguments.end(); aIter++) {
+ const SchemaArgumentImpl* schemaArg = *aIter;
+ if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->name.c_str())) {
+ const Value* val = argMap.byKey(schemaArg->name.c_str());
+ val->impl->encode(buffer);
+ } else {
+ Value val(schemaArg->typecode);
+ val.impl->encode(buffer);
+ }
+ }
+ }
+ }
+ sendBufferLH(buffer, context->exchange, context->key);
+ QPID_LOG(trace, "SENT MethodResponse");
+}
+
+void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ AgentQueryContext::Ptr context = iter->second;
+
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'g', context->sequence);
+
+ object.impl->encodeSchemaKey(buffer);
+ object.impl->encodeManagedObjectData(buffer);
+ if (prop)
+ object.impl->encodeProperties(buffer);
+ if (stat)
+ object.impl->encodeStatistics(buffer);
+
+ sendBufferLH(buffer, context->exchange, context->key);
+ QPID_LOG(trace, "SENT ContentIndication");
+}
+
+void AgentImpl::queryComplete(uint32_t sequence)
+{
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+
+ AgentQueryContext::Ptr context = iter->second;
+ contextMap.erase(iter);
+ sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+}
+
+void AgentImpl::registerClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ SchemaObjectClassImpl* impl = cls->impl;
+
+ map<string, ClassMaps>::iterator iter = packages.find(impl->package);
+ if (iter == packages.end()) {
+ packages[impl->package] = ClassMaps();
+ iter = packages.find(impl->package);
+ // TODO: Indicate this package if connected
+ }
+
+ SchemaClassKey key(impl->name, impl->getHash());
+ iter->second.objectClasses[key] = impl;
+
+ // TODO: Indicate this schema if connected.
+}
+
+void AgentImpl::registerClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ SchemaEventClassImpl* impl = cls->impl;
+
+ map<string, ClassMaps>::iterator iter = packages.find(impl->package);
+ if (iter == packages.end()) {
+ packages[impl->package] = ClassMaps();
+ iter = packages.find(impl->package);
+ // TODO: Indicate this package if connected
+ }
+
+ SchemaClassKey key(impl->name, impl->getHash());
+ iter->second.eventClasses[key] = impl;
+
+ // TODO: Indicate this schema if connected.
+}
+
+const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+{
+ Mutex::ScopedLock _lock(lock);
+ return 0;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+{
+ Mutex::ScopedLock _lock(lock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
+
+ ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum);
+ return oid->envelope;
+}
+
+const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+{
+ return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
+}
+
+void AgentImpl::raiseEvent(Event&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('2');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+bool AgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
+}
+
+AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
+ event->name = name;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
+ const string& key)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
+ const string& cls, boost::shared_ptr<ObjectId> oid)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
+ event->sequence = num;
+ event->authUserId = userId;
+ event->query.impl->packageName = package;
+ event->query.impl->className = cls;
+ event->query.impl->oid = oid;
+ return event;
+}
+
+AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
+ boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ SchemaObjectClass* objectClass)
+{
+ AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
+ event->sequence = num;
+ event->authUserId = userId;
+ event->name = method;
+ event->objectId = oid;
+ event->arguments = argMap;
+ event->objectClass = objectClass;
+ return event;
+}
+
+void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = "amq.direct";
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void AgentImpl::sendPackageIndicationLH(const string& packageName)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'p');
+ buffer.putShortString(packageName);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
+}
+
+void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'q');
+ buffer.putOctet((int) kind);
+ buffer.putShortString(packageName);
+ buffer.putShortString(key.name);
+ buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
+}
+
+void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
+ uint32_t sequence, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'z', sequence);
+ buffer.putLong(code);
+ buffer.putShortString(text);
+ sendBufferLH(buffer, exchange, replyToKey);
+ QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
+}
+
+void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+{
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 'm', sequence);
+ buffer.putLong(code);
+
+ string fulltext;
+ switch (code) {
+ case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
+ case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
+ case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
+ case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
+ default: fulltext = "Unspecified Error"; break;
+ }
+
+ if (!text.empty()) {
+ fulltext += " (";
+ fulltext += text;
+ fulltext += ")";
+ }
+
+ buffer.putMediumString(fulltext);
+ sendBufferLH(buffer, DIR_EXCHANGE, key);
+ QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
+}
+
+void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+{
+ Mutex::ScopedLock _lock(lock);
+
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
+
+ QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0) {
+ QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank);
+ } else {
+ QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank);
+ }
+ //storeData(); // TODO
+ requestedBrokerBank = assignedBrokerBank;
+ requestedAgentBank = assignedAgentBank;
+ }
+
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+
+ // Bind to qpid.management to receive commands
+ stringstream key;
+ key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
+ eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
+
+ // Send package indications for all local packages
+ for (map<string, ClassMaps>::iterator pIter = packages.begin();
+ pIter != packages.end();
+ pIter++) {
+ sendPackageIndicationLH(pIter->first);
+
+ // Send class indications for all local classes
+ ClassMaps cMap = pIter->second;
+ for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
+ cIter != cMap.objectClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
+ for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
+ cIter != cMap.eventClasses.end(); cIter++)
+ sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
+ }
+}
+
+void AgentImpl::handlePackageRequest(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleClassQuery(Buffer&)
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ const string& replyExchange, const string& replyKey)
+{
+ Mutex::ScopedLock _lock(lock);
+ string rExchange(replyExchange);
+ string rKey(replyKey);
+ string packageName;
+ inBuffer.getShortString(packageName);
+ SchemaClassKey key(inBuffer);
+
+ if (rExchange.empty())
+ rExchange = QMF_EXCHANGE;
+ if (rKey.empty())
+ rKey = BROKER_KEY;
+
+ QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
+
+ map<string, ClassMaps>::iterator pIter = packages.find(packageName);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
+ return;
+ }
+
+ ClassMaps cMap = pIter->second;
+ ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
+ if (ocIter != cMap.objectClasses.end()) {
+ SchemaObjectClassImpl* oImpl = ocIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 's', sequence);
+ oImpl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
+ if (ecIter != cMap.eventClasses.end()) {
+ SchemaEventClassImpl* eImpl = ecIter->second;
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ encodeHeader(buffer, 's', sequence);
+ eImpl->encode(buffer);
+ sendBufferLH(buffer, rExchange, rKey);
+ QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
+ return;
+ }
+
+ sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
+}
+
+void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+ map<string, ClassMaps>::const_iterator pIter = packages.end();
+ string pname;
+ string cname;
+ string oidRepr;
+ boost::shared_ptr<ObjectId> oid;
+
+ ft.decode(inBuffer);
+
+ QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+
+ value = ft.get("_package");
+ if (value.get() && value->convertsTo<string>()) {
+ pname = value->get<string>();
+ pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
+ return;
+ }
+ }
+
+ value = ft.get("_class");
+ if (value.get() && value->convertsTo<string>()) {
+ cname = value->get<string>();
+ // TODO - check for validity of class (in package or any package)
+ if (pIter == packages.end()) {
+ } else {
+
+ }
+ }
+
+ value = ft.get("_objectid");
+ if (value.get() && value->convertsTo<string>()) {
+ oidRepr = value->get<string>();
+ oid.reset(new ObjectId());
+ oid->impl->fromString(oidRepr);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
+}
+
+void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+{
+ Mutex::ScopedLock _lock(lock);
+ string pname;
+ string method;
+ ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer);
+ boost::shared_ptr<ObjectId> oid(oidImpl->envelope);
+ buffer.getShortString(pname);
+ SchemaClassKey classKey(buffer);
+ buffer.getShortString(method);
+
+ map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
+ if (pIter == packages.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
+ return;
+ }
+
+ ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
+ if (cIter == pIter->second.objectClasses.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
+ return;
+ }
+
+ const SchemaObjectClassImpl* schema = cIter->second;
+ vector<SchemaMethodImpl*>::const_iterator mIter = schema->methods.begin();
+ for (; mIter != schema->methods.end(); mIter++) {
+ if ((*mIter)->name == method)
+ break;
+ }
+
+ if (mIter == schema->methods.end()) {
+ sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
+ return;
+ }
+
+ SchemaMethodImpl* schemaMethod = *mIter;
+ boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
+ ValueImpl* value;
+ for (vector<SchemaArgumentImpl*>::const_iterator aIter = schemaMethod->arguments.begin();
+ aIter != schemaMethod->arguments.end(); aIter++) {
+ const SchemaArgumentImpl* schemaArg = *aIter;
+ if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT)
+ value = new ValueImpl(schemaArg->typecode, buffer);
+ else
+ value = new ValueImpl(schemaArg->typecode);
+ argMap->insert(schemaArg->name.c_str(), value->envelope);
+ }
+
+ AgentQueryContext::Ptr context(new AgentQueryContext);
+ uint32_t contextNum = nextContextNum++;
+ context->sequence = sequence;
+ context->exchange = DIR_EXCHANGE;
+ context->key = replyTo;
+ context->schemaMethod = schemaMethod;
+ contextMap[contextNum] = context;
+
+ eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope));
+}
+
+void AgentImpl::handleConsoleAddedIndication()
+{
+ Mutex::ScopedLock _lock(lock);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Agent::Agent(char* label, bool internalStore)
+{
+ impl = new AgentImpl(label, internalStore);
+}
+
+Agent::~Agent()
+{
+ cout << "Agent::~Agent" << endl;
+ delete impl;
+}
+
+void Agent::setStoreDir(char* path)
+{
+ impl->setStoreDir(path);
+}
+
+void Agent::setTransferDir(char* path)
+{
+ impl->setTransferDir(path);
+}
+
+void Agent::handleRcvMessage(Message& message)
+{
+ impl->handleRcvMessage(message);
+}
+
+bool Agent::getXmtMessage(Message& item)
+{
+ return impl->getXmtMessage(item);
+}
+
+void Agent::popXmt()
+{
+ impl->popXmt();
+}
+
+bool Agent::getEvent(AgentEvent& event)
+{
+ return impl->getEvent(event);
+}
+
+void Agent::popEvent()
+{
+ impl->popEvent();
+}
+
+void Agent::newSession()
+{
+ impl->newSession();
+}
+
+void Agent::startProtocol()
+{
+ impl->startProtocol();
+}
+
+void Agent::heartbeat()
+{
+ impl->heartbeat();
+}
+
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
+{
+ impl->methodResponse(sequence, status, text, arguments);
+}
+
+void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+{
+ impl->queryResponse(sequence, object, prop, stat);
+}
+
+void Agent::queryComplete(uint32_t sequence)
+{
+ impl->queryComplete(sequence);
+}
+
+void Agent::registerClass(SchemaObjectClass* cls)
+{
+ impl->registerClass(cls);
+}
+
+void Agent::registerClass(SchemaEventClass* cls)
+{
+ impl->registerClass(cls);
+}
+
+const ObjectId* Agent::addObject(Object& obj, uint64_t persistId)
+{
+ return impl->addObject(obj, persistId);
+}
+
+const ObjectId* Agent::allocObjectId(uint64_t persistId)
+{
+ return impl->allocObjectId(persistId);
+}
+
+const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+{
+ return impl->allocObjectId(persistIdLo, persistIdHi);
+}
+
+void Agent::raiseEvent(Event& event)
+{
+ impl->raiseEvent(event);
+}
+
diff --git a/cpp/src/qmf/Agent.h b/cpp/src/qmf/Agent.h
new file mode 100644
index 0000000000..d8f784e9d8
--- /dev/null
+++ b/cpp/src/qmf/Agent.h
@@ -0,0 +1,206 @@
+#ifndef _QmfAgent_
+#define _QmfAgent_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Schema.h>
+#include <qmf/ObjectId.h>
+#include <qmf/Object.h>
+#include <qmf/Event.h>
+#include <qmf/Query.h>
+#include <qmf/Value.h>
+#include <qmf/Message.h>
+
+namespace qmf {
+
+ /**
+ * AgentEvent
+ *
+ * This structure represents a QMF event coming from the agent to
+ * the application.
+ */
+ struct AgentEvent {
+ enum EventKind {
+ GET_QUERY = 1,
+ START_SYNC = 2,
+ END_SYNC = 3,
+ METHOD_CALL = 4,
+ DECLARE_QUEUE = 5,
+ DELETE_QUEUE = 6,
+ BIND = 7,
+ UNBIND = 8,
+ SETUP_COMPLETE = 9
+ };
+
+ EventKind kind;
+ uint32_t sequence; // Protocol sequence (for all kinds)
+ char* authUserId; // Authenticated user ID (for all kinds)
+ char* authToken; // Authentication token if issued (for all kinds)
+ char* name; // Name of the method/sync query
+ // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND)
+ Object* object; // Object involved in method call (METHOD_CALL)
+ ObjectId* objectId; // ObjectId for method call (METHOD_CALL)
+ Query* query; // Query parameters (GET_QUERY, START_SYNC)
+ Value* arguments; // Method parameters (METHOD_CALL)
+ char* exchange; // Exchange for bind (BIND, UNBIND)
+ char* bindingKey; // Key for bind (BIND, UNBIND)
+ SchemaObjectClass* objectClass; // (METHOD_CALL)
+ };
+
+ class AgentImpl;
+
+ /**
+ * Agent - Protocol engine for the QMF agent
+ */
+ class Agent {
+ public:
+ Agent(char* label, bool internalStore=true);
+ ~Agent();
+
+ /**
+ * Configure the directory path for storing persistent data.
+ *@param path Null-terminated string containing a directory path where files can be
+ * created, written, and read. If NULL, no persistent storage will be
+ * attempted.
+ */
+ void setStoreDir(char* path);
+
+ /**
+ * Configure the directory path for files transferred over QMF.
+ *@param path Null-terminated string containing a directory path where files can be
+ * created, deleted, written, and read. If NULL, file transfers shall not
+ * be permitted.
+ */
+ void setTransferDir(char* path);
+
+ /**
+ * Pass messages received from the AMQP session to the Agent engine.
+ *@param message AMQP messages received on the agent session.
+ */
+ void handleRcvMessage(Message& message);
+
+ /**
+ * Get the next message to be sent to the AMQP network.
+ *@param item The Message structure describing the message to be produced.
+ *@return true if the Message is valid, false if there are no messages to send.
+ */
+ bool getXmtMessage(Message& item);
+
+ /**
+ * Remove and discard one message from the head of the transmit queue.
+ */
+ void popXmt();
+
+ /**
+ * Get the next application event from the agent engine.
+ *@param event The event iff the return value is true
+ *@return true if event is valid, false if there are no events to process
+ */
+ bool getEvent(AgentEvent& event);
+
+ /**
+ * Remove and discard one event from the head of the event queue.
+ */
+ void popEvent();
+
+ /**
+ * A new AMQP session has been established for Agent communication.
+ */
+ void newSession();
+
+ /**
+ * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event
+ * is received from the Agent engine.
+ */
+ void startProtocol();
+
+ /**
+ * This method is called periodically so the agent can supply a heartbeat.
+ */
+ void heartbeat();
+
+ /**
+ * Respond to a method request.
+ *@param sequence The sequence number from the method request event.
+ *@param status The method's completion status.
+ *@param text Status text ("OK" or an error message)
+ *@param arguments The list of output arguments from the method call.
+ */
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+
+ /**
+ * Send a content indication to the QMF bus. This is only needed for objects that are
+ * managed by the application. This is *NOT* needed for objects managed by the Agent
+ * (inserted using addObject).
+ *@param sequence The sequence number of the GET request or the SYNC_START request.
+ *@param object The object (annotated with "changed" flags) for publication.
+ *@param prop If true, changed object properties are transmitted.
+ *@param stat If true, changed object statistics are transmitted.
+ */
+ void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true);
+
+ /**
+ * Indicate the completion of a query. This is not used for SYNC_START requests.
+ *@param sequence The sequence number of the GET request.
+ */
+ void queryComplete(uint32_t sequence);
+
+ /**
+ * Register a schema class with the Agent.
+ *@param cls A SchemaObejctClass object that defines data managed by the agent.
+ */
+ void registerClass(SchemaObjectClass* cls);
+
+ /**
+ * Register a schema class with the Agent.
+ *@param cls A SchemaEventClass object that defines events sent by the agent.
+ */
+ void registerClass(SchemaEventClass* cls);
+
+ /**
+ * Give an object to the Agent for storage and management. Once added, the agent takes
+ * responsibility for the life cycle of the object.
+ *@param obj The object to be managed by the Agent.
+ *@param persistId A unique non-zero value if the object-id is to be persistent.
+ *@return The objectId of the managed object.
+ */
+ const ObjectId* addObject(Object& obj, uint64_t persistId);
+
+ /**
+ * Allocate an objecc-id for an object that will be managed by the application.
+ *@param persistId A unique non-zero value if the object-id is to be persistent.
+ @return The objectId structure for the allocated ID.
+ */
+ const ObjectId* allocObjectId(uint64_t persistId);
+ const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+
+ /**
+ * Raise an event into the QMF network..
+ *@param event The event object for the event to be raised.
+ */
+ void raiseEvent(Event& event);
+
+ private:
+ AgentImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Console.h b/cpp/src/qmf/Console.h
new file mode 100644
index 0000000000..de7949e1de
--- /dev/null
+++ b/cpp/src/qmf/Console.h
@@ -0,0 +1,82 @@
+#ifndef _QmfConsole_
+#define _QmfConsole_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/ManagedConnection.h>
+#include <qmf/Agent.h>
+#include <qmf/Broker.h>
+#include <qmf/Package.h>
+#include <qmf/SchemaClassTable.h>
+#include <qmf/Object.h>
+#include <qmf/ConsoleHandler.h>
+#include <set>
+#include <vector>
+#include <string>
+
+namespace qmf {
+
+ struct ConsoleSettings {
+ bool rcvObjects;
+ bool rcvEvents;
+ bool rcvHeartbeats;
+ bool userBindings;
+ uint32_t methodTimeout;
+ uint32_t getTimeout;
+
+ ConsoleSettings() :
+ rcvObjects(true),
+ rcvEvents(true),
+ rcvHeartbeats(true),
+ userBindings(false),
+ methodTimeout(20),
+ getTimeout(20) {}
+ };
+
+ class Console {
+ public:
+ Console(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings());
+ ~Console();
+
+ Broker* addConnection(ManagedConnection& connection);
+ void delConnection(Broker* broker);
+ void delConnection(ManagedConnection& connection);
+
+ const PackageMap& getPackages() const;
+
+ void bindPackage(const Package& package);
+ void bindPackage(const std::string& packageName);
+ void bindClass(const SchemaClass& otype);
+ void bindClass(const std::string& packageName, const std::string& className);
+
+ void getAgents(std::set<Agent>& agents, Broker* = 0);
+ void getObjects(std::vector<Object>& objects, const std::string& typeName,
+ const std::string& packageName = "",
+ Broker* broker = 0,
+ Agent* agent = 0);
+ void getObjects(std::vector<Object>& objects,
+ const std::map<std::string, std::string>& query,
+ Broker* broker = 0,
+ Agent* agent = 0);
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Event.h b/cpp/src/qmf/Event.h
new file mode 100644
index 0000000000..f20c6d2fb1
--- /dev/null
+++ b/cpp/src/qmf/Event.h
@@ -0,0 +1,30 @@
+#ifndef _QmfEvent_
+#define _QmfEvent_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace qmf {
+
+ class Event {
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Message.h b/cpp/src/qmf/Message.h
new file mode 100644
index 0000000000..7b3745b723
--- /dev/null
+++ b/cpp/src/qmf/Message.h
@@ -0,0 +1,39 @@
+#ifndef _QmfMessage_
+#define _QmfMessage_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+
+namespace qmf {
+
+ struct Message {
+ char* body;
+ uint32_t length;
+ char* destination;
+ char* routingKey;
+ char* replyExchange;
+ char* replyKey;
+ char* userId;
+ };
+
+}
+
+#endif
diff --git a/cpp/src/qmf/MessageImpl.cpp b/cpp/src/qmf/MessageImpl.cpp
new file mode 100644
index 0000000000..0f3dd7caaf
--- /dev/null
+++ b/cpp/src/qmf/MessageImpl.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "MessageImpl.h"
+#include <string.h>
+
+using namespace std;
+using namespace qmf;
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+Message MessageImpl::copy()
+{
+ Message item;
+
+ ::memset(&item, 0, sizeof(Message));
+ item.body = const_cast<char*>(body.c_str());
+ item.length = body.length();
+ STRING_REF(destination);
+ STRING_REF(routingKey);
+ STRING_REF(replyExchange);
+ STRING_REF(replyKey);
+ STRING_REF(userId);
+
+ return item;
+}
+
diff --git a/cpp/src/qmf/MessageImpl.h b/cpp/src/qmf/MessageImpl.h
new file mode 100644
index 0000000000..618e2c1940
--- /dev/null
+++ b/cpp/src/qmf/MessageImpl.h
@@ -0,0 +1,42 @@
+#ifndef _QmfMessageImpl_
+#define _QmfMessageImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Message.h"
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ struct MessageImpl {
+ typedef boost::shared_ptr<MessageImpl> Ptr;
+ std::string body;
+ std::string destination;
+ std::string routingKey;
+ std::string replyExchange;
+ std::string replyKey;
+ std::string userId;
+
+ Message copy();
+ };
+}
+
+#endif
diff --git a/cpp/src/qmf/Object.h b/cpp/src/qmf/Object.h
new file mode 100644
index 0000000000..6416aa3c8c
--- /dev/null
+++ b/cpp/src/qmf/Object.h
@@ -0,0 +1,47 @@
+#ifndef _QmfObject_
+#define _QmfObject_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Schema.h>
+#include <qmf/ObjectId.h>
+#include <qmf/Value.h>
+
+namespace qmf {
+
+ class ObjectImpl;
+ class Object {
+ public:
+ Object(const SchemaObjectClass* type);
+ Object(ObjectImpl* impl);
+ ~Object();
+
+ void destroy();
+ const ObjectId* getObjectId() const;
+ void setObjectId(ObjectId* oid);
+ const SchemaObjectClass* getClass() const;
+ Value* getValue(char* key);
+
+ ObjectImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/ObjectId.h b/cpp/src/qmf/ObjectId.h
new file mode 100644
index 0000000000..d27c804773
--- /dev/null
+++ b/cpp/src/qmf/ObjectId.h
@@ -0,0 +1,53 @@
+#ifndef _QmfObjectId_
+#define _QmfObjectId_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdint.h>
+
+namespace qmf {
+
+ // TODO: Add to/from string and << operator
+
+ class ObjectIdImpl;
+ class ObjectId {
+ public:
+ ObjectId();
+ ObjectId(ObjectIdImpl* impl);
+ ~ObjectId();
+
+ uint64_t getObjectNum() const;
+ uint32_t getObjectNumHi() const;
+ uint32_t getObjectNumLo() const;
+ bool isDurable() const;
+
+ bool operator==(const ObjectId& other) const;
+ bool operator!=(const ObjectId& other) const;
+ bool operator<(const ObjectId& other) const;
+ bool operator>(const ObjectId& other) const;
+ bool operator<=(const ObjectId& other) const;
+ bool operator>=(const ObjectId& other) const;
+
+ ObjectIdImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/ObjectIdImpl.cpp b/cpp/src/qmf/ObjectIdImpl.cpp
new file mode 100644
index 0000000000..83fd6cc34f
--- /dev/null
+++ b/cpp/src/qmf/ObjectIdImpl.cpp
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ObjectIdImpl.h"
+#include <stdlib.h>
+
+using namespace std;
+using namespace qmf;
+using qpid::framing::Buffer;
+
+
+void AgentAttachment::setBanks(uint32_t broker, uint32_t agent)
+{
+ first =
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (agent & 0x0fffffff));
+}
+
+ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : envelope(new ObjectId(this)), agent(0)
+{
+ decode(buffer);
+}
+
+ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) :
+ envelope(new ObjectId(this)), agent(a)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48;
+ second = object;
+}
+
+void ObjectIdImpl::decode(Buffer& buffer)
+{
+ first = buffer.getLongLong();
+ second = buffer.getLongLong();
+}
+
+void ObjectIdImpl::encode(Buffer& buffer) const
+{
+ if (agent == 0)
+ buffer.putLongLong(first);
+ else
+ buffer.putLongLong(first | agent->first);
+ buffer.putLongLong(second);
+}
+
+void ObjectIdImpl::fromString(const std::string& repr)
+{
+#define FIELDS 5
+#if defined (_WIN32) && !defined (atoll)
+# define atoll(X) _atoi64(X)
+#endif
+
+ std::string copy(repr.c_str());
+ char* cText;
+ char* field[FIELDS];
+ bool atFieldStart = true;
+ int idx = 0;
+
+ cText = const_cast<char*>(copy.c_str());
+ for (char* cursor = cText; *cursor; cursor++) {
+ if (atFieldStart) {
+ if (idx >= FIELDS)
+ return; // TODO error
+ field[idx++] = cursor;
+ atFieldStart = false;
+ } else {
+ if (*cursor == '-') {
+ *cursor = '\0';
+ atFieldStart = true;
+ }
+ }
+ }
+
+ if (idx != FIELDS)
+ return; // TODO error
+
+ first = (atoll(field[0]) << 60) +
+ (atoll(field[1]) << 48) +
+ (atoll(field[2]) << 28) +
+ atoll(field[3]);
+ second = atoll(field[4]);
+ agent = 0;
+}
+
+bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return first == otherFirst && second == other.second;
+}
+
+bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+}
+
+bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first > otherFirst) || ((first == otherFirst) && (second > other.second));
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ObjectId::ObjectId()
+{
+ impl = new ObjectIdImpl(this);
+}
+
+ObjectId::ObjectId(ObjectIdImpl* i)
+{
+ impl = i;
+}
+
+ObjectId::~ObjectId()
+{
+ delete impl;
+}
+
+uint64_t ObjectId::getObjectNum() const
+{
+ return impl->getObjectNum();
+}
+
+uint32_t ObjectId::getObjectNumHi() const
+{
+ return impl->getObjectNumHi();
+}
+
+uint32_t ObjectId::getObjectNumLo() const
+{
+ return impl->getObjectNumLo();
+}
+
+bool ObjectId::isDurable() const
+{
+ return impl->isDurable();
+}
+
+bool ObjectId::operator==(const ObjectId& other) const
+{
+ return *impl == *other.impl;
+}
+
+bool ObjectId::operator!=(const ObjectId& other) const
+{
+ return !(*impl == *other.impl);
+}
+
+bool ObjectId::operator<(const ObjectId& other) const
+{
+ return *impl < *other.impl;
+}
+
+bool ObjectId::operator>(const ObjectId& other) const
+{
+ return *impl > *other.impl;
+}
+
+bool ObjectId::operator<=(const ObjectId& other) const
+{
+ return !(*impl > *other.impl);
+}
+
+bool ObjectId::operator>=(const ObjectId& other) const
+{
+ return !(*impl < *other.impl);
+}
diff --git a/cpp/src/qmf/ObjectIdImpl.h b/cpp/src/qmf/ObjectIdImpl.h
new file mode 100644
index 0000000000..5d8ee59aee
--- /dev/null
+++ b/cpp/src/qmf/ObjectIdImpl.h
@@ -0,0 +1,66 @@
+#ifndef _QmfObjectIdImpl_
+#define _QmfObjectIdImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/ObjectId.h>
+#include <qpid/framing/Buffer.h>
+
+namespace qmf {
+
+ struct AgentAttachment {
+ uint64_t first;
+
+ AgentAttachment() : first(0) {}
+ void setBanks(uint32_t broker, uint32_t bank);
+ uint64_t getFirst() const { return first; }
+ };
+
+ struct ObjectIdImpl {
+ ObjectId* envelope;
+ AgentAttachment* agent;
+ uint64_t first;
+ uint64_t second;
+
+ ObjectIdImpl(ObjectId* e) : envelope(e), agent(0) {}
+ ObjectIdImpl(qpid::framing::Buffer& buffer);
+ ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
+
+ void decode(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void fromString(const std::string& repr);
+ uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
+ uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
+ uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
+ uint32_t getAgentBank() const { return first & 0x000000000FFFFFFFLL; }
+ uint64_t getObjectNum() const { return second; }
+ uint32_t getObjectNumHi() const { return (uint32_t) (second >> 32); }
+ uint32_t getObjectNumLo() const { return (uint32_t) (second & 0x00000000FFFFFFFFLL); }
+ bool isDurable() const { return getSequence() == 0; }
+ void setValue(uint64_t f, uint64_t s) { first = f; second = s; }
+
+ bool operator==(const ObjectIdImpl& other) const;
+ bool operator<(const ObjectIdImpl& other) const;
+ bool operator>(const ObjectIdImpl& other) const;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/ObjectImpl.cpp b/cpp/src/qmf/ObjectImpl.cpp
new file mode 100644
index 0000000000..d00dd89b3d
--- /dev/null
+++ b/cpp/src/qmf/ObjectImpl.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ObjectImpl.h"
+#include "ValueImpl.h"
+#include <qpid/sys/Time.h>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::sys;
+using qpid::framing::Buffer;
+
+ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
+ envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))),
+ destroyTime(0), lastUpdatedTime(createTime)
+{
+ int propCount = objectClass->getPropertyCount();
+ int statCount = objectClass->getStatisticCount();
+ int idx;
+
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ }
+
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
+ }
+}
+
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer) :
+ envelope(new Object(this)), objectClass(type), createTime(uint64_t(Duration(now()))),
+ destroyTime(0), lastUpdatedTime(createTime)
+{
+ int propCount = objectClass->getPropertyCount();
+ int statCount = objectClass->getStatisticCount();
+ int idx;
+ set<string> excludes;
+
+ parsePresenceMasks(buffer, excludes);
+ for (idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (excludes.count(prop->getName()) != 0) {
+ properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
+ } else {
+ ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
+ properties[prop->getName()] = ValuePtr(pval->envelope);
+ }
+ }
+
+ for (idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
+ statistics[stat->getName()] = ValuePtr(sval->envelope);
+ }
+}
+
+ObjectImpl::~ObjectImpl()
+{
+}
+
+void ObjectImpl::destroy()
+{
+ destroyTime = uint64_t(Duration(now()));
+ // TODO - flag deletion
+}
+
+Value* ObjectImpl::getValue(const string& key)
+{
+ map<string, ValuePtr>::const_iterator iter;
+
+ iter = properties.find(key);
+ if (iter != properties.end())
+ return iter->second.get();
+
+ iter = statistics.find(key);
+ if (iter != statistics.end())
+ return iter->second.get();
+
+ return 0;
+}
+
+void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
+{
+ int propCount = objectClass->getPropertyCount();
+ excludeList.clear();
+ uint8_t bit = 0;
+ uint8_t mask = 0;
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (prop->isOptional()) {
+ if (bit == 0) {
+ mask = buffer.getOctet();
+ bit = 1;
+ }
+ if ((mask & bit) == 0)
+ excludeList.insert(string(prop->getName()));
+ if (bit == 0x80)
+ bit = 0;
+ else
+ bit = bit << 1;
+ }
+ }
+}
+
+void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
+{
+ buffer.putShortString(objectClass->getPackage());
+ buffer.putShortString(objectClass->getName());
+ buffer.putBin128(const_cast<uint8_t*>(objectClass->getHash()));
+}
+
+void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
+{
+ buffer.putLongLong(lastUpdatedTime);
+ buffer.putLongLong(createTime);
+ buffer.putLongLong(destroyTime);
+ objectId->impl->encode(buffer);
+}
+
+void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
+{
+ int propCount = objectClass->getPropertyCount();
+ uint8_t bit = 0;
+ uint8_t mask = 0;
+ ValuePtr value;
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ if (prop->isOptional()) {
+ value = properties[prop->getName()];
+ if (bit == 0)
+ bit = 1;
+ if (!value->isNull())
+ mask |= bit;
+ if (bit == 0x80) {
+ buffer.putOctet(mask);
+ bit = 0;
+ mask = 0;
+ } else
+ bit = bit << 1;
+ }
+ }
+ if (bit != 0) {
+ buffer.putOctet(mask);
+ }
+
+ for (int idx = 0; idx < propCount; idx++) {
+ const SchemaProperty* prop = objectClass->getProperty(idx);
+ value = properties[prop->getName()];
+ if (!prop->isOptional() || !value->isNull()) {
+ value->impl->encode(buffer);
+ }
+ }
+}
+
+void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
+{
+ int statCount = objectClass->getStatisticCount();
+ for (int idx = 0; idx < statCount; idx++) {
+ const SchemaStatistic* stat = objectClass->getStatistic(idx);
+ ValuePtr value = statistics[stat->getName()];
+ value->impl->encode(buffer);
+ }
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {}
+
+Object::Object(ObjectImpl* i) : impl(i) {}
+
+Object::~Object()
+{
+ delete impl;
+}
+
+void Object::destroy()
+{
+ impl->destroy();
+}
+
+const ObjectId* Object::getObjectId() const
+{
+ return impl->getObjectId();
+}
+
+void Object::setObjectId(ObjectId* oid)
+{
+ impl->setObjectId(oid);
+}
+
+const SchemaObjectClass* Object::getClass() const
+{
+ return impl->getClass();
+}
+
+Value* Object::getValue(char* key)
+{
+ return impl->getValue(key);
+}
+
diff --git a/cpp/src/qmf/ObjectImpl.h b/cpp/src/qmf/ObjectImpl.h
new file mode 100644
index 0000000000..4dc2170bfc
--- /dev/null
+++ b/cpp/src/qmf/ObjectImpl.h
@@ -0,0 +1,62 @@
+#ifndef _QmfObjectImpl_
+#define _QmfObjectImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Object.h>
+#include <map>
+#include <set>
+#include <string>
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ struct ObjectImpl {
+ typedef boost::shared_ptr<Value> ValuePtr;
+ Object* envelope;
+ const SchemaObjectClass* objectClass;
+ boost::shared_ptr<ObjectId> objectId;
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t lastUpdatedTime;
+ mutable std::map<std::string, ValuePtr> properties;
+ mutable std::map<std::string, ValuePtr> statistics;
+
+ ObjectImpl(Object* e, const SchemaObjectClass* type);
+ ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer);
+ ~ObjectImpl();
+
+ void destroy();
+ const ObjectId* getObjectId() const { return objectId.get(); }
+ void setObjectId(ObjectId* oid) { objectId.reset(oid); }
+ const SchemaObjectClass* getClass() const { return objectClass; }
+ Value* getValue(const std::string& key);
+
+ void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
+ void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
+ void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
+ void encodeProperties(qpid::framing::Buffer& buffer) const;
+ void encodeStatistics(qpid::framing::Buffer& buffer) const;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Query.h b/cpp/src/qmf/Query.h
new file mode 100644
index 0000000000..346d8c336b
--- /dev/null
+++ b/cpp/src/qmf/Query.h
@@ -0,0 +1,54 @@
+#ifndef _QmfQuery_
+#define _QmfQuery_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/ObjectId.h>
+#include <qmf/Value.h>
+
+namespace qmf {
+
+ class QueryImpl;
+ class Query {
+ public:
+ Query();
+ Query(QueryImpl* impl);
+ ~Query();
+
+ const char* getPackage() const;
+ const char* getClass() const;
+ const ObjectId* getObjectId() const;
+
+ enum Oper {
+ OPER_AND = 1,
+ OPER_OR = 2
+ };
+
+ int whereCount() const;
+ Oper whereOper() const;
+ const char* whereKey() const;
+ const Value* whereValue() const;
+
+ QueryImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/QueryImpl.cpp b/cpp/src/qmf/QueryImpl.cpp
new file mode 100644
index 0000000000..f74d9238ae
--- /dev/null
+++ b/cpp/src/qmf/QueryImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "QueryImpl.h"
+
+using namespace std;
+using namespace qmf;
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Query::Query() : impl(new QueryImpl(this)) {}
+Query::Query(QueryImpl* i) : impl(i) {}
+
+Query::~Query()
+{
+ delete impl;
+}
+
+const char* Query::getPackage() const
+{
+ return impl->getPackage();
+}
+
+const char* Query::getClass() const
+{
+ return impl->getClass();
+}
+
+const ObjectId* Query::getObjectId() const
+{
+ return impl->getObjectId();
+}
+
+int Query::whereCount() const
+{
+ return impl->whereCount();
+}
+
+Query::Oper Query::whereOper() const
+{
+ return impl->whereOper();
+}
+
+const char* Query::whereKey() const
+{
+ return impl->whereKey();
+}
+
+const Value* Query::whereValue() const
+{
+ return impl->whereValue();
+}
+
diff --git a/cpp/src/qmf/QueryImpl.h b/cpp/src/qmf/QueryImpl.h
new file mode 100644
index 0000000000..1cb9bfe554
--- /dev/null
+++ b/cpp/src/qmf/QueryImpl.h
@@ -0,0 +1,48 @@
+#ifndef _QmfQueryImpl_
+#define _QmfQueryImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Query.h>
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ struct QueryImpl {
+ Query* envelope;
+ std::string packageName;
+ std::string className;
+ boost::shared_ptr<ObjectId> oid;
+
+ QueryImpl(Query* e) : envelope(e) {}
+
+ const char* getPackage() const { return packageName.empty() ? 0 : packageName.c_str(); }
+ const char* getClass() const { return className.empty() ? 0 : className.c_str(); }
+ const ObjectId* getObjectId() const { return oid.get(); }
+
+ int whereCount() const { return 0;}
+ Query::Oper whereOper() const { return Query::OPER_AND; }
+ const char* whereKey() const { return 0; }
+ const Value* whereValue() const { return 0; }
+ };
+}
+
+#endif
diff --git a/cpp/src/qmf/ResilientConnection.cpp b/cpp/src/qmf/ResilientConnection.cpp
new file mode 100644
index 0000000000..7ebd0a47c1
--- /dev/null
+++ b/cpp/src/qmf/ResilientConnection.cpp
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ResilientConnection.h"
+#include "MessageImpl.h"
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/Message.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
+#include <qpid/log/Statement.h>
+#include <qpid/RefCounted.h>
+#include <boost/bind.hpp>
+#include <string>
+#include <deque>
+#include <vector>
+#include <set>
+#include <boost/intrusive_ptr.hpp>
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::client;
+using qpid::sys::Mutex;
+
+namespace qmf {
+ struct ResilientConnectionEventImpl {
+ ResilientConnectionEvent::EventKind kind;
+ void* sessionContext;
+ string errorText;
+ MessageImpl message;
+
+ ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
+ const MessageImpl& m = MessageImpl()) :
+ kind(k), sessionContext(0), message(m) {}
+ ResilientConnectionEvent copy();
+ };
+
+ struct RCSession : public MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
+ typedef boost::intrusive_ptr<RCSession> Ptr;
+ ResilientConnectionImpl& connImpl;
+ string name;
+ Connection& connection;
+ Session session;
+ SubscriptionManager* subscriptions;
+ void* userContext;
+ vector<string> dests;
+ qpid::sys::Thread thread;
+
+ RCSession(ResilientConnectionImpl& ci, const string& n, Connection& c, void* uc) :
+ connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
+ subscriptions(new SubscriptionManager(session)), userContext(uc), thread(*this) {}
+ ~RCSession();
+ void received(qpid::client::Message& msg);
+ void run();
+ void stop();
+ };
+
+ class ResilientConnectionImpl : public qpid::sys::Runnable {
+ public:
+ ResilientConnectionImpl(ConnectionSettings& settings,
+ int dmin, int dmax, int dfactor);
+ ~ResilientConnectionImpl();
+
+ bool isConnected() const;
+ bool getEvent(ResilientConnectionEvent& event);
+ void popEvent();
+ bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
+ void destroySession(SessionHandle handle);
+ void sendMessage(SessionHandle handle, qmf::Message& message);
+ void declareQueue(SessionHandle handle, char* queue);
+ void deleteQueue(SessionHandle handle, char* queue);
+ void bind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void setNotifyFd(int fd);
+
+ void run();
+ void failure();
+ void sessionClosed(RCSession* sess);
+
+ void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext = 0,
+ const MessageImpl& message = MessageImpl(),
+ const string& errorText = "");
+
+ private:
+ int notifyFd;
+ bool connected;
+ bool shutdown;
+ string lastError;
+ ConnectionSettings settings;
+ Connection connection;
+ mutable qpid::sys::Mutex lock;
+ int delayMin;
+ int delayMax;
+ int delayFactor;
+ qpid::sys::Condition cond;
+ qpid::sys::Thread connThread;
+ deque<ResilientConnectionEventImpl> eventQueue;
+ set<RCSession::Ptr> sessions;
+ };
+}
+
+ResilientConnectionEvent ResilientConnectionEventImpl::copy()
+{
+ ResilientConnectionEvent item;
+
+ ::memset(&item, 0, sizeof(ResilientConnectionEvent));
+ item.kind = kind;
+ item.sessionContext = sessionContext;
+ item.message = message.copy();
+ item.errorText = const_cast<char*>(errorText.c_str());
+
+ return item;
+}
+
+RCSession::~RCSession()
+{
+ subscriptions->stop();
+ thread.join();
+ session.close();
+ delete subscriptions;
+}
+
+void RCSession::run()
+{
+ try {
+ subscriptions->run();
+ } catch (exception& e) {
+ connImpl.sessionClosed(this);
+ }
+}
+
+void RCSession::stop()
+{
+ subscriptions->stop();
+}
+
+void RCSession::received(qpid::client::Message& msg)
+{
+ qmf::MessageImpl qmsg;
+ qmsg.body = msg.getData();
+
+ qpid::framing::MessageProperties p = msg.getMessageProperties();
+ if (p.hasReplyTo()) {
+ const qpid::framing::ReplyTo& rt = p.getReplyTo();
+ qmsg.replyExchange = rt.getExchange();
+ qmsg.replyKey = rt.getRoutingKey();
+ }
+
+ if (p.hasUserId()) {
+ qmsg.userId = p.getUserId();
+ }
+
+ connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
+}
+
+ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings,
+ int dmin, int dmax, int dfactor) :
+ notifyFd(-1), connected(false), shutdown(false), settings(_settings),
+ delayMin(dmin), delayMax(dmax), delayFactor(dfactor), connThread(*this)
+{
+ connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
+}
+
+ResilientConnectionImpl::~ResilientConnectionImpl()
+{
+ shutdown = true;
+ connected = false;
+ cond.notify();
+ connThread.join();
+ connection.close();
+}
+
+bool ResilientConnectionImpl::isConnected() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return connected;
+}
+
+bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front().copy();
+ return true;
+}
+
+void ResilientConnectionImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
+ SessionHandle& handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!connected)
+ return false;
+
+ RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
+
+ handle.handle = (void*) sess.get();
+ sessions.insert(sess);
+
+ return true;
+}
+
+void ResilientConnectionImpl::destroySession(SessionHandle handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ if (iter != sessions.end()) {
+ for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
+ sess->subscriptions->cancel(dIter->c_str());
+ sess->subscriptions->stop();
+ sess->subscriptions->wait();
+
+ sessions.erase(iter);
+ return;
+ }
+}
+
+void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ qpid::client::Message msg;
+ string data(message.body, message.length);
+ msg.getDeliveryProperties().setRoutingKey(message.routingKey);
+ msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
+ msg.setData(data);
+
+ try {
+ sess->session.messageTransfer(arg::content=msg, arg::destination=message.destination);
+ } catch(exception& e) {
+ QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
+ }
+}
+
+void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.handle;
+
+ sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true);
+ sess->subscriptions->subscribe(*sess, queue, queue);
+ sess->dests.push_back(string(queue));
+}
+
+void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.handle;
+
+ sess->session.queueDelete(arg::queue=queue);
+ for (vector<string>::iterator iter = sess->dests.begin();
+ iter != sess->dests.end(); iter++)
+ if (*iter == queue) {
+ sess->subscriptions->cancel(queue);
+ sess->dests.erase(iter);
+ break;
+ }
+}
+
+void ResilientConnectionImpl::bind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.handle;
+
+ sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::unbind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.handle;
+
+ sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::setNotifyFd(int fd)
+{
+ notifyFd = fd;
+}
+
+void ResilientConnectionImpl::run()
+{
+ int delay(delayMin);
+
+ while (true) {
+ try {
+ connection.open(settings);
+ {
+ Mutex::ScopedLock _lock(lock);
+ connected = true;
+ EnqueueEvent(ResilientConnectionEvent::CONNECTED);
+
+ while (connected)
+ cond.wait(lock);
+
+ while (!sessions.empty()) {
+ set<RCSession::Ptr>::iterator iter = sessions.begin();
+ RCSession::Ptr sess = *iter;
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
+ Mutex::ScopedUnlock _u(lock);
+ sess->stop();
+ }
+
+ EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
+
+ if (shutdown)
+ return;
+ }
+ delay = delayMin;
+ connection.close();
+ } catch (exception &e) {
+ QPID_LOG(debug, "connection.open exception: " << e.what());
+ Mutex::ScopedLock _lock(lock);
+ lastError = e.what();
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::sleep(delay);
+ }
+}
+
+void ResilientConnectionImpl::failure()
+{
+ Mutex::ScopedLock _lock(lock);
+
+ connected = false;
+ lastError = "Closed by Peer";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::sessionClosed(RCSession*)
+{
+ Mutex::ScopedLock _lock(lock);
+ connected = false;
+ lastError = "Closed due to Session failure";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext,
+ const MessageImpl& message,
+ const string& errorText)
+{
+ Mutex::ScopedLock _lock(lock);
+ ResilientConnectionEventImpl event(kind, message);
+
+ event.sessionContext = sessionContext;
+ event.errorText = errorText;
+
+ eventQueue.push_back(event);
+ if (notifyFd != -1)
+ ::write(notifyFd, ".", 1);
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ResilientConnection::ResilientConnection(ConnectionSettings& settings,
+ int delayMin, int delayMax, int delayFactor)
+{
+ impl = new ResilientConnectionImpl(settings, delayMin, delayMax, delayFactor);
+}
+
+ResilientConnection::~ResilientConnection()
+{
+ delete impl;
+}
+
+bool ResilientConnection::isConnected() const
+{
+ return impl->isConnected();
+}
+
+bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
+{
+ return impl->getEvent(event);
+}
+
+void ResilientConnection::popEvent()
+{
+ impl->popEvent();
+}
+
+bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
+{
+ return impl->createSession(name, sessionContext, handle);
+}
+
+void ResilientConnection::destroySession(SessionHandle handle)
+{
+ impl->destroySession(handle);
+}
+
+void ResilientConnection::sendMessage(SessionHandle handle, qmf::Message& message)
+{
+ impl->sendMessage(handle, message);
+}
+
+void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
+{
+ impl->declareQueue(handle, queue);
+}
+
+void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
+{
+ impl->deleteQueue(handle, queue);
+}
+
+void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->bind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->unbind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::setNotifyFd(int fd)
+{
+ impl->setNotifyFd(fd);
+}
+
diff --git a/cpp/src/qmf/ResilientConnection.h b/cpp/src/qmf/ResilientConnection.h
new file mode 100644
index 0000000000..bb565e27ae
--- /dev/null
+++ b/cpp/src/qmf/ResilientConnection.h
@@ -0,0 +1,166 @@
+#ifndef _QmfResilientConnection_
+#define _QmfResilientConnection_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Message.h>
+#include <qpid/client/Connection.h>
+#include <qpid/client/ConnectionSettings.h>
+#include <string>
+
+namespace qmf {
+
+ /**
+ * Represents events that occur, unsolicited, from ResilientConnection.
+ */
+ struct ResilientConnectionEvent {
+ enum EventKind {
+ CONNECTED = 1,
+ DISCONNECTED = 2,
+ SESSION_CLOSED = 3,
+ RECV = 4
+ };
+
+ EventKind kind;
+ void* sessionContext; // SESSION_CLOSED, RECV
+ char* errorText; // DISCONNECTED, SESSION_CLOSED
+ Message message; // RECV
+ };
+
+ struct SessionHandle {
+ void* handle;
+ };
+
+ class ResilientConnectionImpl;
+
+ /**
+ * ResilientConnection represents a Qpid connection that is resilient.
+ *
+ * Upon creation, ResilientConnection attempts to establish a connection to the
+ * messaging broker. If it fails, it will continue to retry at an interval that
+ * increases over time (to a maximum interval). If an extablished connection is
+ * dropped, a reconnect will be attempted.
+ */
+ class ResilientConnection {
+ public:
+
+ /**
+ * Create a new resilient connection.
+ *@param settings Settings that define how the connection is to be made.
+ *@param delayMin Minimum delay (in seconds) between retries.
+ *@param delayMax Maximum delay (in seconds) between retries.
+ *@param delayFactor Factor to multiply retry delay by after each failure.
+ */
+ ResilientConnection(qpid::client::ConnectionSettings& settings,
+ int delayMin = 1,
+ int delayMax = 128,
+ int delayFactor = 2);
+ ~ResilientConnection();
+
+ /**
+ * Get the connected status of the resilient connection.
+ *@return true iff the connection is established.
+ */
+ bool isConnected() const;
+
+ /**
+ * Get the next event (if present) from the connection.
+ *@param event Returned event if one is available.
+ *@return true if event is valid, false if there are no more events to handle.
+ */
+ bool getEvent(ResilientConnectionEvent& event);
+
+ /**
+ * Discard the event on the front of the queue. This should be invoked after processing
+ * the event from getEvent.
+ */
+ void popEvent();
+
+ /**
+ * Create a new AMQP session.
+ *@param name Unique name for the session.
+ *@param sessionContext Optional user-context value that will be provided in events
+ * pertaining to this session.
+ *@param handle Output handle to be stored and used in subsequent calls pertaining to
+ * this session.
+ *@return true iff the session was successfully created.
+ */
+ bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
+
+ /**
+ * Destroy a created session.
+ *@param handle SessionHandle returned by createSession.
+ */
+ void destroySession(SessionHandle handle);
+
+ /**
+ * Send a message into the AMQP broker via a session.
+ *@param handle The session handle of the session to transmit through.
+ *@param message The QMF message to transmit.
+ */
+ void sendMessage(SessionHandle handle, Message& message);
+
+ /**
+ * Declare an exclusive, auto-delete queue for a session.
+ *@param handle The session handle for the owner of the queue.
+ *@param queue The name of the queue.
+ */
+ void declareQueue(SessionHandle handle, char* queue);
+
+ /**
+ * Delete a queue.
+ *@param handle The session handle for the owner of the queue.
+ *@param queue The name of the queue.
+ */
+ void deleteQueue(SessionHandle handle, char* queue);
+
+ /**
+ * Bind a queue to an exchange.
+ *@param handle The session handle of the session to use for binding.
+ *@param exchange The name of the exchange for binding.
+ *@param queue The name of the queue for binding.
+ *@param key The binding key.
+ */
+ void bind(SessionHandle handle, char* exchange, char* queue, char* key);
+
+ /**
+ * Remove a binding.
+ *@param handle The session handle of the session to use for un-binding.
+ *@param exchange The name of the exchange.
+ *@param queue The name of the queue.
+ *@param key The binding key.
+ */
+ void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
+
+ /**
+ * Establish a file descriptor for event notification.
+ *@param fd A file descriptor into which the connection shall write a character each
+ * time an event is enqueued. This fd may be in a pair, the other fd of which
+ * is used in a select loop to control execution.
+ */
+ void setNotifyFd(int fd);
+
+ private:
+ ResilientConnectionImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Schema.h b/cpp/src/qmf/Schema.h
new file mode 100644
index 0000000000..c4106393ff
--- /dev/null
+++ b/cpp/src/qmf/Schema.h
@@ -0,0 +1,160 @@
+#ifndef _QmfSchema_
+#define _QmfSchema_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Typecode.h>
+#include <stdint.h>
+
+namespace qmf {
+
+ enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 };
+ enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 };
+ enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 };
+
+ class SchemaArgumentImpl;
+ class SchemaMethodImpl;
+ class SchemaPropertyImpl;
+ class SchemaStatisticImpl;
+ class SchemaObjectClassImpl;
+ class SchemaEventClassImpl;
+
+ /**
+ */
+ class SchemaArgument {
+ public:
+ SchemaArgument(const char* name, Typecode typecode);
+ SchemaArgument(SchemaArgumentImpl* impl);
+ ~SchemaArgument();
+ void setDirection(Direction dir);
+ void setUnit(const char* val);
+ void setDesc(const char* desc);
+ const char* getName() const;
+ Typecode getType() const;
+ Direction getDirection() const;
+ const char* getUnit() const;
+ const char* getDesc() const;
+
+ SchemaArgumentImpl* impl;
+ };
+
+ /**
+ */
+ class SchemaMethod {
+ public:
+ SchemaMethod(const char* name);
+ SchemaMethod(SchemaMethodImpl* impl);
+ ~SchemaMethod();
+ void addArgument(const SchemaArgument& argument);
+ void setDesc(const char* desc);
+ const char* getName() const;
+ const char* getDesc() const;
+ int getArgumentCount() const;
+ const SchemaArgument* getArgument(int idx) const;
+
+ SchemaMethodImpl* impl;
+ };
+
+ /**
+ */
+ class SchemaProperty {
+ public:
+ SchemaProperty(const char* name, Typecode typecode);
+ SchemaProperty(SchemaPropertyImpl* impl);
+ ~SchemaProperty();
+ void setAccess(Access access);
+ void setIndex(bool val);
+ void setOptional(bool val);
+ void setUnit(const char* val);
+ void setDesc(const char* desc);
+ const char* getName() const;
+ Typecode getType() const;
+ Access getAccess() const;
+ bool isIndex() const;
+ bool isOptional() const;
+ const char* getUnit() const;
+ const char* getDesc() const;
+
+ SchemaPropertyImpl* impl;
+ };
+
+ /**
+ */
+ class SchemaStatistic {
+ public:
+ SchemaStatistic(const char* name, Typecode typecode);
+ SchemaStatistic(SchemaStatisticImpl* impl);
+ ~SchemaStatistic();
+ void setUnit(const char* val);
+ void setDesc(const char* desc);
+ const char* getName() const;
+ Typecode getType() const;
+ const char* getUnit() const;
+ const char* getDesc() const;
+
+ SchemaStatisticImpl* impl;
+ };
+
+ /**
+ */
+ class SchemaObjectClass {
+ public:
+ SchemaObjectClass(const char* package, const char* name);
+ SchemaObjectClass(SchemaObjectClassImpl* impl);
+ ~SchemaObjectClass();
+ void addProperty(const SchemaProperty& property);
+ void addStatistic(const SchemaStatistic& statistic);
+ void addMethod(const SchemaMethod& method);
+
+ const char* getPackage() const;
+ const char* getName() const;
+ const uint8_t* getHash() const;
+ int getPropertyCount() const;
+ int getStatisticCount() const;
+ int getMethodCount() const;
+ const SchemaProperty* getProperty(int idx) const;
+ const SchemaStatistic* getStatistic(int idx) const;
+ const SchemaMethod* getMethod(int idx) const;
+
+ SchemaObjectClassImpl* impl;
+ };
+
+ /**
+ */
+ class SchemaEventClass {
+ public:
+ SchemaEventClass(const char* package, const char* name);
+ SchemaEventClass(SchemaEventClassImpl* impl);
+ ~SchemaEventClass();
+ void addArgument(const SchemaArgument& argument);
+ void setDesc(const char* desc);
+
+ const char* getPackage() const;
+ const char* getName() const;
+ const uint8_t* getHash() const;
+ int getArgumentCount() const;
+ const SchemaArgument* getArgument(int idx) const;
+
+ SchemaEventClassImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/SchemaImpl.cpp b/cpp/src/qmf/SchemaImpl.cpp
new file mode 100644
index 0000000000..57d6148cac
--- /dev/null
+++ b/cpp/src/qmf/SchemaImpl.cpp
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "SchemaImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/FieldTable.h>
+#include <string>
+#include <vector>
+
+using namespace std;
+using namespace qmf;
+using qpid::framing::Buffer;
+using qpid::framing::FieldTable;
+
+SchemaHash::SchemaHash()
+{
+ for (int idx = 0; idx < 16; idx++)
+ hash[idx] = 0x5A;
+}
+
+void SchemaHash::encode(Buffer& buffer)
+{
+ buffer.putBin128(hash);
+}
+
+void SchemaHash::decode(Buffer& buffer)
+{
+ buffer.getBin128(hash);
+}
+
+void SchemaHash::update(uint8_t data)
+{
+ update((char*) &data, 1);
+}
+
+void SchemaHash::update(const char* data, uint32_t len)
+{
+ uint64_t* first = (uint64_t*) hash;
+ uint64_t* second = (uint64_t*) hash + 1;
+
+ for (uint32_t idx = 0; idx < len; idx++) {
+ *first = *first ^ (uint64_t) data[idx];
+ *second = *second << 1;
+ *second |= ((*first & 0x8000000000000000LL) >> 63);
+ *first = *first << 1;
+ *first = *first ^ *second;
+ }
+}
+
+SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgument(this))
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+
+ dir = DIR_IN;
+ string dstr(map.getAsString("dir"));
+ if (dstr == "O")
+ dir = DIR_OUT;
+ else if (dstr == "IO")
+ dir = DIR_IN_OUT;
+}
+
+void SchemaArgumentImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ if (dir == DIR_IN)
+ map.setString("dir", "I");
+ else if (dir == DIR_OUT)
+ map.setString("dir", "O");
+ else
+ map.setString("dir", "IO");
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(dir);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(this))
+{
+ FieldTable map;
+ int argCount;
+
+ map.decode(buffer);
+ name = map.getAsString("name");
+ argCount = map.getAsInt("argCount");
+ description = map.getAsString("desc");
+
+ for (int idx = 0; idx < argCount; idx++) {
+ SchemaArgumentImpl* arg = new SchemaArgumentImpl(buffer);
+ addArgument(*arg->envelope);
+ }
+}
+
+void SchemaMethodImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("argCount", arguments.size());
+ if (!description.empty())
+ map.setString("desc", description);
+ map.encode(buffer);
+
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->encode(buffer);
+}
+
+void SchemaMethodImpl::addArgument(const SchemaArgument& argument)
+{
+ arguments.push_back(argument.impl);
+}
+
+const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const
+{
+ int count = 0;
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++, count++)
+ if (idx == count)
+ return (*iter)->envelope;
+ return 0;
+}
+
+void SchemaMethodImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(description);
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->updateHash(hash);
+}
+
+SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProperty(this))
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ access = (Access) map.getAsInt("access");
+ index = map.getAsInt("index") != 0;
+ optional = map.getAsInt("optional") != 0;
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+}
+
+void SchemaPropertyImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ map.setInt("access", (int) access);
+ map.setInt("index", index ? 1 : 0);
+ map.setInt("optional", optional ? 1 : 0);
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(access);
+ hash.update(index);
+ hash.update(optional);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaStatistic(this))
+{
+ FieldTable map;
+ map.decode(buffer);
+
+ name = map.getAsString("name");
+ typecode = (Typecode) map.getAsInt("type");
+ unit = map.getAsString("unit");
+ description = map.getAsString("desc");
+}
+
+void SchemaStatisticImpl::encode(Buffer& buffer) const
+{
+ FieldTable map;
+
+ map.setString("name", name);
+ map.setInt("type", (int) typecode);
+ if (!unit.empty())
+ map.setString("unit", unit);
+ if (!description.empty())
+ map.setString("desc", description);
+
+ map.encode(buffer);
+}
+
+void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
+{
+ hash.update(name);
+ hash.update(typecode);
+ hash.update(unit);
+ hash.update(description);
+}
+
+SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : envelope(new SchemaObjectClass(this)), hasHash(true)
+{
+ buffer.getShortString(package);
+ buffer.getShortString(name);
+ hash.decode(buffer);
+
+ uint16_t propCount = buffer.getShort();
+ uint16_t statCount = buffer.getShort();
+ uint16_t methodCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount; idx++) {
+ SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer);
+ addProperty(*property->envelope);
+ }
+
+ for (uint16_t idx = 0; idx < statCount; idx++) {
+ SchemaStatisticImpl* statistic = new SchemaStatisticImpl(buffer);
+ addStatistic(*statistic->envelope);
+ }
+
+ for (uint16_t idx = 0; idx < methodCount; idx++) {
+ SchemaMethodImpl* method = new SchemaMethodImpl(buffer);
+ addMethod(*method->envelope);
+ }
+}
+
+void SchemaObjectClassImpl::encode(Buffer& buffer) const
+{
+ buffer.putOctet((uint8_t) CLASS_OBJECT);
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ hash.encode(buffer);
+ buffer.putShort((uint16_t) properties.size());
+ buffer.putShort((uint16_t) statistics.size());
+ buffer.putShort((uint16_t) methods.size());
+
+ for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++)
+ (*iter)->encode(buffer);
+ for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++)
+ (*iter)->encode(buffer);
+ for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++)
+ (*iter)->encode(buffer);
+}
+
+const uint8_t* SchemaObjectClassImpl::getHash() const
+{
+ if (!hasHash) {
+ hasHash = true;
+ hash.update(package);
+ hash.update(name);
+ for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++)
+ (*iter)->updateHash(hash);
+ for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++)
+ (*iter)->updateHash(hash);
+ for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++)
+ (*iter)->updateHash(hash);
+ }
+
+ return hash.get();
+}
+
+void SchemaObjectClassImpl::addProperty(const SchemaProperty& property)
+{
+ properties.push_back(property.impl);
+}
+
+void SchemaObjectClassImpl::addStatistic(const SchemaStatistic& statistic)
+{
+ statistics.push_back(statistic.impl);
+}
+
+void SchemaObjectClassImpl::addMethod(const SchemaMethod& method)
+{
+ methods.push_back(method.impl);
+}
+
+const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const
+{
+ int count = 0;
+ for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ iter != properties.end(); iter++, count++)
+ if (idx == count)
+ return (*iter)->envelope;
+ return 0;
+}
+
+const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const
+{
+ int count = 0;
+ for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ iter != statistics.end(); iter++, count++)
+ if (idx == count)
+ return (*iter)->envelope;
+ return 0;
+}
+
+const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
+{
+ int count = 0;
+ for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ iter != methods.end(); iter++, count++)
+ if (idx == count)
+ return (*iter)->envelope;
+ return 0;
+}
+
+SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : envelope(new SchemaEventClass(this)), hasHash(true)
+{
+ buffer.getShortString(package);
+ buffer.getShortString(name);
+ hash.decode(buffer);
+
+ uint16_t argCount = buffer.getShort();
+
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer);
+ addArgument(*argument->envelope);
+ }
+}
+
+void SchemaEventClassImpl::encode(Buffer& buffer) const
+{
+ buffer.putOctet((uint8_t) CLASS_EVENT);
+ buffer.putShortString(package);
+ buffer.putShortString(name);
+ hash.encode(buffer);
+ buffer.putShort((uint16_t) arguments.size());
+
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->encode(buffer);
+}
+
+const uint8_t* SchemaEventClassImpl::getHash() const
+{
+ if (!hasHash) {
+ hasHash = true;
+ hash.update(package);
+ hash.update(name);
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++)
+ (*iter)->updateHash(hash);
+ }
+ return hash.get();
+}
+
+void SchemaEventClassImpl::addArgument(const SchemaArgument& argument)
+{
+ arguments.push_back(argument.impl);
+}
+
+const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
+{
+ int count = 0;
+ for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ iter != arguments.end(); iter++, count++)
+ if (idx == count)
+ return (*iter)->envelope;
+ return 0;
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+SchemaArgument::SchemaArgument(const char* name, Typecode typecode)
+{
+ impl = new SchemaArgumentImpl(this, name, typecode);
+}
+
+SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {}
+
+SchemaArgument::~SchemaArgument()
+{
+ delete impl;
+}
+
+void SchemaArgument::setDirection(Direction dir)
+{
+ impl->setDirection(dir);
+}
+
+void SchemaArgument::setUnit(const char* val)
+{
+ impl->setUnit(val);
+}
+
+void SchemaArgument::setDesc(const char* desc)
+{
+ impl->setDesc(desc);
+}
+
+const char* SchemaArgument::getName() const
+{
+ return impl->getName().c_str();
+}
+
+Typecode SchemaArgument::getType() const
+{
+ return impl->getType();
+}
+
+Direction SchemaArgument::getDirection() const
+{
+ return impl->getDirection();
+}
+
+const char* SchemaArgument::getUnit() const
+{
+ return impl->getUnit().c_str();
+}
+
+const char* SchemaArgument::getDesc() const
+{
+ return impl->getDesc().c_str();
+}
+
+SchemaMethod::SchemaMethod(const char* name)
+{
+ impl = new SchemaMethodImpl(this, name);
+}
+
+SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {}
+
+SchemaMethod::~SchemaMethod()
+{
+ delete impl;
+}
+
+void SchemaMethod::addArgument(const SchemaArgument& argument)
+{
+ impl->addArgument(argument);
+}
+
+void SchemaMethod::setDesc(const char* desc)
+{
+ impl->setDesc(desc);
+}
+
+const char* SchemaMethod::getName() const
+{
+ return impl->getName().c_str();
+}
+
+const char* SchemaMethod::getDesc() const
+{
+ return impl->getDesc().c_str();
+}
+
+int SchemaMethod::getArgumentCount() const
+{
+ return impl->getArgumentCount();
+}
+
+const SchemaArgument* SchemaMethod::getArgument(int idx) const
+{
+ return impl->getArgument(idx);
+}
+
+SchemaProperty::SchemaProperty(const char* name, Typecode typecode)
+{
+ impl = new SchemaPropertyImpl(this, name, typecode);
+}
+
+SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {}
+
+SchemaProperty::~SchemaProperty()
+{
+ delete impl;
+}
+
+void SchemaProperty::setAccess(Access access)
+{
+ impl->setAccess(access);
+}
+
+void SchemaProperty::setIndex(bool val)
+{
+ impl->setIndex(val);
+}
+
+void SchemaProperty::setOptional(bool val)
+{
+ impl->setOptional(val);
+}
+
+void SchemaProperty::setUnit(const char* val)
+{
+ impl->setUnit(val);
+}
+
+void SchemaProperty::setDesc(const char* desc)
+{
+ impl->setDesc(desc);
+}
+
+const char* SchemaProperty::getName() const
+{
+ return impl->getName().c_str();
+}
+
+Typecode SchemaProperty::getType() const
+{
+ return impl->getType();
+}
+
+Access SchemaProperty::getAccess() const
+{
+ return impl->getAccess();
+}
+
+bool SchemaProperty::isIndex() const
+{
+ return impl->isIndex();
+}
+
+bool SchemaProperty::isOptional() const
+{
+ return impl->isOptional();
+}
+
+const char* SchemaProperty::getUnit() const
+{
+ return impl->getUnit().c_str();
+}
+
+const char* SchemaProperty::getDesc() const
+{
+ return impl->getDesc().c_str();
+}
+
+SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode)
+{
+ impl = new SchemaStatisticImpl(this, name, typecode);
+}
+
+SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {}
+
+SchemaStatistic::~SchemaStatistic()
+{
+ delete impl;
+}
+
+void SchemaStatistic::setUnit(const char* val)
+{
+ impl->setUnit(val);
+}
+
+void SchemaStatistic::setDesc(const char* desc)
+{
+ impl->setDesc(desc);
+}
+
+const char* SchemaStatistic::getName() const
+{
+ return impl->getName().c_str();
+}
+
+Typecode SchemaStatistic::getType() const
+{
+ return impl->getType();
+}
+
+const char* SchemaStatistic::getUnit() const
+{
+ return impl->getUnit().c_str();
+}
+
+const char* SchemaStatistic::getDesc() const
+{
+ return impl->getDesc().c_str();
+}
+
+SchemaObjectClass::SchemaObjectClass(const char* package, const char* name)
+{
+ impl = new SchemaObjectClassImpl(this, package, name);
+}
+
+SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {}
+
+SchemaObjectClass::~SchemaObjectClass()
+{
+ delete impl;
+}
+
+void SchemaObjectClass::addProperty(const SchemaProperty& property)
+{
+ impl->addProperty(property);
+}
+
+void SchemaObjectClass::addStatistic(const SchemaStatistic& statistic)
+{
+ impl->addStatistic(statistic);
+}
+
+void SchemaObjectClass::addMethod(const SchemaMethod& method)
+{
+ impl->addMethod(method);
+}
+
+const char* SchemaObjectClass::getPackage() const
+{
+ return impl->getPackage().c_str();
+}
+
+const char* SchemaObjectClass::getName() const
+{
+ return impl->getName().c_str();
+}
+
+const uint8_t* SchemaObjectClass::getHash() const
+{
+ return impl->getHash();
+}
+
+int SchemaObjectClass::getPropertyCount() const
+{
+ return impl->getPropertyCount();
+}
+
+int SchemaObjectClass::getStatisticCount() const
+{
+ return impl->getStatisticCount();
+}
+
+int SchemaObjectClass::getMethodCount() const
+{
+ return impl->getMethodCount();
+}
+
+const SchemaProperty* SchemaObjectClass::getProperty(int idx) const
+{
+ return impl->getProperty(idx);
+}
+
+const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const
+{
+ return impl->getStatistic(idx);
+}
+
+const SchemaMethod* SchemaObjectClass::getMethod(int idx) const
+{
+ return impl->getMethod(idx);
+}
+
+SchemaEventClass::SchemaEventClass(const char* package, const char* name)
+{
+ impl = new SchemaEventClassImpl(this, package, name);
+}
+
+SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
+
+SchemaEventClass::~SchemaEventClass()
+{
+ delete impl;
+}
+
+void SchemaEventClass::addArgument(const SchemaArgument& argument)
+{
+ impl->addArgument(argument);
+}
+
+void SchemaEventClass::setDesc(const char* desc)
+{
+ impl->setDesc(desc);
+}
+
+const char* SchemaEventClass::getPackage() const
+{
+ return impl->getPackage().c_str();
+}
+
+const char* SchemaEventClass::getName() const
+{
+ return impl->getName().c_str();
+}
+
+const uint8_t* SchemaEventClass::getHash() const
+{
+ return impl->getHash();
+}
+
+int SchemaEventClass::getArgumentCount() const
+{
+ return impl->getArgumentCount();
+}
+
+const SchemaArgument* SchemaEventClass::getArgument(int idx) const
+{
+ return impl->getArgument(idx);
+}
+
diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h
new file mode 100644
index 0000000000..fbc156c538
--- /dev/null
+++ b/cpp/src/qmf/SchemaImpl.h
@@ -0,0 +1,195 @@
+#ifndef _QmfSchemaImpl_
+#define _QmfSchemaImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Schema.h"
+#include <string>
+#include <vector>
+#include <qpid/framing/Buffer.h>
+
+namespace qmf {
+
+ // TODO: Destructors for schema classes
+ // TODO: Add "frozen" attribute for schema classes so they can't be modified after
+ // they've been registered.
+
+ class SchemaHash {
+ uint8_t hash[16];
+ public:
+ SchemaHash();
+ void encode(qpid::framing::Buffer& buffer);
+ void decode(qpid::framing::Buffer& buffer);
+ void update(const char* data, uint32_t len);
+ void update(uint8_t data);
+ void update(const std::string& data) { update(data.c_str(), data.size()); }
+ void update(Typecode t) { update((uint8_t) t); }
+ void update(Direction d) { update((uint8_t) d); }
+ void update(Access a) { update((uint8_t) a); }
+ void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
+ const uint8_t* get() const { return hash; }
+ };
+
+ struct SchemaArgumentImpl {
+ SchemaArgument* envelope;
+ std::string name;
+ Typecode typecode;
+ Direction dir;
+ std::string unit;
+ std::string description;
+
+ SchemaArgumentImpl(SchemaArgument* e, const char* n, Typecode t) :
+ envelope(e), name(n), typecode(t), dir(DIR_IN) {}
+ SchemaArgumentImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setDirection(Direction d) { dir = d; }
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ Direction getDirection() const { return dir; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaMethodImpl {
+ SchemaMethod* envelope;
+ std::string name;
+ std::string description;
+ std::vector<SchemaArgumentImpl*> arguments;
+
+ SchemaMethodImpl(SchemaMethod* e, const char* n) : envelope(e), name(n) {}
+ SchemaMethodImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addArgument(const SchemaArgument& argument);
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ const std::string& getDesc() const { return description; }
+ int getArgumentCount() const { return arguments.size(); }
+ const SchemaArgument* getArgument(int idx) const;
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaPropertyImpl {
+ SchemaProperty* envelope;
+ std::string name;
+ Typecode typecode;
+ Access access;
+ bool index;
+ bool optional;
+ std::string unit;
+ std::string description;
+
+ SchemaPropertyImpl(SchemaProperty* e, const char* n, Typecode t) :
+ envelope(e), name(n), typecode(t), access(ACCESS_READ_ONLY),
+ index(false), optional(false) {}
+ SchemaPropertyImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setAccess(Access a) { access = a; }
+ void setIndex(bool val) { index = val; }
+ void setOptional(bool val) { optional = val; }
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ Access getAccess() const { return access; }
+ bool isIndex() const { return index; }
+ bool isOptional() const { return optional; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaStatisticImpl {
+ SchemaStatistic* envelope;
+ std::string name;
+ Typecode typecode;
+ std::string unit;
+ std::string description;
+
+ SchemaStatisticImpl(SchemaStatistic* e, const char* n, Typecode t) :
+ envelope(e), name(n), typecode(t) {}
+ SchemaStatisticImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void setUnit(const char* val) { unit = val; }
+ void setDesc(const char* desc) { description = desc; }
+ const std::string& getName() const { return name; }
+ Typecode getType() const { return typecode; }
+ const std::string& getUnit() const { return unit; }
+ const std::string& getDesc() const { return description; }
+ void updateHash(SchemaHash& hash) const;
+ };
+
+ struct SchemaObjectClassImpl {
+ SchemaObjectClass* envelope;
+ std::string package;
+ std::string name;
+ mutable SchemaHash hash;
+ mutable bool hasHash;
+ std::vector<SchemaPropertyImpl*> properties;
+ std::vector<SchemaStatisticImpl*> statistics;
+ std::vector<SchemaMethodImpl*> methods;
+
+ SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) :
+ envelope(e), package(p), name(n), hasHash(false) {}
+ SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addProperty(const SchemaProperty& property);
+ void addStatistic(const SchemaStatistic& statistic);
+ void addMethod(const SchemaMethod& method);
+
+ const std::string& getPackage() const { return package; }
+ const std::string& getName() const { return name; }
+ const uint8_t* getHash() const;
+ int getPropertyCount() const { return properties.size(); }
+ int getStatisticCount() const { return statistics.size(); }
+ int getMethodCount() const { return methods.size(); }
+ const SchemaProperty* getProperty(int idx) const;
+ const SchemaStatistic* getStatistic(int idx) const;
+ const SchemaMethod* getMethod(int idx) const;
+ };
+
+ struct SchemaEventClassImpl {
+ SchemaEventClass* envelope;
+ std::string package;
+ std::string name;
+ mutable SchemaHash hash;
+ mutable bool hasHash;
+ std::string description;
+ std::vector<SchemaArgumentImpl*> arguments;
+
+ SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) :
+ envelope(e), package(p), name(n), hasHash(false) {}
+ SchemaEventClassImpl(qpid::framing::Buffer& buffer);
+ void encode(qpid::framing::Buffer& buffer) const;
+ void addArgument(const SchemaArgument& argument);
+ void setDesc(const char* desc) { description = desc; }
+
+ const std::string& getPackage() const { return package; }
+ const std::string& getName() const { return name; }
+ const uint8_t* getHash() const;
+ int getArgumentCount() const { return arguments.size(); }
+ const SchemaArgument* getArgument(int idx) const;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Typecode.h b/cpp/src/qmf/Typecode.h
new file mode 100644
index 0000000000..94614d2977
--- /dev/null
+++ b/cpp/src/qmf/Typecode.h
@@ -0,0 +1,51 @@
+#ifndef _QmfTypecode_
+#define _QmfTypecode_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace qmf {
+
+ enum Typecode {
+ TYPE_UINT8 = 1,
+ TYPE_UINT16 = 2,
+ TYPE_UINT32 = 3,
+ TYPE_UINT64 = 4,
+ TYPE_SSTR = 6,
+ TYPE_LSTR = 7,
+ TYPE_ABSTIME = 8,
+ TYPE_DELTATIME = 9,
+ TYPE_REF = 10,
+ TYPE_BOOL = 11,
+ TYPE_FLOAT = 12,
+ TYPE_DOUBLE = 13,
+ TYPE_UUID = 14,
+ TYPE_MAP = 15,
+ TYPE_INT8 = 16,
+ TYPE_INT16 = 17,
+ TYPE_INT32 = 18,
+ TYPE_INT64 = 19,
+ TYPE_OBJECT = 20,
+ TYPE_LIST = 21,
+ TYPE_ARRAY = 22
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/Value.h b/cpp/src/qmf/Value.h
new file mode 100644
index 0000000000..7d54293e08
--- /dev/null
+++ b/cpp/src/qmf/Value.h
@@ -0,0 +1,113 @@
+#ifndef _QmfValue_
+#define _QmfValue_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/ObjectId.h>
+#include <qmf/Typecode.h>
+
+namespace qmf {
+
+ class Object;
+ class ValueImpl;
+
+ class Value {
+ public:
+ Value();
+ Value(Typecode t, Typecode arrayType = TYPE_UINT8);
+ Value(ValueImpl* impl);
+ ~Value();
+
+ Typecode getType() const;
+ bool isNull() const;
+ void setNull();
+
+ bool isObjectId() const;
+ const ObjectId& asObjectId() const;
+ void setObjectId(const ObjectId& oid);
+
+ bool isUint() const;
+ uint32_t asUint() const;
+ void setUint(uint32_t val);
+
+ bool isInt() const;
+ int32_t asInt() const;
+ void setInt(int32_t val);
+
+ bool isUint64() const;
+ uint64_t asUint64() const;
+ void setUint64(uint64_t val);
+
+ bool isInt64() const;
+ int64_t asInt64() const;
+ void setInt64(int64_t val);
+
+ bool isString() const;
+ const char* asString() const;
+ void setString(const char* val);
+
+ bool isBool() const;
+ bool asBool() const;
+ void setBool(bool val);
+
+ bool isFloat() const;
+ float asFloat() const;
+ void setFloat(float val);
+
+ bool isDouble() const;
+ double asDouble() const;
+ void setDouble(double val);
+
+ bool isUuid() const;
+ const uint8_t* asUuid() const;
+ void setUuid(const uint8_t* val);
+
+ bool isObject() const;
+ Object* asObject() const;
+ void setObject(Object* val);
+
+ bool isMap() const;
+ bool keyInMap(const char* key) const;
+ Value* byKey(const char* key);
+ const Value* byKey(const char* key) const;
+ void deleteKey(const char* key);
+ void insert(const char* key, Value* val);
+ uint32_t keyCount() const;
+ const char* key(uint32_t idx) const;
+
+ bool isList() const;
+ uint32_t listItemCount() const;
+ Value* listItem(uint32_t idx);
+ void appendToList(Value* val);
+ void deleteListItem(uint32_t idx);
+
+ bool isArray() const;
+ Typecode arrayType() const;
+ uint32_t arrayItemCount() const;
+ Value* arrayItem(uint32_t idx);
+ void appendToArray(Value* val);
+ void deleteArrayItem(uint32_t idx);
+
+ ValueImpl* impl;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/ValueImpl.cpp b/cpp/src/qmf/ValueImpl.cpp
new file mode 100644
index 0000000000..538255ea20
--- /dev/null
+++ b/cpp/src/qmf/ValueImpl.cpp
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ValueImpl.h"
+#include <qpid/framing/FieldTable.h>
+
+using namespace std;
+using namespace qmf;
+using qpid::framing::Buffer;
+
+ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typecode(t)
+{
+ uint64_t first;
+ uint64_t second;
+ qpid::framing::FieldTable ft;
+
+ switch (typecode) {
+ case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break;
+ case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break;
+ case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break;
+ case TYPE_UINT64 : value.u64 = buf.getLongLong(); break;
+ case TYPE_SSTR : buf.getShortString(stringVal); break;
+ case TYPE_LSTR : buf.getMediumString(stringVal); break;
+ case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break;
+ case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break;
+ case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break;
+ case TYPE_FLOAT : value.floatVal = buf.getFloat(); break;
+ case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break;
+ case TYPE_INT8 : value.s32 = (int32_t) buf.getOctet(); break;
+ case TYPE_INT16 : value.s32 = (int32_t) buf.getShort(); break;
+ case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break;
+ case TYPE_INT64 : value.s64 = buf.getLongLong(); break;
+ case TYPE_UUID : buf.getBin128(value.uuidVal); break;
+ case TYPE_REF:
+ first = buf.getLongLong();
+ second = buf.getLongLong();
+ refVal.impl->setValue(first, second);
+ break;
+
+ case TYPE_MAP:
+ ft.decode(buf);
+ // TODO: either update to recursively use QMF types or reduce to int/string/...
+ // (maybe use another ctor with a FieldValue argument)
+ break;
+
+ case TYPE_LIST:
+ case TYPE_ARRAY:
+ case TYPE_OBJECT:
+ default:
+ break;
+ }
+}
+
+ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t)
+{
+ ::memset(&value, 0, sizeof(value));
+}
+
+ValueImpl::~ValueImpl()
+{
+}
+
+void ValueImpl::encode(Buffer& buf) const
+{
+ switch (typecode) {
+ case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break;
+ case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break;
+ case TYPE_UINT32 : buf.putLong(value.u32); break;
+ case TYPE_UINT64 : buf.putLongLong(value.u64); break;
+ case TYPE_SSTR : buf.putShortString(stringVal); break;
+ case TYPE_LSTR : buf.putMediumString(stringVal); break;
+ case TYPE_ABSTIME : buf.putLongLong(value.s64); break;
+ case TYPE_DELTATIME : buf.putLongLong(value.u64); break;
+ case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break;
+ case TYPE_FLOAT : buf.putFloat(value.floatVal); break;
+ case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break;
+ case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break;
+ case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break;
+ case TYPE_INT32 : buf.putLong(value.s32); break;
+ case TYPE_INT64 : buf.putLongLong(value.s64); break;
+ case TYPE_UUID : buf.putBin128(value.uuidVal); break;
+ case TYPE_REF : refVal.impl->encode(buf); break;
+ case TYPE_MAP: // TODO
+ case TYPE_LIST:
+ case TYPE_ARRAY:
+ case TYPE_OBJECT:
+ default:
+ break;
+ }
+}
+
+bool ValueImpl::keyInMap(const char* key) const
+{
+ return typecode == TYPE_MAP && mapVal.count(key) > 0;
+}
+
+Value* ValueImpl::byKey(const char* key)
+{
+ if (keyInMap(key)) {
+ map<std::string, VPtr>::iterator iter = mapVal.find(key);
+ if (iter != mapVal.end())
+ return iter->second.get();
+ }
+ return 0;
+}
+
+const Value* ValueImpl::byKey(const char* key) const
+{
+ if (keyInMap(key)) {
+ map<std::string, VPtr>::const_iterator iter = mapVal.find(key);
+ if (iter != mapVal.end())
+ return iter->second.get();
+ }
+ return 0;
+}
+
+void ValueImpl::deleteKey(const char* key)
+{
+ mapVal.erase(key);
+}
+
+void ValueImpl::insert(const char* key, Value* val)
+{
+ mapVal[key] = VPtr(val);
+}
+
+const char* ValueImpl::key(uint32_t idx) const
+{
+ map<std::string, VPtr>::const_iterator iter = mapVal.begin();
+ for (uint32_t i = 0; i < idx; i++) {
+ if (iter == mapVal.end())
+ break;
+ iter++;
+ }
+
+ if (iter == mapVal.end())
+ return 0;
+ else
+ return iter->first.c_str();
+}
+
+Value* ValueImpl::listItem(uint32_t)
+{
+ return 0;
+}
+
+void ValueImpl::appendToList(Value*)
+{
+}
+
+void ValueImpl::deleteListItem(uint32_t)
+{
+}
+
+Value* ValueImpl::arrayItem(uint32_t)
+{
+ return 0;
+}
+
+void ValueImpl::appendToArray(Value*)
+{
+}
+
+void ValueImpl::deleteArrayItem(uint32_t)
+{
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Value::Value(Typecode t, Typecode at)
+{
+ impl = new ValueImpl(this, t, at);
+}
+
+Value::Value(ValueImpl* i)
+{
+ impl = i;
+}
+
+Value::~Value()
+{
+ delete impl;
+}
+
+Typecode Value::getType() const
+{
+ return impl->getType();
+}
+
+bool Value::isNull() const
+{
+ return impl->isNull();
+}
+
+void Value::setNull()
+{
+ impl->setNull();
+}
+
+bool Value::isObjectId() const
+{
+ return impl->isObjectId();
+}
+
+const ObjectId& Value::asObjectId() const
+{
+ return impl->asObjectId();
+}
+
+void Value::setObjectId(const ObjectId& oid)
+{
+ impl->setObjectId(oid);
+}
+
+bool Value::isUint() const
+{
+ return impl->isUint();
+}
+
+uint32_t Value::asUint() const
+{
+ return impl->asUint();
+}
+
+void Value::setUint(uint32_t val)
+{
+ impl->setUint(val);
+}
+
+bool Value::isInt() const
+{
+ return impl->isInt();
+}
+
+int32_t Value::asInt() const
+{
+ return impl->asInt();
+}
+
+void Value::setInt(int32_t val)
+{
+ impl->setInt(val);
+}
+
+bool Value::isUint64() const
+{
+ return impl->isUint64();
+}
+
+uint64_t Value::asUint64() const
+{
+ return impl->asUint64();
+}
+
+void Value::setUint64(uint64_t val)
+{
+ impl->setUint64(val);
+}
+
+bool Value::isInt64() const
+{
+ return impl->isInt64();
+}
+
+int64_t Value::asInt64() const
+{
+ return impl->asInt64();
+}
+
+void Value::setInt64(int64_t val)
+{
+ impl->setInt64(val);
+}
+
+bool Value::isString() const
+{
+ return impl->isString();
+}
+
+const char* Value::asString() const
+{
+ return impl->asString();
+}
+
+void Value::setString(const char* val)
+{
+ impl->setString(val);
+}
+
+bool Value::isBool() const
+{
+ return impl->isBool();
+}
+
+bool Value::asBool() const
+{
+ return impl->asBool();
+}
+
+void Value::setBool(bool val)
+{
+ impl->setBool(val);
+}
+
+bool Value::isFloat() const
+{
+ return impl->isFloat();
+}
+
+float Value::asFloat() const
+{
+ return impl->asFloat();
+}
+
+void Value::setFloat(float val)
+{
+ impl->setFloat(val);
+}
+
+bool Value::isDouble() const
+{
+ return impl->isDouble();
+}
+
+double Value::asDouble() const
+{
+ return impl->asDouble();
+}
+
+void Value::setDouble(double val)
+{
+ impl->setDouble(val);
+}
+
+bool Value::isUuid() const
+{
+ return impl->isUuid();
+}
+
+const uint8_t* Value::asUuid() const
+{
+ return impl->asUuid();
+}
+
+void Value::setUuid(const uint8_t* val)
+{
+ impl->setUuid(val);
+}
+
+bool Value::isObject() const
+{
+ return impl->isObject();
+}
+
+Object* Value::asObject() const
+{
+ return impl->asObject();
+}
+
+void Value::setObject(Object* val)
+{
+ impl->setObject(val);
+}
+
+bool Value::isMap() const
+{
+ return impl->isMap();
+}
+
+bool Value::keyInMap(const char* key) const
+{
+ return impl->keyInMap(key);
+}
+
+Value* Value::byKey(const char* key)
+{
+ return impl->byKey(key);
+}
+
+const Value* Value::byKey(const char* key) const
+{
+ return impl->byKey(key);
+}
+
+void Value::deleteKey(const char* key)
+{
+ impl->deleteKey(key);
+}
+
+void Value::insert(const char* key, Value* val)
+{
+ impl->insert(key, val);
+}
+
+uint32_t Value::keyCount() const
+{
+ return impl->keyCount();
+}
+
+const char* Value::key(uint32_t idx) const
+{
+ return impl->key(idx);
+}
+
+bool Value::isList() const
+{
+ return impl->isList();
+}
+
+uint32_t Value::listItemCount() const
+{
+ return impl->listItemCount();
+}
+
+Value* Value::listItem(uint32_t idx)
+{
+ return impl->listItem(idx);
+}
+
+void Value::appendToList(Value* val)
+{
+ impl->appendToList(val);
+}
+
+void Value::deleteListItem(uint32_t idx)
+{
+ impl->deleteListItem(idx);
+}
+
+bool Value::isArray() const
+{
+ return impl->isArray();
+}
+
+Typecode Value::arrayType() const
+{
+ return impl->arrayType();
+}
+
+uint32_t Value::arrayItemCount() const
+{
+ return impl->arrayItemCount();
+}
+
+Value* Value::arrayItem(uint32_t idx)
+{
+ return impl->arrayItem(idx);
+}
+
+void Value::appendToArray(Value* val)
+{
+ impl->appendToArray(val);
+}
+
+void Value::deleteArrayItem(uint32_t idx)
+{
+ impl->deleteArrayItem(idx);
+}
+
diff --git a/cpp/src/qmf/ValueImpl.h b/cpp/src/qmf/ValueImpl.h
new file mode 100644
index 0000000000..cf33035bf7
--- /dev/null
+++ b/cpp/src/qmf/ValueImpl.h
@@ -0,0 +1,144 @@
+#ifndef _QmfValueImpl_
+#define _QmfValueImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/Value.h>
+#include <qmf/ObjectIdImpl.h>
+#include <qmf/Object.h>
+#include <qpid/framing/Buffer.h>
+#include <string>
+#include <string.h>
+#include <map>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace qmf {
+
+ // TODO: set valid flag on all value settors
+ // TODO: add a modified flag and accessors
+
+ struct ValueImpl {
+ typedef boost::shared_ptr<Value> VPtr;
+ typedef boost::shared_ptr<Object> OPtr;
+ Value* envelope;
+ const Typecode typecode;
+ bool valid;
+
+ ObjectId refVal;
+ std::string stringVal;
+ OPtr objectVal;
+ std::map<std::string, VPtr> mapVal;
+ std::vector<VPtr> vectorVal;
+ Typecode arrayTypecode;
+
+ union {
+ uint32_t u32;
+ uint64_t u64;
+ int32_t s32;
+ int64_t s64;
+ bool boolVal;
+ float floatVal;
+ double doubleVal;
+ uint8_t uuidVal[16];
+ } value;
+
+ ValueImpl(Value* e, Typecode t, Typecode at) :
+ envelope(e), typecode(t), valid(false), arrayTypecode(at) {}
+ ValueImpl(Typecode t, qpid::framing::Buffer& b);
+ ValueImpl(Typecode t);
+ ~ValueImpl();
+
+ void encode(qpid::framing::Buffer& b) const;
+
+ Typecode getType() const { return typecode; }
+ bool isNull() const { return !valid; }
+ void setNull() { valid = false; }
+
+ bool isObjectId() const { return typecode == TYPE_REF; }
+ const ObjectId& asObjectId() const { return refVal; }
+ void setObjectId(const ObjectId& o) { refVal = o; } // TODO
+
+ bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; }
+ uint32_t asUint() const { return value.u32; }
+ void setUint(uint32_t val) { value.u32 = val; }
+
+ bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; }
+ int32_t asInt() const { return value.s32; }
+ void setInt(int32_t val) { value.s32 = val; }
+
+ bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; }
+ uint64_t asUint64() const { return value.u64; }
+ void setUint64(uint64_t val) { value.u64 = val; }
+
+ bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; }
+ int64_t asInt64() const { return value.s64; }
+ void setInt64(int64_t val) { value.s64 = val; }
+
+ bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; }
+ const char* asString() const { return stringVal.c_str(); }
+ void setString(const char* val) { stringVal = val; }
+
+ bool isBool() const { return typecode == TYPE_BOOL; }
+ bool asBool() const { return value.boolVal; }
+ void setBool(bool val) { value.boolVal = val; }
+
+ bool isFloat() const { return typecode == TYPE_FLOAT; }
+ float asFloat() const { return value.floatVal; }
+ void setFloat(float val) { value.floatVal = val; }
+
+ bool isDouble() const { return typecode == TYPE_DOUBLE; }
+ double asDouble() const { return value.doubleVal; }
+ void setDouble(double val) { value.doubleVal = val; }
+
+ bool isUuid() const { return typecode == TYPE_UUID; }
+ const uint8_t* asUuid() const { return value.uuidVal; }
+ void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); }
+
+ bool isObject() const { return typecode == TYPE_OBJECT; }
+ Object* asObject() const { return objectVal.get(); }
+ void setObject(Object* val) { objectVal.reset(val); }
+
+ bool isMap() const { return typecode == TYPE_MAP; }
+ bool keyInMap(const char* key) const;
+ Value* byKey(const char* key);
+ const Value* byKey(const char* key) const;
+ void deleteKey(const char* key);
+ void insert(const char* key, Value* val);
+ uint32_t keyCount() const { return mapVal.size(); }
+ const char* key(uint32_t idx) const;
+
+ bool isList() const { return typecode == TYPE_LIST; }
+ uint32_t listItemCount() const { return vectorVal.size(); }
+ Value* listItem(uint32_t idx);
+ void appendToList(Value* val);
+ void deleteListItem(uint32_t idx);
+
+ bool isArray() const { return typecode == TYPE_ARRAY; }
+ Typecode arrayType() const { return arrayTypecode; }
+ uint32_t arrayItemCount() const { return vectorVal.size(); }
+ Value* arrayItem(uint32_t idx);
+ void appendToArray(Value* val);
+ void deleteArrayItem(uint32_t idx);
+ };
+}
+
+#endif
+