diff options
author | Ted Ross <tross@apache.org> | 2009-05-22 21:40:57 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-05-22 21:40:57 +0000 |
commit | 36319d26120c163c0c36598038859dad716ac358 (patch) | |
tree | 5e3087d14842f7ac87aaa89513ff204e0f1d40de /cpp/src | |
parent | 2cd113fc9e5c810ca8045dc1d70bcd6efd685f47 (diff) | |
download | qpid-python-36319d26120c163c0c36598038859dad716ac358.tar.gz |
QPID-1874 - First drop of the second-generation QMF libraries.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@777720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/CMakeLists.txt | 18 | ||||
-rw-r--r-- | cpp/src/qmf.mk | 38 | ||||
-rw-r--r-- | cpp/src/qmf/Agent.cpp | 962 | ||||
-rw-r--r-- | cpp/src/qmf/Agent.h | 206 | ||||
-rw-r--r-- | cpp/src/qmf/Console.h | 82 | ||||
-rw-r--r-- | cpp/src/qmf/Event.h | 30 | ||||
-rw-r--r-- | cpp/src/qmf/Message.h | 39 | ||||
-rw-r--r-- | cpp/src/qmf/MessageImpl.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qmf/MessageImpl.h | 42 | ||||
-rw-r--r-- | cpp/src/qmf/Object.h | 47 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectId.h | 53 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectIdImpl.cpp | 192 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectIdImpl.h | 66 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectImpl.cpp | 222 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectImpl.h | 62 | ||||
-rw-r--r-- | cpp/src/qmf/Query.h | 54 | ||||
-rw-r--r-- | cpp/src/qmf/QueryImpl.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qmf/QueryImpl.h | 48 | ||||
-rw-r--r-- | cpp/src/qmf/ResilientConnection.cpp | 460 | ||||
-rw-r--r-- | cpp/src/qmf/ResilientConnection.h | 166 | ||||
-rw-r--r-- | cpp/src/qmf/Schema.h | 160 | ||||
-rw-r--r-- | cpp/src/qmf/SchemaImpl.cpp | 740 | ||||
-rw-r--r-- | cpp/src/qmf/SchemaImpl.h | 195 | ||||
-rw-r--r-- | cpp/src/qmf/Typecode.h | 51 | ||||
-rw-r--r-- | cpp/src/qmf/Value.h | 113 | ||||
-rw-r--r-- | cpp/src/qmf/ValueImpl.cpp | 478 | ||||
-rw-r--r-- | cpp/src/qmf/ValueImpl.h | 144 |
27 files changed, 4778 insertions, 4 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index a4affcf010..222ea91c63 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -578,12 +578,28 @@ set (qmfagent_SOURCES qpid/agent/ManagementAgent.h qpid/agent/ManagementAgentImpl.cpp qpid/agent/ManagementAgentImpl.h + qmf/Agent.cpp ) add_library (qmfagent SHARED ${qmfagent_SOURCES}) -target_link_libraries (qmfagent qpidclient) +target_link_libraries (qmfagent qmfcommon) set_target_properties (qmfagent PROPERTIES VERSION ${qpidc_version}) +set (qmfcommon_SOURCES + qmf/Agent.cpp + qmf/ResilientConnection.cpp + qmf/MessageImpl.cpp + qmf/SchemaImpl.cpp + qmf/ValueImpl.cpp + qmf/ObjectIdImpl.cpp + qmf/ObjectImpl.cpp + qmf/QueryImpl.cpp + ) +add_library (qmfcommon SHARED ${qmfcommon_SOURCES}) +target_link_libraries (qmfcommon qpidclient) +set_target_properties (qmfcommon PROPERTIES + VERSION ${qpidc_version}) + # QMF console library #module_hdr += \ # qpid/console/Agent.h \ diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index ebd11e04f7..d5ae0e721c 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -20,17 +20,49 @@ # # qmf agent library makefile fragment, to be included in Makefile.am # -lib_LTLIBRARIES += libqmfagent.la +lib_LTLIBRARIES += \ + libqmfcommon.la \ + libqmfagent.la module_hdr += \ qpid/agent/ManagementAgent.h \ qpid/agent/ManagementAgentImpl.h \ - qpid/agent/QmfAgentImportExport.h + qpid/agent/QmfAgentImportExport.h \ + qmf/Agent.h \ + qmf/Console.h \ + qmf/Event.h \ + qmf/Message.h \ + qmf/MessageImpl.h \ + qmf/Object.h \ + qmf/ObjectId.h \ + qmf/ObjectIdImpl.h \ + qmf/ObjectImpl.h \ + qmf/Query.h \ + qmf/QueryImpl.h \ + qmf/ResilientConnection.h \ + qmf/Schema.h \ + qmf/SchemaImpl.h \ + qmf/Typecode.h \ + qmf/Value.h \ + qmf/ValueImpl.h + +libqmfcommon_la_SOURCES = \ + qmf/Agent.cpp \ + qmf/ResilientConnection.cpp \ + qmf/MessageImpl.cpp \ + qmf/SchemaImpl.cpp \ + qmf/ValueImpl.cpp \ + qmf/ObjectIdImpl.cpp \ + qmf/ObjectImpl.cpp \ + qmf/QueryImpl.cpp libqmfagent_la_SOURCES = \ qpid/agent/ManagementAgent.h \ qpid/agent/ManagementAgentImpl.cpp \ - qpid/agent/ManagementAgentImpl.h + qpid/agent/ManagementAgentImpl.h \ + qmf/Agent.cpp libqmfagent_la_LIBADD = libqpidclient.la +libqmfagent_ladir = $(includedir)/qmf +libqmfagent_la_HEADERS = $(module_hdr) diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp new file mode 100644 index 0000000000..1071e445c8 --- /dev/null +++ b/cpp/src/qmf/Agent.cpp @@ -0,0 +1,962 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Agent.h" +#include "MessageImpl.h" +#include "SchemaImpl.h" +#include "Typecode.h" +#include "ObjectImpl.h" +#include "ObjectIdImpl.h" +#include "QueryImpl.h" +#include "ValueImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/Uuid.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/sys/Mutex.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/Time.h> +#include <string.h> +#include <string> +#include <deque> +#include <map> +#include <unistd.h> +#include <fcntl.h> +#include <iostream> +#include <fstream> +#include <boost/shared_ptr.hpp> + +using namespace std; +using namespace qmf; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qmf { + + struct AgentEventImpl { + typedef boost::shared_ptr<AgentEventImpl> Ptr; + AgentEvent::EventKind kind; + uint32_t sequence; + string authUserId; + string authToken; + string name; + Object* object; + boost::shared_ptr<ObjectId> objectId; + Query query; + boost::shared_ptr<Value> arguments; + string exchange; + string bindingKey; + SchemaObjectClass* objectClass; + + AgentEventImpl(AgentEvent::EventKind k) : + kind(k), sequence(0), object(0), objectClass(0) {} + ~AgentEventImpl() {} + AgentEvent copy(); + }; + + struct AgentQueryContext { + typedef boost::shared_ptr<AgentQueryContext> Ptr; + uint32_t sequence; + string exchange; + string key; + SchemaMethodImpl* schemaMethod; + AgentQueryContext() : schemaMethod(0) {} + }; + + class AgentImpl { + public: + AgentImpl(char* label, bool internalStore); + ~AgentImpl(); + + void setStoreDir(char* path); + void setTransferDir(char* path); + void handleRcvMessage(Message& message); + bool getXmtMessage(Message& item); + void popXmt(); + bool getEvent(AgentEvent& event); + void popEvent(); + void newSession(); + void startProtocol(); + void heartbeat(); + void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); + void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); + void queryComplete(uint32_t sequence); + void registerClass(SchemaObjectClass* cls); + void registerClass(SchemaEventClass* cls); + const ObjectId* addObject(Object& obj, uint64_t persistId); + const ObjectId* allocObjectId(uint64_t persistId); + const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + void raiseEvent(Event& event); + + private: + Mutex lock; + Mutex addLock; + string label; + string queueName; + string storeDir; + string transferDir; + bool internalStore; + uint64_t nextTransientId; + Uuid systemId; + uint32_t requestedBrokerBank; + uint32_t requestedAgentBank; + uint32_t assignedBrokerBank; + uint32_t assignedAgentBank; + AgentAttachment attachment; + uint16_t bootSequence; + uint64_t nextObjectId; + uint32_t nextContextNum; + deque<AgentEventImpl::Ptr> eventQueue; + deque<MessageImpl::Ptr> xmtQueue; + map<uint32_t, AgentQueryContext::Ptr> contextMap; + + static const char* QMF_EXCHANGE; + static const char* DIR_EXCHANGE; + static const char* BROKER_KEY; + static const uint32_t MERR_UNKNOWN_METHOD = 2; + static const uint32_t MERR_UNKNOWN_PACKAGE = 8; + static const uint32_t MERR_UNKNOWN_CLASS = 9; + static const uint32_t MERR_INTERNAL_ERROR = 10; +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + struct SchemaClassKey { + string name; + uint8_t hash[16]; + SchemaClassKey(const string& n, const uint8_t* h) : name(n) { + memcpy(hash, h, 16); + } + SchemaClassKey(Buffer& buffer) { + buffer.getShortString(name); + buffer.getBin128(hash); + } + string repr() { + return name; + } + }; + + struct SchemaClassKeyComp { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap; + typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap; + + struct ClassMaps { + ObjectClassMap objectClasses; + EventClassMap eventClasses; + }; + + map<string, ClassMaps> packages; + + bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); + void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); + AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); + AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); + AgentEventImpl::Ptr eventSetupComplete(); + AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, + boost::shared_ptr<ObjectId> oid); + AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, + boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, + SchemaObjectClass* objectClass); + void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); + + void sendPackageIndicationLH(const string& packageName); + void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key); + void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, + uint32_t code = 0, const string& text = "OK"); + void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); + void handleAttachResponse(Buffer& inBuffer); + void handlePackageRequest(Buffer& inBuffer); + void handleClassQuery(Buffer& inBuffer); + void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, + const string& replyToExchange, const string& replyToKey); + void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); + void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); + void handleConsoleAddedIndication(); + }; +} + +const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; +const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; +const char* AgentImpl::BROKER_KEY = "broker"; + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +AgentEvent AgentEventImpl::copy() +{ + AgentEvent item; + + ::memset(&item, 0, sizeof(AgentEvent)); + item.kind = kind; + item.sequence = sequence; + item.object = object; + item.objectId = objectId.get(); + item.query = &query; + item.arguments = arguments.get(); + item.objectClass = objectClass; + + STRING_REF(authUserId); + STRING_REF(authToken); + STRING_REF(name); + STRING_REF(exchange); + STRING_REF(bindingKey); + + return item; +} + +AgentImpl::AgentImpl(char* _label, bool i) : + label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), + requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), + bootSequence(1), nextObjectId(1), nextContextNum(1) +{ + queueName += label; +} + +AgentImpl::~AgentImpl() +{ + cout << "AgentImpl::~AgentImpl" << endl; +} + +void AgentImpl::setStoreDir(char* path) +{ + Mutex::ScopedLock _lock(lock); + if (path) + storeDir = path; + else + storeDir.clear(); +} + +void AgentImpl::setTransferDir(char* path) +{ + Mutex::ScopedLock _lock(lock); + if (path) + transferDir = path; + else + transferDir.clear(); +} + +void AgentImpl::handleRcvMessage(Message& message) +{ + Buffer inBuffer(message.body, message.length); + uint8_t opcode; + uint32_t sequence; + string replyToExchange(message.replyExchange ? message.replyExchange : ""); + string replyToKey(message.replyKey ? message.replyKey : ""); + string userId(message.userId ? message.userId : ""); + + if (checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == 'a') handleAttachResponse(inBuffer); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); + } +} + +bool AgentImpl::getXmtMessage(Message& item) +{ + Mutex::ScopedLock _lock(lock); + if (xmtQueue.empty()) + return false; + item = xmtQueue.front()->copy(); + return true; +} + +void AgentImpl::popXmt() +{ + Mutex::ScopedLock _lock(lock); + if (!xmtQueue.empty()) + xmtQueue.pop_front(); +} + +bool AgentImpl::getEvent(AgentEvent& event) +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void AgentImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +void AgentImpl::newSession() +{ + Mutex::ScopedLock _lock(lock); + eventQueue.clear(); + xmtQueue.clear(); + eventQueue.push_back(eventDeclareQueue(queueName)); + eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); + eventQueue.push_back(eventSetupComplete()); +} + +void AgentImpl::startProtocol() +{ + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + encodeHeader(buffer, 'A'); + buffer.putShortString("qmfa"); + systemId.encode(buffer); + buffer.putLong(requestedBrokerBank); + buffer.putLong(requestedAgentBank); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << + " reqAgent=" << requestedAgentBank); +} + +void AgentImpl::heartbeat() +{ + Mutex::ScopedLock _lock(lock); + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + + encodeHeader(buffer, 'h'); + buffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; + sendBufferLH(buffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT HeartbeatIndication"); +} + +void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, + const Value& argMap) +{ + Mutex::ScopedLock _lock(lock); + map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AgentQueryContext::Ptr context = iter->second; + contextMap.erase(iter); + + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'm', context->sequence); + buffer.putLong(status); + buffer.putMediumString(text); + if (status == 0) { + for (vector<SchemaArgumentImpl*>::const_iterator aIter = context->schemaMethod->arguments.begin(); + aIter != context->schemaMethod->arguments.end(); aIter++) { + const SchemaArgumentImpl* schemaArg = *aIter; + if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) { + if (argMap.keyInMap(schemaArg->name.c_str())) { + const Value* val = argMap.byKey(schemaArg->name.c_str()); + val->impl->encode(buffer); + } else { + Value val(schemaArg->typecode); + val.impl->encode(buffer); + } + } + } + } + sendBufferLH(buffer, context->exchange, context->key); + QPID_LOG(trace, "SENT MethodResponse"); +} + +void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +{ + Mutex::ScopedLock _lock(lock); + map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AgentQueryContext::Ptr context = iter->second; + + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'g', context->sequence); + + object.impl->encodeSchemaKey(buffer); + object.impl->encodeManagedObjectData(buffer); + if (prop) + object.impl->encodeProperties(buffer); + if (stat) + object.impl->encodeStatistics(buffer); + + sendBufferLH(buffer, context->exchange, context->key); + QPID_LOG(trace, "SENT ContentIndication"); +} + +void AgentImpl::queryComplete(uint32_t sequence) +{ + Mutex::ScopedLock _lock(lock); + map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + + AgentQueryContext::Ptr context = iter->second; + contextMap.erase(iter); + sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); +} + +void AgentImpl::registerClass(SchemaObjectClass* cls) +{ + Mutex::ScopedLock _lock(lock); + SchemaObjectClassImpl* impl = cls->impl; + + map<string, ClassMaps>::iterator iter = packages.find(impl->package); + if (iter == packages.end()) { + packages[impl->package] = ClassMaps(); + iter = packages.find(impl->package); + // TODO: Indicate this package if connected + } + + SchemaClassKey key(impl->name, impl->getHash()); + iter->second.objectClasses[key] = impl; + + // TODO: Indicate this schema if connected. +} + +void AgentImpl::registerClass(SchemaEventClass* cls) +{ + Mutex::ScopedLock _lock(lock); + SchemaEventClassImpl* impl = cls->impl; + + map<string, ClassMaps>::iterator iter = packages.find(impl->package); + if (iter == packages.end()) { + packages[impl->package] = ClassMaps(); + iter = packages.find(impl->package); + // TODO: Indicate this package if connected + } + + SchemaClassKey key(impl->name, impl->getHash()); + iter->second.eventClasses[key] = impl; + + // TODO: Indicate this schema if connected. +} + +const ObjectId* AgentImpl::addObject(Object&, uint64_t) +{ + Mutex::ScopedLock _lock(lock); + return 0; +} + +const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) +{ + Mutex::ScopedLock _lock(lock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; + + ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum); + return oid->envelope; +} + +const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +{ + return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); +} + +void AgentImpl::raiseEvent(Event&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('2'); + buf.putOctet(opcode); + buf.putLong (seq); +} + +bool AgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); + event->name = name; + + return event; +} + +AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, + const string& key) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); + event->name = queue; + event->exchange = exchange; + event->bindingKey = key; + + return event; +} + +AgentEventImpl::Ptr AgentImpl::eventSetupComplete() +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); + return event; +} + +AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, + const string& cls, boost::shared_ptr<ObjectId> oid) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); + event->sequence = num; + event->authUserId = userId; + event->query.impl->packageName = package; + event->query.impl->className = cls; + event->query.impl->oid = oid; + return event; +} + +AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, + boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, + SchemaObjectClass* objectClass) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); + event->sequence = num; + event->authUserId = userId; + event->name = method; + event->objectId = oid; + event->arguments = argMap; + event->objectClass = objectClass; + return event; +} + +void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +{ + uint32_t length = buf.getPosition(); + MessageImpl::Ptr message(new MessageImpl); + + buf.reset(); + buf.getRawData(message->body, length); + message->destination = destination; + message->routingKey = routingKey; + message->replyExchange = "amq.direct"; + message->replyKey = queueName; + + xmtQueue.push_back(message); +} + +void AgentImpl::sendPackageIndicationLH(const string& packageName) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'p'); + buffer.putShortString(packageName); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); +} + +void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'q'); + buffer.putOctet((int) kind); + buffer.putShortString(packageName); + buffer.putShortString(key.name); + buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); +} + +void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, + uint32_t sequence, uint32_t code, const string& text) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'z', sequence); + buffer.putLong(code); + buffer.putShortString(text); + sendBufferLH(buffer, exchange, replyToKey); + QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); +} + +void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'm', sequence); + buffer.putLong(code); + + string fulltext; + switch (code) { + case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; + case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; + case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; + case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; + default: fulltext = "Unspecified Error"; break; + } + + if (!text.empty()) { + fulltext += " ("; + fulltext += text; + fulltext += ")"; + } + + buffer.putMediumString(fulltext); + sendBufferLH(buffer, DIR_EXCHANGE, key); + QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); +} + +void AgentImpl::handleAttachResponse(Buffer& inBuffer) +{ + Mutex::ScopedLock _lock(lock); + + assignedBrokerBank = inBuffer.getLong(); + assignedAgentBank = inBuffer.getLong(); + + QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + + if ((assignedBrokerBank != requestedBrokerBank) || + (assignedAgentBank != requestedAgentBank)) { + if (requestedAgentBank == 0) { + QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << + assignedAgentBank); + } else { + QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << + "." << assignedAgentBank); + } + //storeData(); // TODO + requestedBrokerBank = assignedBrokerBank; + requestedAgentBank = assignedAgentBank; + } + + attachment.setBanks(assignedBrokerBank, assignedAgentBank); + + // Bind to qpid.management to receive commands + stringstream key; + key << "agent." << assignedBrokerBank << "." << assignedAgentBank; + eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); + + // Send package indications for all local packages + for (map<string, ClassMaps>::iterator pIter = packages.begin(); + pIter != packages.end(); + pIter++) { + sendPackageIndicationLH(pIter->first); + + // Send class indications for all local classes + ClassMaps cMap = pIter->second; + for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); + cIter != cMap.objectClasses.end(); cIter++) + sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); + for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); + cIter != cMap.eventClasses.end(); cIter++) + sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); + } +} + +void AgentImpl::handlePackageRequest(Buffer&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentImpl::handleClassQuery(Buffer&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, + const string& replyExchange, const string& replyKey) +{ + Mutex::ScopedLock _lock(lock); + string rExchange(replyExchange); + string rKey(replyKey); + string packageName; + inBuffer.getShortString(packageName); + SchemaClassKey key(inBuffer); + + if (rExchange.empty()) + rExchange = QMF_EXCHANGE; + if (rKey.empty()) + rKey = BROKER_KEY; + + QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); + + map<string, ClassMaps>::iterator pIter = packages.find(packageName); + if (pIter == packages.end()) { + sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); + return; + } + + ClassMaps cMap = pIter->second; + ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); + if (ocIter != cMap.objectClasses.end()) { + SchemaObjectClassImpl* oImpl = ocIter->second; + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 's', sequence); + oImpl->encode(buffer); + sendBufferLH(buffer, rExchange, rKey); + QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); + return; + } + + EventClassMap::iterator ecIter = cMap.eventClasses.find(key); + if (ecIter != cMap.eventClasses.end()) { + SchemaEventClassImpl* eImpl = ecIter->second; + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 's', sequence); + eImpl->encode(buffer); + sendBufferLH(buffer, rExchange, rKey); + QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); + return; + } + + sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); +} + +void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) +{ + Mutex::ScopedLock _lock(lock); + FieldTable ft; + FieldTable::ValuePtr value; + map<string, ClassMaps>::const_iterator pIter = packages.end(); + string pname; + string cname; + string oidRepr; + boost::shared_ptr<ObjectId> oid; + + ft.decode(inBuffer); + + QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + + value = ft.get("_package"); + if (value.get() && value->convertsTo<string>()) { + pname = value->get<string>(); + pIter = packages.find(pname); + if (pIter == packages.end()) { + sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); + return; + } + } + + value = ft.get("_class"); + if (value.get() && value->convertsTo<string>()) { + cname = value->get<string>(); + // TODO - check for validity of class (in package or any package) + if (pIter == packages.end()) { + } else { + + } + } + + value = ft.get("_objectid"); + if (value.get() && value->convertsTo<string>()) { + oidRepr = value->get<string>(); + oid.reset(new ObjectId()); + oid->impl->fromString(oidRepr); + } + + AgentQueryContext::Ptr context(new AgentQueryContext); + uint32_t contextNum = nextContextNum++; + context->sequence = sequence; + context->exchange = DIR_EXCHANGE; + context->key = replyTo; + contextMap[contextNum] = context; + + eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); +} + +void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) +{ + Mutex::ScopedLock _lock(lock); + string pname; + string method; + ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); + boost::shared_ptr<ObjectId> oid(oidImpl->envelope); + buffer.getShortString(pname); + SchemaClassKey classKey(buffer); + buffer.getShortString(method); + + map<string, ClassMaps>::const_iterator pIter = packages.find(pname); + if (pIter == packages.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); + return; + } + + ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); + if (cIter == pIter->second.objectClasses.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); + return; + } + + const SchemaObjectClassImpl* schema = cIter->second; + vector<SchemaMethodImpl*>::const_iterator mIter = schema->methods.begin(); + for (; mIter != schema->methods.end(); mIter++) { + if ((*mIter)->name == method) + break; + } + + if (mIter == schema->methods.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); + return; + } + + SchemaMethodImpl* schemaMethod = *mIter; + boost::shared_ptr<Value> argMap(new Value(TYPE_MAP)); + ValueImpl* value; + for (vector<SchemaArgumentImpl*>::const_iterator aIter = schemaMethod->arguments.begin(); + aIter != schemaMethod->arguments.end(); aIter++) { + const SchemaArgumentImpl* schemaArg = *aIter; + if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT) + value = new ValueImpl(schemaArg->typecode, buffer); + else + value = new ValueImpl(schemaArg->typecode); + argMap->insert(schemaArg->name.c_str(), value->envelope); + } + + AgentQueryContext::Ptr context(new AgentQueryContext); + uint32_t contextNum = nextContextNum++; + context->sequence = sequence; + context->exchange = DIR_EXCHANGE; + context->key = replyTo; + context->schemaMethod = schemaMethod; + contextMap[contextNum] = context; + + eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope)); +} + +void AgentImpl::handleConsoleAddedIndication() +{ + Mutex::ScopedLock _lock(lock); +} + +//================================================================== +// Wrappers +//================================================================== + +Agent::Agent(char* label, bool internalStore) +{ + impl = new AgentImpl(label, internalStore); +} + +Agent::~Agent() +{ + cout << "Agent::~Agent" << endl; + delete impl; +} + +void Agent::setStoreDir(char* path) +{ + impl->setStoreDir(path); +} + +void Agent::setTransferDir(char* path) +{ + impl->setTransferDir(path); +} + +void Agent::handleRcvMessage(Message& message) +{ + impl->handleRcvMessage(message); +} + +bool Agent::getXmtMessage(Message& item) +{ + return impl->getXmtMessage(item); +} + +void Agent::popXmt() +{ + impl->popXmt(); +} + +bool Agent::getEvent(AgentEvent& event) +{ + return impl->getEvent(event); +} + +void Agent::popEvent() +{ + impl->popEvent(); +} + +void Agent::newSession() +{ + impl->newSession(); +} + +void Agent::startProtocol() +{ + impl->startProtocol(); +} + +void Agent::heartbeat() +{ + impl->heartbeat(); +} + +void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) +{ + impl->methodResponse(sequence, status, text, arguments); +} + +void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +{ + impl->queryResponse(sequence, object, prop, stat); +} + +void Agent::queryComplete(uint32_t sequence) +{ + impl->queryComplete(sequence); +} + +void Agent::registerClass(SchemaObjectClass* cls) +{ + impl->registerClass(cls); +} + +void Agent::registerClass(SchemaEventClass* cls) +{ + impl->registerClass(cls); +} + +const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) +{ + return impl->addObject(obj, persistId); +} + +const ObjectId* Agent::allocObjectId(uint64_t persistId) +{ + return impl->allocObjectId(persistId); +} + +const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +{ + return impl->allocObjectId(persistIdLo, persistIdHi); +} + +void Agent::raiseEvent(Event& event) +{ + impl->raiseEvent(event); +} + diff --git a/cpp/src/qmf/Agent.h b/cpp/src/qmf/Agent.h new file mode 100644 index 0000000000..d8f784e9d8 --- /dev/null +++ b/cpp/src/qmf/Agent.h @@ -0,0 +1,206 @@ +#ifndef _QmfAgent_ +#define _QmfAgent_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Schema.h> +#include <qmf/ObjectId.h> +#include <qmf/Object.h> +#include <qmf/Event.h> +#include <qmf/Query.h> +#include <qmf/Value.h> +#include <qmf/Message.h> + +namespace qmf { + + /** + * AgentEvent + * + * This structure represents a QMF event coming from the agent to + * the application. + */ + struct AgentEvent { + enum EventKind { + GET_QUERY = 1, + START_SYNC = 2, + END_SYNC = 3, + METHOD_CALL = 4, + DECLARE_QUEUE = 5, + DELETE_QUEUE = 6, + BIND = 7, + UNBIND = 8, + SETUP_COMPLETE = 9 + }; + + EventKind kind; + uint32_t sequence; // Protocol sequence (for all kinds) + char* authUserId; // Authenticated user ID (for all kinds) + char* authToken; // Authentication token if issued (for all kinds) + char* name; // Name of the method/sync query + // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND) + Object* object; // Object involved in method call (METHOD_CALL) + ObjectId* objectId; // ObjectId for method call (METHOD_CALL) + Query* query; // Query parameters (GET_QUERY, START_SYNC) + Value* arguments; // Method parameters (METHOD_CALL) + char* exchange; // Exchange for bind (BIND, UNBIND) + char* bindingKey; // Key for bind (BIND, UNBIND) + SchemaObjectClass* objectClass; // (METHOD_CALL) + }; + + class AgentImpl; + + /** + * Agent - Protocol engine for the QMF agent + */ + class Agent { + public: + Agent(char* label, bool internalStore=true); + ~Agent(); + + /** + * Configure the directory path for storing persistent data. + *@param path Null-terminated string containing a directory path where files can be + * created, written, and read. If NULL, no persistent storage will be + * attempted. + */ + void setStoreDir(char* path); + + /** + * Configure the directory path for files transferred over QMF. + *@param path Null-terminated string containing a directory path where files can be + * created, deleted, written, and read. If NULL, file transfers shall not + * be permitted. + */ + void setTransferDir(char* path); + + /** + * Pass messages received from the AMQP session to the Agent engine. + *@param message AMQP messages received on the agent session. + */ + void handleRcvMessage(Message& message); + + /** + * Get the next message to be sent to the AMQP network. + *@param item The Message structure describing the message to be produced. + *@return true if the Message is valid, false if there are no messages to send. + */ + bool getXmtMessage(Message& item); + + /** + * Remove and discard one message from the head of the transmit queue. + */ + void popXmt(); + + /** + * Get the next application event from the agent engine. + *@param event The event iff the return value is true + *@return true if event is valid, false if there are no events to process + */ + bool getEvent(AgentEvent& event); + + /** + * Remove and discard one event from the head of the event queue. + */ + void popEvent(); + + /** + * A new AMQP session has been established for Agent communication. + */ + void newSession(); + + /** + * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event + * is received from the Agent engine. + */ + void startProtocol(); + + /** + * This method is called periodically so the agent can supply a heartbeat. + */ + void heartbeat(); + + /** + * Respond to a method request. + *@param sequence The sequence number from the method request event. + *@param status The method's completion status. + *@param text Status text ("OK" or an error message) + *@param arguments The list of output arguments from the method call. + */ + void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); + + /** + * Send a content indication to the QMF bus. This is only needed for objects that are + * managed by the application. This is *NOT* needed for objects managed by the Agent + * (inserted using addObject). + *@param sequence The sequence number of the GET request or the SYNC_START request. + *@param object The object (annotated with "changed" flags) for publication. + *@param prop If true, changed object properties are transmitted. + *@param stat If true, changed object statistics are transmitted. + */ + void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true); + + /** + * Indicate the completion of a query. This is not used for SYNC_START requests. + *@param sequence The sequence number of the GET request. + */ + void queryComplete(uint32_t sequence); + + /** + * Register a schema class with the Agent. + *@param cls A SchemaObejctClass object that defines data managed by the agent. + */ + void registerClass(SchemaObjectClass* cls); + + /** + * Register a schema class with the Agent. + *@param cls A SchemaEventClass object that defines events sent by the agent. + */ + void registerClass(SchemaEventClass* cls); + + /** + * Give an object to the Agent for storage and management. Once added, the agent takes + * responsibility for the life cycle of the object. + *@param obj The object to be managed by the Agent. + *@param persistId A unique non-zero value if the object-id is to be persistent. + *@return The objectId of the managed object. + */ + const ObjectId* addObject(Object& obj, uint64_t persistId); + + /** + * Allocate an objecc-id for an object that will be managed by the application. + *@param persistId A unique non-zero value if the object-id is to be persistent. + @return The objectId structure for the allocated ID. + */ + const ObjectId* allocObjectId(uint64_t persistId); + const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + + /** + * Raise an event into the QMF network.. + *@param event The event object for the event to be raised. + */ + void raiseEvent(Event& event); + + private: + AgentImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/Console.h b/cpp/src/qmf/Console.h new file mode 100644 index 0000000000..de7949e1de --- /dev/null +++ b/cpp/src/qmf/Console.h @@ -0,0 +1,82 @@ +#ifndef _QmfConsole_ +#define _QmfConsole_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/ManagedConnection.h> +#include <qmf/Agent.h> +#include <qmf/Broker.h> +#include <qmf/Package.h> +#include <qmf/SchemaClassTable.h> +#include <qmf/Object.h> +#include <qmf/ConsoleHandler.h> +#include <set> +#include <vector> +#include <string> + +namespace qmf { + + struct ConsoleSettings { + bool rcvObjects; + bool rcvEvents; + bool rcvHeartbeats; + bool userBindings; + uint32_t methodTimeout; + uint32_t getTimeout; + + ConsoleSettings() : + rcvObjects(true), + rcvEvents(true), + rcvHeartbeats(true), + userBindings(false), + methodTimeout(20), + getTimeout(20) {} + }; + + class Console { + public: + Console(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings()); + ~Console(); + + Broker* addConnection(ManagedConnection& connection); + void delConnection(Broker* broker); + void delConnection(ManagedConnection& connection); + + const PackageMap& getPackages() const; + + void bindPackage(const Package& package); + void bindPackage(const std::string& packageName); + void bindClass(const SchemaClass& otype); + void bindClass(const std::string& packageName, const std::string& className); + + void getAgents(std::set<Agent>& agents, Broker* = 0); + void getObjects(std::vector<Object>& objects, const std::string& typeName, + const std::string& packageName = "", + Broker* broker = 0, + Agent* agent = 0); + void getObjects(std::vector<Object>& objects, + const std::map<std::string, std::string>& query, + Broker* broker = 0, + Agent* agent = 0); + }; +} + +#endif + diff --git a/cpp/src/qmf/Event.h b/cpp/src/qmf/Event.h new file mode 100644 index 0000000000..f20c6d2fb1 --- /dev/null +++ b/cpp/src/qmf/Event.h @@ -0,0 +1,30 @@ +#ifndef _QmfEvent_ +#define _QmfEvent_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace qmf { + + class Event { + }; +} + +#endif + diff --git a/cpp/src/qmf/Message.h b/cpp/src/qmf/Message.h new file mode 100644 index 0000000000..7b3745b723 --- /dev/null +++ b/cpp/src/qmf/Message.h @@ -0,0 +1,39 @@ +#ifndef _QmfMessage_ +#define _QmfMessage_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <string> + +namespace qmf { + + struct Message { + char* body; + uint32_t length; + char* destination; + char* routingKey; + char* replyExchange; + char* replyKey; + char* userId; + }; + +} + +#endif diff --git a/cpp/src/qmf/MessageImpl.cpp b/cpp/src/qmf/MessageImpl.cpp new file mode 100644 index 0000000000..0f3dd7caaf --- /dev/null +++ b/cpp/src/qmf/MessageImpl.cpp @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "MessageImpl.h" +#include <string.h> + +using namespace std; +using namespace qmf; + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +Message MessageImpl::copy() +{ + Message item; + + ::memset(&item, 0, sizeof(Message)); + item.body = const_cast<char*>(body.c_str()); + item.length = body.length(); + STRING_REF(destination); + STRING_REF(routingKey); + STRING_REF(replyExchange); + STRING_REF(replyKey); + STRING_REF(userId); + + return item; +} + diff --git a/cpp/src/qmf/MessageImpl.h b/cpp/src/qmf/MessageImpl.h new file mode 100644 index 0000000000..618e2c1940 --- /dev/null +++ b/cpp/src/qmf/MessageImpl.h @@ -0,0 +1,42 @@ +#ifndef _QmfMessageImpl_ +#define _QmfMessageImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Message.h" +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + struct MessageImpl { + typedef boost::shared_ptr<MessageImpl> Ptr; + std::string body; + std::string destination; + std::string routingKey; + std::string replyExchange; + std::string replyKey; + std::string userId; + + Message copy(); + }; +} + +#endif diff --git a/cpp/src/qmf/Object.h b/cpp/src/qmf/Object.h new file mode 100644 index 0000000000..6416aa3c8c --- /dev/null +++ b/cpp/src/qmf/Object.h @@ -0,0 +1,47 @@ +#ifndef _QmfObject_ +#define _QmfObject_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Schema.h> +#include <qmf/ObjectId.h> +#include <qmf/Value.h> + +namespace qmf { + + class ObjectImpl; + class Object { + public: + Object(const SchemaObjectClass* type); + Object(ObjectImpl* impl); + ~Object(); + + void destroy(); + const ObjectId* getObjectId() const; + void setObjectId(ObjectId* oid); + const SchemaObjectClass* getClass() const; + Value* getValue(char* key); + + ObjectImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/ObjectId.h b/cpp/src/qmf/ObjectId.h new file mode 100644 index 0000000000..d27c804773 --- /dev/null +++ b/cpp/src/qmf/ObjectId.h @@ -0,0 +1,53 @@ +#ifndef _QmfObjectId_ +#define _QmfObjectId_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdint.h> + +namespace qmf { + + // TODO: Add to/from string and << operator + + class ObjectIdImpl; + class ObjectId { + public: + ObjectId(); + ObjectId(ObjectIdImpl* impl); + ~ObjectId(); + + uint64_t getObjectNum() const; + uint32_t getObjectNumHi() const; + uint32_t getObjectNumLo() const; + bool isDurable() const; + + bool operator==(const ObjectId& other) const; + bool operator!=(const ObjectId& other) const; + bool operator<(const ObjectId& other) const; + bool operator>(const ObjectId& other) const; + bool operator<=(const ObjectId& other) const; + bool operator>=(const ObjectId& other) const; + + ObjectIdImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/ObjectIdImpl.cpp b/cpp/src/qmf/ObjectIdImpl.cpp new file mode 100644 index 0000000000..83fd6cc34f --- /dev/null +++ b/cpp/src/qmf/ObjectIdImpl.cpp @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ObjectIdImpl.h" +#include <stdlib.h> + +using namespace std; +using namespace qmf; +using qpid::framing::Buffer; + + +void AgentAttachment::setBanks(uint32_t broker, uint32_t agent) +{ + first = + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (agent & 0x0fffffff)); +} + +ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : envelope(new ObjectId(this)), agent(0) +{ + decode(buffer); +} + +ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : + envelope(new ObjectId(this)), agent(a) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48; + second = object; +} + +void ObjectIdImpl::decode(Buffer& buffer) +{ + first = buffer.getLongLong(); + second = buffer.getLongLong(); +} + +void ObjectIdImpl::encode(Buffer& buffer) const +{ + if (agent == 0) + buffer.putLongLong(first); + else + buffer.putLongLong(first | agent->first); + buffer.putLongLong(second); +} + +void ObjectIdImpl::fromString(const std::string& repr) +{ +#define FIELDS 5 +#if defined (_WIN32) && !defined (atoll) +# define atoll(X) _atoi64(X) +#endif + + std::string copy(repr.c_str()); + char* cText; + char* field[FIELDS]; + bool atFieldStart = true; + int idx = 0; + + cText = const_cast<char*>(copy.c_str()); + for (char* cursor = cText; *cursor; cursor++) { + if (atFieldStart) { + if (idx >= FIELDS) + return; // TODO error + field[idx++] = cursor; + atFieldStart = false; + } else { + if (*cursor == '-') { + *cursor = '\0'; + atFieldStart = true; + } + } + } + + if (idx != FIELDS) + return; // TODO error + + first = (atoll(field[0]) << 60) + + (atoll(field[1]) << 48) + + (atoll(field[2]) << 28) + + atoll(field[3]); + second = atoll(field[4]); + agent = 0; +} + +bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return first == otherFirst && second == other.second; +} + +bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +} + +bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return (first > otherFirst) || ((first == otherFirst) && (second > other.second)); +} + + +//================================================================== +// Wrappers +//================================================================== + +ObjectId::ObjectId() +{ + impl = new ObjectIdImpl(this); +} + +ObjectId::ObjectId(ObjectIdImpl* i) +{ + impl = i; +} + +ObjectId::~ObjectId() +{ + delete impl; +} + +uint64_t ObjectId::getObjectNum() const +{ + return impl->getObjectNum(); +} + +uint32_t ObjectId::getObjectNumHi() const +{ + return impl->getObjectNumHi(); +} + +uint32_t ObjectId::getObjectNumLo() const +{ + return impl->getObjectNumLo(); +} + +bool ObjectId::isDurable() const +{ + return impl->isDurable(); +} + +bool ObjectId::operator==(const ObjectId& other) const +{ + return *impl == *other.impl; +} + +bool ObjectId::operator!=(const ObjectId& other) const +{ + return !(*impl == *other.impl); +} + +bool ObjectId::operator<(const ObjectId& other) const +{ + return *impl < *other.impl; +} + +bool ObjectId::operator>(const ObjectId& other) const +{ + return *impl > *other.impl; +} + +bool ObjectId::operator<=(const ObjectId& other) const +{ + return !(*impl > *other.impl); +} + +bool ObjectId::operator>=(const ObjectId& other) const +{ + return !(*impl < *other.impl); +} diff --git a/cpp/src/qmf/ObjectIdImpl.h b/cpp/src/qmf/ObjectIdImpl.h new file mode 100644 index 0000000000..5d8ee59aee --- /dev/null +++ b/cpp/src/qmf/ObjectIdImpl.h @@ -0,0 +1,66 @@ +#ifndef _QmfObjectIdImpl_ +#define _QmfObjectIdImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/ObjectId.h> +#include <qpid/framing/Buffer.h> + +namespace qmf { + + struct AgentAttachment { + uint64_t first; + + AgentAttachment() : first(0) {} + void setBanks(uint32_t broker, uint32_t bank); + uint64_t getFirst() const { return first; } + }; + + struct ObjectIdImpl { + ObjectId* envelope; + AgentAttachment* agent; + uint64_t first; + uint64_t second; + + ObjectIdImpl(ObjectId* e) : envelope(e), agent(0) {} + ObjectIdImpl(qpid::framing::Buffer& buffer); + ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); + + void decode(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void fromString(const std::string& repr); + uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; } + uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; } + uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; } + uint32_t getAgentBank() const { return first & 0x000000000FFFFFFFLL; } + uint64_t getObjectNum() const { return second; } + uint32_t getObjectNumHi() const { return (uint32_t) (second >> 32); } + uint32_t getObjectNumLo() const { return (uint32_t) (second & 0x00000000FFFFFFFFLL); } + bool isDurable() const { return getSequence() == 0; } + void setValue(uint64_t f, uint64_t s) { first = f; second = s; } + + bool operator==(const ObjectIdImpl& other) const; + bool operator<(const ObjectIdImpl& other) const; + bool operator>(const ObjectIdImpl& other) const; + }; +} + +#endif + diff --git a/cpp/src/qmf/ObjectImpl.cpp b/cpp/src/qmf/ObjectImpl.cpp new file mode 100644 index 0000000000..d00dd89b3d --- /dev/null +++ b/cpp/src/qmf/ObjectImpl.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ObjectImpl.h" +#include "ValueImpl.h" +#include <qpid/sys/Time.h> + +using namespace std; +using namespace qmf; +using namespace qpid::sys; +using qpid::framing::Buffer; + +ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) : + envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))), + destroyTime(0), lastUpdatedTime(createTime) +{ + int propCount = objectClass->getPropertyCount(); + int statCount = objectClass->getStatisticCount(); + int idx; + + for (idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + properties[prop->getName()] = ValuePtr(new Value(prop->getType())); + } + + for (idx = 0; idx < statCount; idx++) { + const SchemaStatistic* stat = objectClass->getStatistic(idx); + statistics[stat->getName()] = ValuePtr(new Value(stat->getType())); + } +} + +ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer) : + envelope(new Object(this)), objectClass(type), createTime(uint64_t(Duration(now()))), + destroyTime(0), lastUpdatedTime(createTime) +{ + int propCount = objectClass->getPropertyCount(); + int statCount = objectClass->getStatisticCount(); + int idx; + set<string> excludes; + + parsePresenceMasks(buffer, excludes); + for (idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + if (excludes.count(prop->getName()) != 0) { + properties[prop->getName()] = ValuePtr(new Value(prop->getType())); + } else { + ValueImpl* pval = new ValueImpl(prop->getType(), buffer); + properties[prop->getName()] = ValuePtr(pval->envelope); + } + } + + for (idx = 0; idx < statCount; idx++) { + const SchemaStatistic* stat = objectClass->getStatistic(idx); + ValueImpl* sval = new ValueImpl(stat->getType(), buffer); + statistics[stat->getName()] = ValuePtr(sval->envelope); + } +} + +ObjectImpl::~ObjectImpl() +{ +} + +void ObjectImpl::destroy() +{ + destroyTime = uint64_t(Duration(now())); + // TODO - flag deletion +} + +Value* ObjectImpl::getValue(const string& key) +{ + map<string, ValuePtr>::const_iterator iter; + + iter = properties.find(key); + if (iter != properties.end()) + return iter->second.get(); + + iter = statistics.find(key); + if (iter != statistics.end()) + return iter->second.get(); + + return 0; +} + +void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList) +{ + int propCount = objectClass->getPropertyCount(); + excludeList.clear(); + uint8_t bit = 0; + uint8_t mask = 0; + + for (int idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + if (prop->isOptional()) { + if (bit == 0) { + mask = buffer.getOctet(); + bit = 1; + } + if ((mask & bit) == 0) + excludeList.insert(string(prop->getName())); + if (bit == 0x80) + bit = 0; + else + bit = bit << 1; + } + } +} + +void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(objectClass->getPackage()); + buffer.putShortString(objectClass->getName()); + buffer.putBin128(const_cast<uint8_t*>(objectClass->getHash())); +} + +void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const +{ + buffer.putLongLong(lastUpdatedTime); + buffer.putLongLong(createTime); + buffer.putLongLong(destroyTime); + objectId->impl->encode(buffer); +} + +void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const +{ + int propCount = objectClass->getPropertyCount(); + uint8_t bit = 0; + uint8_t mask = 0; + ValuePtr value; + + for (int idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + if (prop->isOptional()) { + value = properties[prop->getName()]; + if (bit == 0) + bit = 1; + if (!value->isNull()) + mask |= bit; + if (bit == 0x80) { + buffer.putOctet(mask); + bit = 0; + mask = 0; + } else + bit = bit << 1; + } + } + if (bit != 0) { + buffer.putOctet(mask); + } + + for (int idx = 0; idx < propCount; idx++) { + const SchemaProperty* prop = objectClass->getProperty(idx); + value = properties[prop->getName()]; + if (!prop->isOptional() || !value->isNull()) { + value->impl->encode(buffer); + } + } +} + +void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const +{ + int statCount = objectClass->getStatisticCount(); + for (int idx = 0; idx < statCount; idx++) { + const SchemaStatistic* stat = objectClass->getStatistic(idx); + ValuePtr value = statistics[stat->getName()]; + value->impl->encode(buffer); + } +} + +//================================================================== +// Wrappers +//================================================================== + +Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {} + +Object::Object(ObjectImpl* i) : impl(i) {} + +Object::~Object() +{ + delete impl; +} + +void Object::destroy() +{ + impl->destroy(); +} + +const ObjectId* Object::getObjectId() const +{ + return impl->getObjectId(); +} + +void Object::setObjectId(ObjectId* oid) +{ + impl->setObjectId(oid); +} + +const SchemaObjectClass* Object::getClass() const +{ + return impl->getClass(); +} + +Value* Object::getValue(char* key) +{ + return impl->getValue(key); +} + diff --git a/cpp/src/qmf/ObjectImpl.h b/cpp/src/qmf/ObjectImpl.h new file mode 100644 index 0000000000..4dc2170bfc --- /dev/null +++ b/cpp/src/qmf/ObjectImpl.h @@ -0,0 +1,62 @@ +#ifndef _QmfObjectImpl_ +#define _QmfObjectImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Object.h> +#include <map> +#include <set> +#include <string> +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + struct ObjectImpl { + typedef boost::shared_ptr<Value> ValuePtr; + Object* envelope; + const SchemaObjectClass* objectClass; + boost::shared_ptr<ObjectId> objectId; + uint64_t createTime; + uint64_t destroyTime; + uint64_t lastUpdatedTime; + mutable std::map<std::string, ValuePtr> properties; + mutable std::map<std::string, ValuePtr> statistics; + + ObjectImpl(Object* e, const SchemaObjectClass* type); + ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer); + ~ObjectImpl(); + + void destroy(); + const ObjectId* getObjectId() const { return objectId.get(); } + void setObjectId(ObjectId* oid) { objectId.reset(oid); } + const SchemaObjectClass* getClass() const { return objectClass; } + Value* getValue(const std::string& key); + + void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList); + void encodeSchemaKey(qpid::framing::Buffer& buffer) const; + void encodeManagedObjectData(qpid::framing::Buffer& buffer) const; + void encodeProperties(qpid::framing::Buffer& buffer) const; + void encodeStatistics(qpid::framing::Buffer& buffer) const; + }; +} + +#endif + diff --git a/cpp/src/qmf/Query.h b/cpp/src/qmf/Query.h new file mode 100644 index 0000000000..346d8c336b --- /dev/null +++ b/cpp/src/qmf/Query.h @@ -0,0 +1,54 @@ +#ifndef _QmfQuery_ +#define _QmfQuery_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/ObjectId.h> +#include <qmf/Value.h> + +namespace qmf { + + class QueryImpl; + class Query { + public: + Query(); + Query(QueryImpl* impl); + ~Query(); + + const char* getPackage() const; + const char* getClass() const; + const ObjectId* getObjectId() const; + + enum Oper { + OPER_AND = 1, + OPER_OR = 2 + }; + + int whereCount() const; + Oper whereOper() const; + const char* whereKey() const; + const Value* whereValue() const; + + QueryImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/QueryImpl.cpp b/cpp/src/qmf/QueryImpl.cpp new file mode 100644 index 0000000000..f74d9238ae --- /dev/null +++ b/cpp/src/qmf/QueryImpl.cpp @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "QueryImpl.h" + +using namespace std; +using namespace qmf; + +//================================================================== +// Wrappers +//================================================================== + +Query::Query() : impl(new QueryImpl(this)) {} +Query::Query(QueryImpl* i) : impl(i) {} + +Query::~Query() +{ + delete impl; +} + +const char* Query::getPackage() const +{ + return impl->getPackage(); +} + +const char* Query::getClass() const +{ + return impl->getClass(); +} + +const ObjectId* Query::getObjectId() const +{ + return impl->getObjectId(); +} + +int Query::whereCount() const +{ + return impl->whereCount(); +} + +Query::Oper Query::whereOper() const +{ + return impl->whereOper(); +} + +const char* Query::whereKey() const +{ + return impl->whereKey(); +} + +const Value* Query::whereValue() const +{ + return impl->whereValue(); +} + diff --git a/cpp/src/qmf/QueryImpl.h b/cpp/src/qmf/QueryImpl.h new file mode 100644 index 0000000000..1cb9bfe554 --- /dev/null +++ b/cpp/src/qmf/QueryImpl.h @@ -0,0 +1,48 @@ +#ifndef _QmfQueryImpl_ +#define _QmfQueryImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Query.h> +#include <string> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + struct QueryImpl { + Query* envelope; + std::string packageName; + std::string className; + boost::shared_ptr<ObjectId> oid; + + QueryImpl(Query* e) : envelope(e) {} + + const char* getPackage() const { return packageName.empty() ? 0 : packageName.c_str(); } + const char* getClass() const { return className.empty() ? 0 : className.c_str(); } + const ObjectId* getObjectId() const { return oid.get(); } + + int whereCount() const { return 0;} + Query::Oper whereOper() const { return Query::OPER_AND; } + const char* whereKey() const { return 0; } + const Value* whereValue() const { return 0; } + }; +} + +#endif diff --git a/cpp/src/qmf/ResilientConnection.cpp b/cpp/src/qmf/ResilientConnection.cpp new file mode 100644 index 0000000000..7ebd0a47c1 --- /dev/null +++ b/cpp/src/qmf/ResilientConnection.cpp @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ResilientConnection.h" +#include "MessageImpl.h" +#include <qpid/client/Session.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> +#include <qpid/client/Message.h> +#include <qpid/sys/Thread.h> +#include <qpid/sys/Runnable.h> +#include <qpid/sys/Mutex.h> +#include <qpid/sys/Condition.h> +#include <qpid/log/Statement.h> +#include <qpid/RefCounted.h> +#include <boost/bind.hpp> +#include <string> +#include <deque> +#include <vector> +#include <set> +#include <boost/intrusive_ptr.hpp> + +using namespace std; +using namespace qmf; +using namespace qpid::client; +using qpid::sys::Mutex; + +namespace qmf { + struct ResilientConnectionEventImpl { + ResilientConnectionEvent::EventKind kind; + void* sessionContext; + string errorText; + MessageImpl message; + + ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k, + const MessageImpl& m = MessageImpl()) : + kind(k), sessionContext(0), message(m) {} + ResilientConnectionEvent copy(); + }; + + struct RCSession : public MessageListener, public qpid::sys::Runnable, public qpid::RefCounted { + typedef boost::intrusive_ptr<RCSession> Ptr; + ResilientConnectionImpl& connImpl; + string name; + Connection& connection; + Session session; + SubscriptionManager* subscriptions; + void* userContext; + vector<string> dests; + qpid::sys::Thread thread; + + RCSession(ResilientConnectionImpl& ci, const string& n, Connection& c, void* uc) : + connImpl(ci), name(n), connection(c), session(connection.newSession(name)), + subscriptions(new SubscriptionManager(session)), userContext(uc), thread(*this) {} + ~RCSession(); + void received(qpid::client::Message& msg); + void run(); + void stop(); + }; + + class ResilientConnectionImpl : public qpid::sys::Runnable { + public: + ResilientConnectionImpl(ConnectionSettings& settings, + int dmin, int dmax, int dfactor); + ~ResilientConnectionImpl(); + + bool isConnected() const; + bool getEvent(ResilientConnectionEvent& event); + void popEvent(); + bool createSession(const char* name, void* sessionContext, SessionHandle& handle); + void destroySession(SessionHandle handle); + void sendMessage(SessionHandle handle, qmf::Message& message); + void declareQueue(SessionHandle handle, char* queue); + void deleteQueue(SessionHandle handle, char* queue); + void bind(SessionHandle handle, char* exchange, char* queue, char* key); + void unbind(SessionHandle handle, char* exchange, char* queue, char* key); + void setNotifyFd(int fd); + + void run(); + void failure(); + void sessionClosed(RCSession* sess); + + void EnqueueEvent(ResilientConnectionEvent::EventKind kind, + void* sessionContext = 0, + const MessageImpl& message = MessageImpl(), + const string& errorText = ""); + + private: + int notifyFd; + bool connected; + bool shutdown; + string lastError; + ConnectionSettings settings; + Connection connection; + mutable qpid::sys::Mutex lock; + int delayMin; + int delayMax; + int delayFactor; + qpid::sys::Condition cond; + qpid::sys::Thread connThread; + deque<ResilientConnectionEventImpl> eventQueue; + set<RCSession::Ptr> sessions; + }; +} + +ResilientConnectionEvent ResilientConnectionEventImpl::copy() +{ + ResilientConnectionEvent item; + + ::memset(&item, 0, sizeof(ResilientConnectionEvent)); + item.kind = kind; + item.sessionContext = sessionContext; + item.message = message.copy(); + item.errorText = const_cast<char*>(errorText.c_str()); + + return item; +} + +RCSession::~RCSession() +{ + subscriptions->stop(); + thread.join(); + session.close(); + delete subscriptions; +} + +void RCSession::run() +{ + try { + subscriptions->run(); + } catch (exception& e) { + connImpl.sessionClosed(this); + } +} + +void RCSession::stop() +{ + subscriptions->stop(); +} + +void RCSession::received(qpid::client::Message& msg) +{ + qmf::MessageImpl qmsg; + qmsg.body = msg.getData(); + + qpid::framing::MessageProperties p = msg.getMessageProperties(); + if (p.hasReplyTo()) { + const qpid::framing::ReplyTo& rt = p.getReplyTo(); + qmsg.replyExchange = rt.getExchange(); + qmsg.replyKey = rt.getRoutingKey(); + } + + if (p.hasUserId()) { + qmsg.userId = p.getUserId(); + } + + connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); +} + +ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings, + int dmin, int dmax, int dfactor) : + notifyFd(-1), connected(false), shutdown(false), settings(_settings), + delayMin(dmin), delayMax(dmax), delayFactor(dfactor), connThread(*this) +{ + connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this)); +} + +ResilientConnectionImpl::~ResilientConnectionImpl() +{ + shutdown = true; + connected = false; + cond.notify(); + connThread.join(); + connection.close(); +} + +bool ResilientConnectionImpl::isConnected() const +{ + Mutex::ScopedLock _lock(lock); + return connected; +} + +bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event) +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front().copy(); + return true; +} + +void ResilientConnectionImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext, + SessionHandle& handle) +{ + Mutex::ScopedLock _lock(lock); + if (!connected) + return false; + + RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext)); + + handle.handle = (void*) sess.get(); + sessions.insert(sess); + + return true; +} + +void ResilientConnectionImpl::destroySession(SessionHandle handle) +{ + Mutex::ScopedLock _lock(lock); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + set<RCSession::Ptr>::iterator iter = sessions.find(sess); + if (iter != sessions.end()) { + for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++) + sess->subscriptions->cancel(dIter->c_str()); + sess->subscriptions->stop(); + sess->subscriptions->wait(); + + sessions.erase(iter); + return; + } +} + +void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message) +{ + Mutex::ScopedLock _lock(lock); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + set<RCSession::Ptr>::iterator iter = sessions.find(sess); + qpid::client::Message msg; + string data(message.body, message.length); + msg.getDeliveryProperties().setRoutingKey(message.routingKey); + msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey)); + msg.setData(data); + + try { + sess->session.messageTransfer(arg::content=msg, arg::destination=message.destination); + } catch(exception& e) { + QPID_LOG(error, "Session Exception during message-transfer: " << e.what()); + sessions.erase(iter); + EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext); + } +} + +void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) +{ + Mutex::ScopedLock _lock(lock); + RCSession* sess = (RCSession*) handle.handle; + + sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true); + sess->subscriptions->subscribe(*sess, queue, queue); + sess->dests.push_back(string(queue)); +} + +void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue) +{ + Mutex::ScopedLock _lock(lock); + RCSession* sess = (RCSession*) handle.handle; + + sess->session.queueDelete(arg::queue=queue); + for (vector<string>::iterator iter = sess->dests.begin(); + iter != sess->dests.end(); iter++) + if (*iter == queue) { + sess->subscriptions->cancel(queue); + sess->dests.erase(iter); + break; + } +} + +void ResilientConnectionImpl::bind(SessionHandle handle, + char* exchange, char* queue, char* key) +{ + Mutex::ScopedLock _lock(lock); + RCSession* sess = (RCSession*) handle.handle; + + sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); +} + +void ResilientConnectionImpl::unbind(SessionHandle handle, + char* exchange, char* queue, char* key) +{ + Mutex::ScopedLock _lock(lock); + RCSession* sess = (RCSession*) handle.handle; + + sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); +} + +void ResilientConnectionImpl::setNotifyFd(int fd) +{ + notifyFd = fd; +} + +void ResilientConnectionImpl::run() +{ + int delay(delayMin); + + while (true) { + try { + connection.open(settings); + { + Mutex::ScopedLock _lock(lock); + connected = true; + EnqueueEvent(ResilientConnectionEvent::CONNECTED); + + while (connected) + cond.wait(lock); + + while (!sessions.empty()) { + set<RCSession::Ptr>::iterator iter = sessions.begin(); + RCSession::Ptr sess = *iter; + sessions.erase(iter); + EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext); + Mutex::ScopedUnlock _u(lock); + sess->stop(); + } + + EnqueueEvent(ResilientConnectionEvent::DISCONNECTED); + + if (shutdown) + return; + } + delay = delayMin; + connection.close(); + } catch (exception &e) { + QPID_LOG(debug, "connection.open exception: " << e.what()); + Mutex::ScopedLock _lock(lock); + lastError = e.what(); + if (delay < delayMax) + delay *= delayFactor; + } + + ::sleep(delay); + } +} + +void ResilientConnectionImpl::failure() +{ + Mutex::ScopedLock _lock(lock); + + connected = false; + lastError = "Closed by Peer"; + cond.notify(); +} + +void ResilientConnectionImpl::sessionClosed(RCSession*) +{ + Mutex::ScopedLock _lock(lock); + connected = false; + lastError = "Closed due to Session failure"; + cond.notify(); +} + +void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind, + void* sessionContext, + const MessageImpl& message, + const string& errorText) +{ + Mutex::ScopedLock _lock(lock); + ResilientConnectionEventImpl event(kind, message); + + event.sessionContext = sessionContext; + event.errorText = errorText; + + eventQueue.push_back(event); + if (notifyFd != -1) + ::write(notifyFd, ".", 1); +} + + +//================================================================== +// Wrappers +//================================================================== + +ResilientConnection::ResilientConnection(ConnectionSettings& settings, + int delayMin, int delayMax, int delayFactor) +{ + impl = new ResilientConnectionImpl(settings, delayMin, delayMax, delayFactor); +} + +ResilientConnection::~ResilientConnection() +{ + delete impl; +} + +bool ResilientConnection::isConnected() const +{ + return impl->isConnected(); +} + +bool ResilientConnection::getEvent(ResilientConnectionEvent& event) +{ + return impl->getEvent(event); +} + +void ResilientConnection::popEvent() +{ + impl->popEvent(); +} + +bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle) +{ + return impl->createSession(name, sessionContext, handle); +} + +void ResilientConnection::destroySession(SessionHandle handle) +{ + impl->destroySession(handle); +} + +void ResilientConnection::sendMessage(SessionHandle handle, qmf::Message& message) +{ + impl->sendMessage(handle, message); +} + +void ResilientConnection::declareQueue(SessionHandle handle, char* queue) +{ + impl->declareQueue(handle, queue); +} + +void ResilientConnection::deleteQueue(SessionHandle handle, char* queue) +{ + impl->deleteQueue(handle, queue); +} + +void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key) +{ + impl->bind(handle, exchange, queue, key); +} + +void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key) +{ + impl->unbind(handle, exchange, queue, key); +} + +void ResilientConnection::setNotifyFd(int fd) +{ + impl->setNotifyFd(fd); +} + diff --git a/cpp/src/qmf/ResilientConnection.h b/cpp/src/qmf/ResilientConnection.h new file mode 100644 index 0000000000..bb565e27ae --- /dev/null +++ b/cpp/src/qmf/ResilientConnection.h @@ -0,0 +1,166 @@ +#ifndef _QmfResilientConnection_ +#define _QmfResilientConnection_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Message.h> +#include <qpid/client/Connection.h> +#include <qpid/client/ConnectionSettings.h> +#include <string> + +namespace qmf { + + /** + * Represents events that occur, unsolicited, from ResilientConnection. + */ + struct ResilientConnectionEvent { + enum EventKind { + CONNECTED = 1, + DISCONNECTED = 2, + SESSION_CLOSED = 3, + RECV = 4 + }; + + EventKind kind; + void* sessionContext; // SESSION_CLOSED, RECV + char* errorText; // DISCONNECTED, SESSION_CLOSED + Message message; // RECV + }; + + struct SessionHandle { + void* handle; + }; + + class ResilientConnectionImpl; + + /** + * ResilientConnection represents a Qpid connection that is resilient. + * + * Upon creation, ResilientConnection attempts to establish a connection to the + * messaging broker. If it fails, it will continue to retry at an interval that + * increases over time (to a maximum interval). If an extablished connection is + * dropped, a reconnect will be attempted. + */ + class ResilientConnection { + public: + + /** + * Create a new resilient connection. + *@param settings Settings that define how the connection is to be made. + *@param delayMin Minimum delay (in seconds) between retries. + *@param delayMax Maximum delay (in seconds) between retries. + *@param delayFactor Factor to multiply retry delay by after each failure. + */ + ResilientConnection(qpid::client::ConnectionSettings& settings, + int delayMin = 1, + int delayMax = 128, + int delayFactor = 2); + ~ResilientConnection(); + + /** + * Get the connected status of the resilient connection. + *@return true iff the connection is established. + */ + bool isConnected() const; + + /** + * Get the next event (if present) from the connection. + *@param event Returned event if one is available. + *@return true if event is valid, false if there are no more events to handle. + */ + bool getEvent(ResilientConnectionEvent& event); + + /** + * Discard the event on the front of the queue. This should be invoked after processing + * the event from getEvent. + */ + void popEvent(); + + /** + * Create a new AMQP session. + *@param name Unique name for the session. + *@param sessionContext Optional user-context value that will be provided in events + * pertaining to this session. + *@param handle Output handle to be stored and used in subsequent calls pertaining to + * this session. + *@return true iff the session was successfully created. + */ + bool createSession(const char* name, void* sessionContext, SessionHandle& handle); + + /** + * Destroy a created session. + *@param handle SessionHandle returned by createSession. + */ + void destroySession(SessionHandle handle); + + /** + * Send a message into the AMQP broker via a session. + *@param handle The session handle of the session to transmit through. + *@param message The QMF message to transmit. + */ + void sendMessage(SessionHandle handle, Message& message); + + /** + * Declare an exclusive, auto-delete queue for a session. + *@param handle The session handle for the owner of the queue. + *@param queue The name of the queue. + */ + void declareQueue(SessionHandle handle, char* queue); + + /** + * Delete a queue. + *@param handle The session handle for the owner of the queue. + *@param queue The name of the queue. + */ + void deleteQueue(SessionHandle handle, char* queue); + + /** + * Bind a queue to an exchange. + *@param handle The session handle of the session to use for binding. + *@param exchange The name of the exchange for binding. + *@param queue The name of the queue for binding. + *@param key The binding key. + */ + void bind(SessionHandle handle, char* exchange, char* queue, char* key); + + /** + * Remove a binding. + *@param handle The session handle of the session to use for un-binding. + *@param exchange The name of the exchange. + *@param queue The name of the queue. + *@param key The binding key. + */ + void unbind(SessionHandle handle, char* exchange, char* queue, char* key); + + /** + * Establish a file descriptor for event notification. + *@param fd A file descriptor into which the connection shall write a character each + * time an event is enqueued. This fd may be in a pair, the other fd of which + * is used in a select loop to control execution. + */ + void setNotifyFd(int fd); + + private: + ResilientConnectionImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/Schema.h b/cpp/src/qmf/Schema.h new file mode 100644 index 0000000000..c4106393ff --- /dev/null +++ b/cpp/src/qmf/Schema.h @@ -0,0 +1,160 @@ +#ifndef _QmfSchema_ +#define _QmfSchema_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Typecode.h> +#include <stdint.h> + +namespace qmf { + + enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 }; + enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 }; + enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 }; + + class SchemaArgumentImpl; + class SchemaMethodImpl; + class SchemaPropertyImpl; + class SchemaStatisticImpl; + class SchemaObjectClassImpl; + class SchemaEventClassImpl; + + /** + */ + class SchemaArgument { + public: + SchemaArgument(const char* name, Typecode typecode); + SchemaArgument(SchemaArgumentImpl* impl); + ~SchemaArgument(); + void setDirection(Direction dir); + void setUnit(const char* val); + void setDesc(const char* desc); + const char* getName() const; + Typecode getType() const; + Direction getDirection() const; + const char* getUnit() const; + const char* getDesc() const; + + SchemaArgumentImpl* impl; + }; + + /** + */ + class SchemaMethod { + public: + SchemaMethod(const char* name); + SchemaMethod(SchemaMethodImpl* impl); + ~SchemaMethod(); + void addArgument(const SchemaArgument& argument); + void setDesc(const char* desc); + const char* getName() const; + const char* getDesc() const; + int getArgumentCount() const; + const SchemaArgument* getArgument(int idx) const; + + SchemaMethodImpl* impl; + }; + + /** + */ + class SchemaProperty { + public: + SchemaProperty(const char* name, Typecode typecode); + SchemaProperty(SchemaPropertyImpl* impl); + ~SchemaProperty(); + void setAccess(Access access); + void setIndex(bool val); + void setOptional(bool val); + void setUnit(const char* val); + void setDesc(const char* desc); + const char* getName() const; + Typecode getType() const; + Access getAccess() const; + bool isIndex() const; + bool isOptional() const; + const char* getUnit() const; + const char* getDesc() const; + + SchemaPropertyImpl* impl; + }; + + /** + */ + class SchemaStatistic { + public: + SchemaStatistic(const char* name, Typecode typecode); + SchemaStatistic(SchemaStatisticImpl* impl); + ~SchemaStatistic(); + void setUnit(const char* val); + void setDesc(const char* desc); + const char* getName() const; + Typecode getType() const; + const char* getUnit() const; + const char* getDesc() const; + + SchemaStatisticImpl* impl; + }; + + /** + */ + class SchemaObjectClass { + public: + SchemaObjectClass(const char* package, const char* name); + SchemaObjectClass(SchemaObjectClassImpl* impl); + ~SchemaObjectClass(); + void addProperty(const SchemaProperty& property); + void addStatistic(const SchemaStatistic& statistic); + void addMethod(const SchemaMethod& method); + + const char* getPackage() const; + const char* getName() const; + const uint8_t* getHash() const; + int getPropertyCount() const; + int getStatisticCount() const; + int getMethodCount() const; + const SchemaProperty* getProperty(int idx) const; + const SchemaStatistic* getStatistic(int idx) const; + const SchemaMethod* getMethod(int idx) const; + + SchemaObjectClassImpl* impl; + }; + + /** + */ + class SchemaEventClass { + public: + SchemaEventClass(const char* package, const char* name); + SchemaEventClass(SchemaEventClassImpl* impl); + ~SchemaEventClass(); + void addArgument(const SchemaArgument& argument); + void setDesc(const char* desc); + + const char* getPackage() const; + const char* getName() const; + const uint8_t* getHash() const; + int getArgumentCount() const; + const SchemaArgument* getArgument(int idx) const; + + SchemaEventClassImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/SchemaImpl.cpp b/cpp/src/qmf/SchemaImpl.cpp new file mode 100644 index 0000000000..57d6148cac --- /dev/null +++ b/cpp/src/qmf/SchemaImpl.cpp @@ -0,0 +1,740 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "SchemaImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/FieldTable.h> +#include <string> +#include <vector> + +using namespace std; +using namespace qmf; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; + +SchemaHash::SchemaHash() +{ + for (int idx = 0; idx < 16; idx++) + hash[idx] = 0x5A; +} + +void SchemaHash::encode(Buffer& buffer) +{ + buffer.putBin128(hash); +} + +void SchemaHash::decode(Buffer& buffer) +{ + buffer.getBin128(hash); +} + +void SchemaHash::update(uint8_t data) +{ + update((char*) &data, 1); +} + +void SchemaHash::update(const char* data, uint32_t len) +{ + uint64_t* first = (uint64_t*) hash; + uint64_t* second = (uint64_t*) hash + 1; + + for (uint32_t idx = 0; idx < len; idx++) { + *first = *first ^ (uint64_t) data[idx]; + *second = *second << 1; + *second |= ((*first & 0x8000000000000000LL) >> 63); + *first = *first << 1; + *first = *first ^ *second; + } +} + +SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgument(this)) +{ + FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typecode = (Typecode) map.getAsInt("type"); + unit = map.getAsString("unit"); + description = map.getAsString("desc"); + + dir = DIR_IN; + string dstr(map.getAsString("dir")); + if (dstr == "O") + dir = DIR_OUT; + else if (dstr == "IO") + dir = DIR_IN_OUT; +} + +void SchemaArgumentImpl::encode(Buffer& buffer) const +{ + FieldTable map; + + map.setString("name", name); + map.setInt("type", (int) typecode); + if (dir == DIR_IN) + map.setString("dir", "I"); + else if (dir == DIR_OUT) + map.setString("dir", "O"); + else + map.setString("dir", "IO"); + if (!unit.empty()) + map.setString("unit", unit); + if (!description.empty()) + map.setString("desc", description); + + map.encode(buffer); +} + +void SchemaArgumentImpl::updateHash(SchemaHash& hash) const +{ + hash.update(name); + hash.update(typecode); + hash.update(dir); + hash.update(unit); + hash.update(description); +} + +SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(this)) +{ + FieldTable map; + int argCount; + + map.decode(buffer); + name = map.getAsString("name"); + argCount = map.getAsInt("argCount"); + description = map.getAsString("desc"); + + for (int idx = 0; idx < argCount; idx++) { + SchemaArgumentImpl* arg = new SchemaArgumentImpl(buffer); + addArgument(*arg->envelope); + } +} + +void SchemaMethodImpl::encode(Buffer& buffer) const +{ + FieldTable map; + + map.setString("name", name); + map.setInt("argCount", arguments.size()); + if (!description.empty()) + map.setString("desc", description); + map.encode(buffer); + + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + (*iter)->encode(buffer); +} + +void SchemaMethodImpl::addArgument(const SchemaArgument& argument) +{ + arguments.push_back(argument.impl); +} + +const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const +{ + int count = 0; + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++, count++) + if (idx == count) + return (*iter)->envelope; + return 0; +} + +void SchemaMethodImpl::updateHash(SchemaHash& hash) const +{ + hash.update(name); + hash.update(description); + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + (*iter)->updateHash(hash); +} + +SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProperty(this)) +{ + FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typecode = (Typecode) map.getAsInt("type"); + access = (Access) map.getAsInt("access"); + index = map.getAsInt("index") != 0; + optional = map.getAsInt("optional") != 0; + unit = map.getAsString("unit"); + description = map.getAsString("desc"); +} + +void SchemaPropertyImpl::encode(Buffer& buffer) const +{ + FieldTable map; + + map.setString("name", name); + map.setInt("type", (int) typecode); + map.setInt("access", (int) access); + map.setInt("index", index ? 1 : 0); + map.setInt("optional", optional ? 1 : 0); + if (!unit.empty()) + map.setString("unit", unit); + if (!description.empty()) + map.setString("desc", description); + + map.encode(buffer); +} + +void SchemaPropertyImpl::updateHash(SchemaHash& hash) const +{ + hash.update(name); + hash.update(typecode); + hash.update(access); + hash.update(index); + hash.update(optional); + hash.update(unit); + hash.update(description); +} + +SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaStatistic(this)) +{ + FieldTable map; + map.decode(buffer); + + name = map.getAsString("name"); + typecode = (Typecode) map.getAsInt("type"); + unit = map.getAsString("unit"); + description = map.getAsString("desc"); +} + +void SchemaStatisticImpl::encode(Buffer& buffer) const +{ + FieldTable map; + + map.setString("name", name); + map.setInt("type", (int) typecode); + if (!unit.empty()) + map.setString("unit", unit); + if (!description.empty()) + map.setString("desc", description); + + map.encode(buffer); +} + +void SchemaStatisticImpl::updateHash(SchemaHash& hash) const +{ + hash.update(name); + hash.update(typecode); + hash.update(unit); + hash.update(description); +} + +SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : envelope(new SchemaObjectClass(this)), hasHash(true) +{ + buffer.getShortString(package); + buffer.getShortString(name); + hash.decode(buffer); + + uint16_t propCount = buffer.getShort(); + uint16_t statCount = buffer.getShort(); + uint16_t methodCount = buffer.getShort(); + + for (uint16_t idx = 0; idx < propCount; idx++) { + SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer); + addProperty(*property->envelope); + } + + for (uint16_t idx = 0; idx < statCount; idx++) { + SchemaStatisticImpl* statistic = new SchemaStatisticImpl(buffer); + addStatistic(*statistic->envelope); + } + + for (uint16_t idx = 0; idx < methodCount; idx++) { + SchemaMethodImpl* method = new SchemaMethodImpl(buffer); + addMethod(*method->envelope); + } +} + +void SchemaObjectClassImpl::encode(Buffer& buffer) const +{ + buffer.putOctet((uint8_t) CLASS_OBJECT); + buffer.putShortString(package); + buffer.putShortString(name); + hash.encode(buffer); + buffer.putShort((uint16_t) properties.size()); + buffer.putShort((uint16_t) statistics.size()); + buffer.putShort((uint16_t) methods.size()); + + for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + iter != properties.end(); iter++) + (*iter)->encode(buffer); + for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + iter != statistics.end(); iter++) + (*iter)->encode(buffer); + for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + iter != methods.end(); iter++) + (*iter)->encode(buffer); +} + +const uint8_t* SchemaObjectClassImpl::getHash() const +{ + if (!hasHash) { + hasHash = true; + hash.update(package); + hash.update(name); + for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + iter != properties.end(); iter++) + (*iter)->updateHash(hash); + for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + iter != statistics.end(); iter++) + (*iter)->updateHash(hash); + for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + iter != methods.end(); iter++) + (*iter)->updateHash(hash); + } + + return hash.get(); +} + +void SchemaObjectClassImpl::addProperty(const SchemaProperty& property) +{ + properties.push_back(property.impl); +} + +void SchemaObjectClassImpl::addStatistic(const SchemaStatistic& statistic) +{ + statistics.push_back(statistic.impl); +} + +void SchemaObjectClassImpl::addMethod(const SchemaMethod& method) +{ + methods.push_back(method.impl); +} + +const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const +{ + int count = 0; + for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + iter != properties.end(); iter++, count++) + if (idx == count) + return (*iter)->envelope; + return 0; +} + +const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const +{ + int count = 0; + for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + iter != statistics.end(); iter++, count++) + if (idx == count) + return (*iter)->envelope; + return 0; +} + +const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const +{ + int count = 0; + for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + iter != methods.end(); iter++, count++) + if (idx == count) + return (*iter)->envelope; + return 0; +} + +SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : envelope(new SchemaEventClass(this)), hasHash(true) +{ + buffer.getShortString(package); + buffer.getShortString(name); + hash.decode(buffer); + + uint16_t argCount = buffer.getShort(); + + for (uint16_t idx = 0; idx < argCount; idx++) { + SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer); + addArgument(*argument->envelope); + } +} + +void SchemaEventClassImpl::encode(Buffer& buffer) const +{ + buffer.putOctet((uint8_t) CLASS_EVENT); + buffer.putShortString(package); + buffer.putShortString(name); + hash.encode(buffer); + buffer.putShort((uint16_t) arguments.size()); + + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + (*iter)->encode(buffer); +} + +const uint8_t* SchemaEventClassImpl::getHash() const +{ + if (!hasHash) { + hasHash = true; + hash.update(package); + hash.update(name); + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++) + (*iter)->updateHash(hash); + } + return hash.get(); +} + +void SchemaEventClassImpl::addArgument(const SchemaArgument& argument) +{ + arguments.push_back(argument.impl); +} + +const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const +{ + int count = 0; + for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + iter != arguments.end(); iter++, count++) + if (idx == count) + return (*iter)->envelope; + return 0; +} + +//================================================================== +// Wrappers +//================================================================== + +SchemaArgument::SchemaArgument(const char* name, Typecode typecode) +{ + impl = new SchemaArgumentImpl(this, name, typecode); +} + +SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {} + +SchemaArgument::~SchemaArgument() +{ + delete impl; +} + +void SchemaArgument::setDirection(Direction dir) +{ + impl->setDirection(dir); +} + +void SchemaArgument::setUnit(const char* val) +{ + impl->setUnit(val); +} + +void SchemaArgument::setDesc(const char* desc) +{ + impl->setDesc(desc); +} + +const char* SchemaArgument::getName() const +{ + return impl->getName().c_str(); +} + +Typecode SchemaArgument::getType() const +{ + return impl->getType(); +} + +Direction SchemaArgument::getDirection() const +{ + return impl->getDirection(); +} + +const char* SchemaArgument::getUnit() const +{ + return impl->getUnit().c_str(); +} + +const char* SchemaArgument::getDesc() const +{ + return impl->getDesc().c_str(); +} + +SchemaMethod::SchemaMethod(const char* name) +{ + impl = new SchemaMethodImpl(this, name); +} + +SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {} + +SchemaMethod::~SchemaMethod() +{ + delete impl; +} + +void SchemaMethod::addArgument(const SchemaArgument& argument) +{ + impl->addArgument(argument); +} + +void SchemaMethod::setDesc(const char* desc) +{ + impl->setDesc(desc); +} + +const char* SchemaMethod::getName() const +{ + return impl->getName().c_str(); +} + +const char* SchemaMethod::getDesc() const +{ + return impl->getDesc().c_str(); +} + +int SchemaMethod::getArgumentCount() const +{ + return impl->getArgumentCount(); +} + +const SchemaArgument* SchemaMethod::getArgument(int idx) const +{ + return impl->getArgument(idx); +} + +SchemaProperty::SchemaProperty(const char* name, Typecode typecode) +{ + impl = new SchemaPropertyImpl(this, name, typecode); +} + +SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {} + +SchemaProperty::~SchemaProperty() +{ + delete impl; +} + +void SchemaProperty::setAccess(Access access) +{ + impl->setAccess(access); +} + +void SchemaProperty::setIndex(bool val) +{ + impl->setIndex(val); +} + +void SchemaProperty::setOptional(bool val) +{ + impl->setOptional(val); +} + +void SchemaProperty::setUnit(const char* val) +{ + impl->setUnit(val); +} + +void SchemaProperty::setDesc(const char* desc) +{ + impl->setDesc(desc); +} + +const char* SchemaProperty::getName() const +{ + return impl->getName().c_str(); +} + +Typecode SchemaProperty::getType() const +{ + return impl->getType(); +} + +Access SchemaProperty::getAccess() const +{ + return impl->getAccess(); +} + +bool SchemaProperty::isIndex() const +{ + return impl->isIndex(); +} + +bool SchemaProperty::isOptional() const +{ + return impl->isOptional(); +} + +const char* SchemaProperty::getUnit() const +{ + return impl->getUnit().c_str(); +} + +const char* SchemaProperty::getDesc() const +{ + return impl->getDesc().c_str(); +} + +SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) +{ + impl = new SchemaStatisticImpl(this, name, typecode); +} + +SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {} + +SchemaStatistic::~SchemaStatistic() +{ + delete impl; +} + +void SchemaStatistic::setUnit(const char* val) +{ + impl->setUnit(val); +} + +void SchemaStatistic::setDesc(const char* desc) +{ + impl->setDesc(desc); +} + +const char* SchemaStatistic::getName() const +{ + return impl->getName().c_str(); +} + +Typecode SchemaStatistic::getType() const +{ + return impl->getType(); +} + +const char* SchemaStatistic::getUnit() const +{ + return impl->getUnit().c_str(); +} + +const char* SchemaStatistic::getDesc() const +{ + return impl->getDesc().c_str(); +} + +SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) +{ + impl = new SchemaObjectClassImpl(this, package, name); +} + +SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {} + +SchemaObjectClass::~SchemaObjectClass() +{ + delete impl; +} + +void SchemaObjectClass::addProperty(const SchemaProperty& property) +{ + impl->addProperty(property); +} + +void SchemaObjectClass::addStatistic(const SchemaStatistic& statistic) +{ + impl->addStatistic(statistic); +} + +void SchemaObjectClass::addMethod(const SchemaMethod& method) +{ + impl->addMethod(method); +} + +const char* SchemaObjectClass::getPackage() const +{ + return impl->getPackage().c_str(); +} + +const char* SchemaObjectClass::getName() const +{ + return impl->getName().c_str(); +} + +const uint8_t* SchemaObjectClass::getHash() const +{ + return impl->getHash(); +} + +int SchemaObjectClass::getPropertyCount() const +{ + return impl->getPropertyCount(); +} + +int SchemaObjectClass::getStatisticCount() const +{ + return impl->getStatisticCount(); +} + +int SchemaObjectClass::getMethodCount() const +{ + return impl->getMethodCount(); +} + +const SchemaProperty* SchemaObjectClass::getProperty(int idx) const +{ + return impl->getProperty(idx); +} + +const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const +{ + return impl->getStatistic(idx); +} + +const SchemaMethod* SchemaObjectClass::getMethod(int idx) const +{ + return impl->getMethod(idx); +} + +SchemaEventClass::SchemaEventClass(const char* package, const char* name) +{ + impl = new SchemaEventClassImpl(this, package, name); +} + +SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {} + +SchemaEventClass::~SchemaEventClass() +{ + delete impl; +} + +void SchemaEventClass::addArgument(const SchemaArgument& argument) +{ + impl->addArgument(argument); +} + +void SchemaEventClass::setDesc(const char* desc) +{ + impl->setDesc(desc); +} + +const char* SchemaEventClass::getPackage() const +{ + return impl->getPackage().c_str(); +} + +const char* SchemaEventClass::getName() const +{ + return impl->getName().c_str(); +} + +const uint8_t* SchemaEventClass::getHash() const +{ + return impl->getHash(); +} + +int SchemaEventClass::getArgumentCount() const +{ + return impl->getArgumentCount(); +} + +const SchemaArgument* SchemaEventClass::getArgument(int idx) const +{ + return impl->getArgument(idx); +} + diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h new file mode 100644 index 0000000000..fbc156c538 --- /dev/null +++ b/cpp/src/qmf/SchemaImpl.h @@ -0,0 +1,195 @@ +#ifndef _QmfSchemaImpl_ +#define _QmfSchemaImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Schema.h" +#include <string> +#include <vector> +#include <qpid/framing/Buffer.h> + +namespace qmf { + + // TODO: Destructors for schema classes + // TODO: Add "frozen" attribute for schema classes so they can't be modified after + // they've been registered. + + class SchemaHash { + uint8_t hash[16]; + public: + SchemaHash(); + void encode(qpid::framing::Buffer& buffer); + void decode(qpid::framing::Buffer& buffer); + void update(const char* data, uint32_t len); + void update(uint8_t data); + void update(const std::string& data) { update(data.c_str(), data.size()); } + void update(Typecode t) { update((uint8_t) t); } + void update(Direction d) { update((uint8_t) d); } + void update(Access a) { update((uint8_t) a); } + void update(bool b) { update((uint8_t) (b ? 1 : 0)); } + const uint8_t* get() const { return hash; } + }; + + struct SchemaArgumentImpl { + SchemaArgument* envelope; + std::string name; + Typecode typecode; + Direction dir; + std::string unit; + std::string description; + + SchemaArgumentImpl(SchemaArgument* e, const char* n, Typecode t) : + envelope(e), name(n), typecode(t), dir(DIR_IN) {} + SchemaArgumentImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void setDirection(Direction d) { dir = d; } + void setUnit(const char* val) { unit = val; } + void setDesc(const char* desc) { description = desc; } + const std::string& getName() const { return name; } + Typecode getType() const { return typecode; } + Direction getDirection() const { return dir; } + const std::string& getUnit() const { return unit; } + const std::string& getDesc() const { return description; } + void updateHash(SchemaHash& hash) const; + }; + + struct SchemaMethodImpl { + SchemaMethod* envelope; + std::string name; + std::string description; + std::vector<SchemaArgumentImpl*> arguments; + + SchemaMethodImpl(SchemaMethod* e, const char* n) : envelope(e), name(n) {} + SchemaMethodImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void addArgument(const SchemaArgument& argument); + void setDesc(const char* desc) { description = desc; } + const std::string& getName() const { return name; } + const std::string& getDesc() const { return description; } + int getArgumentCount() const { return arguments.size(); } + const SchemaArgument* getArgument(int idx) const; + void updateHash(SchemaHash& hash) const; + }; + + struct SchemaPropertyImpl { + SchemaProperty* envelope; + std::string name; + Typecode typecode; + Access access; + bool index; + bool optional; + std::string unit; + std::string description; + + SchemaPropertyImpl(SchemaProperty* e, const char* n, Typecode t) : + envelope(e), name(n), typecode(t), access(ACCESS_READ_ONLY), + index(false), optional(false) {} + SchemaPropertyImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void setAccess(Access a) { access = a; } + void setIndex(bool val) { index = val; } + void setOptional(bool val) { optional = val; } + void setUnit(const char* val) { unit = val; } + void setDesc(const char* desc) { description = desc; } + const std::string& getName() const { return name; } + Typecode getType() const { return typecode; } + Access getAccess() const { return access; } + bool isIndex() const { return index; } + bool isOptional() const { return optional; } + const std::string& getUnit() const { return unit; } + const std::string& getDesc() const { return description; } + void updateHash(SchemaHash& hash) const; + }; + + struct SchemaStatisticImpl { + SchemaStatistic* envelope; + std::string name; + Typecode typecode; + std::string unit; + std::string description; + + SchemaStatisticImpl(SchemaStatistic* e, const char* n, Typecode t) : + envelope(e), name(n), typecode(t) {} + SchemaStatisticImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void setUnit(const char* val) { unit = val; } + void setDesc(const char* desc) { description = desc; } + const std::string& getName() const { return name; } + Typecode getType() const { return typecode; } + const std::string& getUnit() const { return unit; } + const std::string& getDesc() const { return description; } + void updateHash(SchemaHash& hash) const; + }; + + struct SchemaObjectClassImpl { + SchemaObjectClass* envelope; + std::string package; + std::string name; + mutable SchemaHash hash; + mutable bool hasHash; + std::vector<SchemaPropertyImpl*> properties; + std::vector<SchemaStatisticImpl*> statistics; + std::vector<SchemaMethodImpl*> methods; + + SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) : + envelope(e), package(p), name(n), hasHash(false) {} + SchemaObjectClassImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void addProperty(const SchemaProperty& property); + void addStatistic(const SchemaStatistic& statistic); + void addMethod(const SchemaMethod& method); + + const std::string& getPackage() const { return package; } + const std::string& getName() const { return name; } + const uint8_t* getHash() const; + int getPropertyCount() const { return properties.size(); } + int getStatisticCount() const { return statistics.size(); } + int getMethodCount() const { return methods.size(); } + const SchemaProperty* getProperty(int idx) const; + const SchemaStatistic* getStatistic(int idx) const; + const SchemaMethod* getMethod(int idx) const; + }; + + struct SchemaEventClassImpl { + SchemaEventClass* envelope; + std::string package; + std::string name; + mutable SchemaHash hash; + mutable bool hasHash; + std::string description; + std::vector<SchemaArgumentImpl*> arguments; + + SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) : + envelope(e), package(p), name(n), hasHash(false) {} + SchemaEventClassImpl(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; + void addArgument(const SchemaArgument& argument); + void setDesc(const char* desc) { description = desc; } + + const std::string& getPackage() const { return package; } + const std::string& getName() const { return name; } + const uint8_t* getHash() const; + int getArgumentCount() const { return arguments.size(); } + const SchemaArgument* getArgument(int idx) const; + }; +} + +#endif + diff --git a/cpp/src/qmf/Typecode.h b/cpp/src/qmf/Typecode.h new file mode 100644 index 0000000000..94614d2977 --- /dev/null +++ b/cpp/src/qmf/Typecode.h @@ -0,0 +1,51 @@ +#ifndef _QmfTypecode_ +#define _QmfTypecode_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace qmf { + + enum Typecode { + TYPE_UINT8 = 1, + TYPE_UINT16 = 2, + TYPE_UINT32 = 3, + TYPE_UINT64 = 4, + TYPE_SSTR = 6, + TYPE_LSTR = 7, + TYPE_ABSTIME = 8, + TYPE_DELTATIME = 9, + TYPE_REF = 10, + TYPE_BOOL = 11, + TYPE_FLOAT = 12, + TYPE_DOUBLE = 13, + TYPE_UUID = 14, + TYPE_MAP = 15, + TYPE_INT8 = 16, + TYPE_INT16 = 17, + TYPE_INT32 = 18, + TYPE_INT64 = 19, + TYPE_OBJECT = 20, + TYPE_LIST = 21, + TYPE_ARRAY = 22 + }; +} + +#endif + diff --git a/cpp/src/qmf/Value.h b/cpp/src/qmf/Value.h new file mode 100644 index 0000000000..7d54293e08 --- /dev/null +++ b/cpp/src/qmf/Value.h @@ -0,0 +1,113 @@ +#ifndef _QmfValue_ +#define _QmfValue_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/ObjectId.h> +#include <qmf/Typecode.h> + +namespace qmf { + + class Object; + class ValueImpl; + + class Value { + public: + Value(); + Value(Typecode t, Typecode arrayType = TYPE_UINT8); + Value(ValueImpl* impl); + ~Value(); + + Typecode getType() const; + bool isNull() const; + void setNull(); + + bool isObjectId() const; + const ObjectId& asObjectId() const; + void setObjectId(const ObjectId& oid); + + bool isUint() const; + uint32_t asUint() const; + void setUint(uint32_t val); + + bool isInt() const; + int32_t asInt() const; + void setInt(int32_t val); + + bool isUint64() const; + uint64_t asUint64() const; + void setUint64(uint64_t val); + + bool isInt64() const; + int64_t asInt64() const; + void setInt64(int64_t val); + + bool isString() const; + const char* asString() const; + void setString(const char* val); + + bool isBool() const; + bool asBool() const; + void setBool(bool val); + + bool isFloat() const; + float asFloat() const; + void setFloat(float val); + + bool isDouble() const; + double asDouble() const; + void setDouble(double val); + + bool isUuid() const; + const uint8_t* asUuid() const; + void setUuid(const uint8_t* val); + + bool isObject() const; + Object* asObject() const; + void setObject(Object* val); + + bool isMap() const; + bool keyInMap(const char* key) const; + Value* byKey(const char* key); + const Value* byKey(const char* key) const; + void deleteKey(const char* key); + void insert(const char* key, Value* val); + uint32_t keyCount() const; + const char* key(uint32_t idx) const; + + bool isList() const; + uint32_t listItemCount() const; + Value* listItem(uint32_t idx); + void appendToList(Value* val); + void deleteListItem(uint32_t idx); + + bool isArray() const; + Typecode arrayType() const; + uint32_t arrayItemCount() const; + Value* arrayItem(uint32_t idx); + void appendToArray(Value* val); + void deleteArrayItem(uint32_t idx); + + ValueImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/ValueImpl.cpp b/cpp/src/qmf/ValueImpl.cpp new file mode 100644 index 0000000000..538255ea20 --- /dev/null +++ b/cpp/src/qmf/ValueImpl.cpp @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ValueImpl.h" +#include <qpid/framing/FieldTable.h> + +using namespace std; +using namespace qmf; +using qpid::framing::Buffer; + +ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typecode(t) +{ + uint64_t first; + uint64_t second; + qpid::framing::FieldTable ft; + + switch (typecode) { + case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break; + case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break; + case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break; + case TYPE_UINT64 : value.u64 = buf.getLongLong(); break; + case TYPE_SSTR : buf.getShortString(stringVal); break; + case TYPE_LSTR : buf.getMediumString(stringVal); break; + case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break; + case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break; + case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break; + case TYPE_FLOAT : value.floatVal = buf.getFloat(); break; + case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break; + case TYPE_INT8 : value.s32 = (int32_t) buf.getOctet(); break; + case TYPE_INT16 : value.s32 = (int32_t) buf.getShort(); break; + case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break; + case TYPE_INT64 : value.s64 = buf.getLongLong(); break; + case TYPE_UUID : buf.getBin128(value.uuidVal); break; + case TYPE_REF: + first = buf.getLongLong(); + second = buf.getLongLong(); + refVal.impl->setValue(first, second); + break; + + case TYPE_MAP: + ft.decode(buf); + // TODO: either update to recursively use QMF types or reduce to int/string/... + // (maybe use another ctor with a FieldValue argument) + break; + + case TYPE_LIST: + case TYPE_ARRAY: + case TYPE_OBJECT: + default: + break; + } +} + +ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t) +{ + ::memset(&value, 0, sizeof(value)); +} + +ValueImpl::~ValueImpl() +{ +} + +void ValueImpl::encode(Buffer& buf) const +{ + switch (typecode) { + case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break; + case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break; + case TYPE_UINT32 : buf.putLong(value.u32); break; + case TYPE_UINT64 : buf.putLongLong(value.u64); break; + case TYPE_SSTR : buf.putShortString(stringVal); break; + case TYPE_LSTR : buf.putMediumString(stringVal); break; + case TYPE_ABSTIME : buf.putLongLong(value.s64); break; + case TYPE_DELTATIME : buf.putLongLong(value.u64); break; + case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break; + case TYPE_FLOAT : buf.putFloat(value.floatVal); break; + case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break; + case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break; + case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break; + case TYPE_INT32 : buf.putLong(value.s32); break; + case TYPE_INT64 : buf.putLongLong(value.s64); break; + case TYPE_UUID : buf.putBin128(value.uuidVal); break; + case TYPE_REF : refVal.impl->encode(buf); break; + case TYPE_MAP: // TODO + case TYPE_LIST: + case TYPE_ARRAY: + case TYPE_OBJECT: + default: + break; + } +} + +bool ValueImpl::keyInMap(const char* key) const +{ + return typecode == TYPE_MAP && mapVal.count(key) > 0; +} + +Value* ValueImpl::byKey(const char* key) +{ + if (keyInMap(key)) { + map<std::string, VPtr>::iterator iter = mapVal.find(key); + if (iter != mapVal.end()) + return iter->second.get(); + } + return 0; +} + +const Value* ValueImpl::byKey(const char* key) const +{ + if (keyInMap(key)) { + map<std::string, VPtr>::const_iterator iter = mapVal.find(key); + if (iter != mapVal.end()) + return iter->second.get(); + } + return 0; +} + +void ValueImpl::deleteKey(const char* key) +{ + mapVal.erase(key); +} + +void ValueImpl::insert(const char* key, Value* val) +{ + mapVal[key] = VPtr(val); +} + +const char* ValueImpl::key(uint32_t idx) const +{ + map<std::string, VPtr>::const_iterator iter = mapVal.begin(); + for (uint32_t i = 0; i < idx; i++) { + if (iter == mapVal.end()) + break; + iter++; + } + + if (iter == mapVal.end()) + return 0; + else + return iter->first.c_str(); +} + +Value* ValueImpl::listItem(uint32_t) +{ + return 0; +} + +void ValueImpl::appendToList(Value*) +{ +} + +void ValueImpl::deleteListItem(uint32_t) +{ +} + +Value* ValueImpl::arrayItem(uint32_t) +{ + return 0; +} + +void ValueImpl::appendToArray(Value*) +{ +} + +void ValueImpl::deleteArrayItem(uint32_t) +{ +} + + +//================================================================== +// Wrappers +//================================================================== + +Value::Value(Typecode t, Typecode at) +{ + impl = new ValueImpl(this, t, at); +} + +Value::Value(ValueImpl* i) +{ + impl = i; +} + +Value::~Value() +{ + delete impl; +} + +Typecode Value::getType() const +{ + return impl->getType(); +} + +bool Value::isNull() const +{ + return impl->isNull(); +} + +void Value::setNull() +{ + impl->setNull(); +} + +bool Value::isObjectId() const +{ + return impl->isObjectId(); +} + +const ObjectId& Value::asObjectId() const +{ + return impl->asObjectId(); +} + +void Value::setObjectId(const ObjectId& oid) +{ + impl->setObjectId(oid); +} + +bool Value::isUint() const +{ + return impl->isUint(); +} + +uint32_t Value::asUint() const +{ + return impl->asUint(); +} + +void Value::setUint(uint32_t val) +{ + impl->setUint(val); +} + +bool Value::isInt() const +{ + return impl->isInt(); +} + +int32_t Value::asInt() const +{ + return impl->asInt(); +} + +void Value::setInt(int32_t val) +{ + impl->setInt(val); +} + +bool Value::isUint64() const +{ + return impl->isUint64(); +} + +uint64_t Value::asUint64() const +{ + return impl->asUint64(); +} + +void Value::setUint64(uint64_t val) +{ + impl->setUint64(val); +} + +bool Value::isInt64() const +{ + return impl->isInt64(); +} + +int64_t Value::asInt64() const +{ + return impl->asInt64(); +} + +void Value::setInt64(int64_t val) +{ + impl->setInt64(val); +} + +bool Value::isString() const +{ + return impl->isString(); +} + +const char* Value::asString() const +{ + return impl->asString(); +} + +void Value::setString(const char* val) +{ + impl->setString(val); +} + +bool Value::isBool() const +{ + return impl->isBool(); +} + +bool Value::asBool() const +{ + return impl->asBool(); +} + +void Value::setBool(bool val) +{ + impl->setBool(val); +} + +bool Value::isFloat() const +{ + return impl->isFloat(); +} + +float Value::asFloat() const +{ + return impl->asFloat(); +} + +void Value::setFloat(float val) +{ + impl->setFloat(val); +} + +bool Value::isDouble() const +{ + return impl->isDouble(); +} + +double Value::asDouble() const +{ + return impl->asDouble(); +} + +void Value::setDouble(double val) +{ + impl->setDouble(val); +} + +bool Value::isUuid() const +{ + return impl->isUuid(); +} + +const uint8_t* Value::asUuid() const +{ + return impl->asUuid(); +} + +void Value::setUuid(const uint8_t* val) +{ + impl->setUuid(val); +} + +bool Value::isObject() const +{ + return impl->isObject(); +} + +Object* Value::asObject() const +{ + return impl->asObject(); +} + +void Value::setObject(Object* val) +{ + impl->setObject(val); +} + +bool Value::isMap() const +{ + return impl->isMap(); +} + +bool Value::keyInMap(const char* key) const +{ + return impl->keyInMap(key); +} + +Value* Value::byKey(const char* key) +{ + return impl->byKey(key); +} + +const Value* Value::byKey(const char* key) const +{ + return impl->byKey(key); +} + +void Value::deleteKey(const char* key) +{ + impl->deleteKey(key); +} + +void Value::insert(const char* key, Value* val) +{ + impl->insert(key, val); +} + +uint32_t Value::keyCount() const +{ + return impl->keyCount(); +} + +const char* Value::key(uint32_t idx) const +{ + return impl->key(idx); +} + +bool Value::isList() const +{ + return impl->isList(); +} + +uint32_t Value::listItemCount() const +{ + return impl->listItemCount(); +} + +Value* Value::listItem(uint32_t idx) +{ + return impl->listItem(idx); +} + +void Value::appendToList(Value* val) +{ + impl->appendToList(val); +} + +void Value::deleteListItem(uint32_t idx) +{ + impl->deleteListItem(idx); +} + +bool Value::isArray() const +{ + return impl->isArray(); +} + +Typecode Value::arrayType() const +{ + return impl->arrayType(); +} + +uint32_t Value::arrayItemCount() const +{ + return impl->arrayItemCount(); +} + +Value* Value::arrayItem(uint32_t idx) +{ + return impl->arrayItem(idx); +} + +void Value::appendToArray(Value* val) +{ + impl->appendToArray(val); +} + +void Value::deleteArrayItem(uint32_t idx) +{ + impl->deleteArrayItem(idx); +} + diff --git a/cpp/src/qmf/ValueImpl.h b/cpp/src/qmf/ValueImpl.h new file mode 100644 index 0000000000..cf33035bf7 --- /dev/null +++ b/cpp/src/qmf/ValueImpl.h @@ -0,0 +1,144 @@ +#ifndef _QmfValueImpl_ +#define _QmfValueImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/Value.h> +#include <qmf/ObjectIdImpl.h> +#include <qmf/Object.h> +#include <qpid/framing/Buffer.h> +#include <string> +#include <string.h> +#include <map> +#include <vector> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + // TODO: set valid flag on all value settors + // TODO: add a modified flag and accessors + + struct ValueImpl { + typedef boost::shared_ptr<Value> VPtr; + typedef boost::shared_ptr<Object> OPtr; + Value* envelope; + const Typecode typecode; + bool valid; + + ObjectId refVal; + std::string stringVal; + OPtr objectVal; + std::map<std::string, VPtr> mapVal; + std::vector<VPtr> vectorVal; + Typecode arrayTypecode; + + union { + uint32_t u32; + uint64_t u64; + int32_t s32; + int64_t s64; + bool boolVal; + float floatVal; + double doubleVal; + uint8_t uuidVal[16]; + } value; + + ValueImpl(Value* e, Typecode t, Typecode at) : + envelope(e), typecode(t), valid(false), arrayTypecode(at) {} + ValueImpl(Typecode t, qpid::framing::Buffer& b); + ValueImpl(Typecode t); + ~ValueImpl(); + + void encode(qpid::framing::Buffer& b) const; + + Typecode getType() const { return typecode; } + bool isNull() const { return !valid; } + void setNull() { valid = false; } + + bool isObjectId() const { return typecode == TYPE_REF; } + const ObjectId& asObjectId() const { return refVal; } + void setObjectId(const ObjectId& o) { refVal = o; } // TODO + + bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; } + uint32_t asUint() const { return value.u32; } + void setUint(uint32_t val) { value.u32 = val; } + + bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; } + int32_t asInt() const { return value.s32; } + void setInt(int32_t val) { value.s32 = val; } + + bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; } + uint64_t asUint64() const { return value.u64; } + void setUint64(uint64_t val) { value.u64 = val; } + + bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; } + int64_t asInt64() const { return value.s64; } + void setInt64(int64_t val) { value.s64 = val; } + + bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; } + const char* asString() const { return stringVal.c_str(); } + void setString(const char* val) { stringVal = val; } + + bool isBool() const { return typecode == TYPE_BOOL; } + bool asBool() const { return value.boolVal; } + void setBool(bool val) { value.boolVal = val; } + + bool isFloat() const { return typecode == TYPE_FLOAT; } + float asFloat() const { return value.floatVal; } + void setFloat(float val) { value.floatVal = val; } + + bool isDouble() const { return typecode == TYPE_DOUBLE; } + double asDouble() const { return value.doubleVal; } + void setDouble(double val) { value.doubleVal = val; } + + bool isUuid() const { return typecode == TYPE_UUID; } + const uint8_t* asUuid() const { return value.uuidVal; } + void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); } + + bool isObject() const { return typecode == TYPE_OBJECT; } + Object* asObject() const { return objectVal.get(); } + void setObject(Object* val) { objectVal.reset(val); } + + bool isMap() const { return typecode == TYPE_MAP; } + bool keyInMap(const char* key) const; + Value* byKey(const char* key); + const Value* byKey(const char* key) const; + void deleteKey(const char* key); + void insert(const char* key, Value* val); + uint32_t keyCount() const { return mapVal.size(); } + const char* key(uint32_t idx) const; + + bool isList() const { return typecode == TYPE_LIST; } + uint32_t listItemCount() const { return vectorVal.size(); } + Value* listItem(uint32_t idx); + void appendToList(Value* val); + void deleteListItem(uint32_t idx); + + bool isArray() const { return typecode == TYPE_ARRAY; } + Typecode arrayType() const { return arrayTypecode; } + uint32_t arrayItemCount() const { return vectorVal.size(); } + Value* arrayItem(uint32_t idx); + void appendToArray(Value* val); + void deleteArrayItem(uint32_t idx); + }; +} + +#endif + |