summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r--qpid/cpp/src/qmf/AgentEngine.h207
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.cpp1091
-rw-r--r--qpid/cpp/src/qmf/ConsoleEngine.h222
-rw-r--r--qpid/cpp/src/qmf/Event.h30
-rw-r--r--qpid/cpp/src/qmf/Message.h39
-rw-r--r--qpid/cpp/src/qmf/Object.h48
-rw-r--r--qpid/cpp/src/qmf/ObjectId.h53
-rw-r--r--qpid/cpp/src/qmf/Query.h104
-rw-r--r--qpid/cpp/src/qmf/ResilientConnection.h163
-rw-r--r--qpid/cpp/src/qmf/Schema.h171
-rw-r--r--qpid/cpp/src/qmf/Typecode.h51
-rw-r--r--qpid/cpp/src/qmf/Value.h113
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp (renamed from qpid/cpp/src/qmf/AgentEngine.cpp)248
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp763
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h239
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp (renamed from qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp)104
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h (renamed from qpid/cpp/src/qmf/ConnectionSettingsImpl.h)19
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp419
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h145
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.cpp (renamed from qpid/cpp/src/qmf/MessageImpl.cpp)4
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.h (renamed from qpid/cpp/src/qmf/MessageImpl.h)8
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp (renamed from qpid/cpp/src/qmf/ObjectIdImpl.cpp)96
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.h (renamed from qpid/cpp/src/qmf/ObjectIdImpl.h)17
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.cpp (renamed from qpid/cpp/src/qmf/ObjectImpl.cpp)56
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.h (renamed from qpid/cpp/src/qmf/ObjectImpl.h)33
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.cpp (renamed from qpid/cpp/src/qmf/Protocol.cpp)4
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.h (renamed from qpid/cpp/src/qmf/Protocol.h)6
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.cpp (renamed from qpid/cpp/src/qmf/QueryImpl.cpp)15
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.h (renamed from qpid/cpp/src/qmf/QueryImpl.h)34
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp (renamed from qpid/cpp/src/qmf/ResilientConnection.cpp)53
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp (renamed from qpid/cpp/src/qmf/SchemaImpl.cpp)221
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h (renamed from qpid/cpp/src/qmf/SchemaImpl.h)75
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.cpp (renamed from qpid/cpp/src/qmf/SequenceManager.cpp)10
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.h (renamed from qpid/cpp/src/qmf/SequenceManager.h)10
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.cpp (renamed from qpid/cpp/src/qmf/ValueImpl.cpp)390
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.h (renamed from qpid/cpp/src/qmf/ValueImpl.h)32
36 files changed, 2210 insertions, 3083 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.h b/qpid/cpp/src/qmf/AgentEngine.h
deleted file mode 100644
index c88ef33657..0000000000
--- a/qpid/cpp/src/qmf/AgentEngine.h
+++ /dev/null
@@ -1,207 +0,0 @@
-#ifndef _QmfAgentEngine_
-#define _QmfAgentEngine_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/Schema.h>
-#include <qmf/ObjectId.h>
-#include <qmf/Object.h>
-#include <qmf/Event.h>
-#include <qmf/Query.h>
-#include <qmf/Value.h>
-#include <qmf/Message.h>
-
-namespace qmf {
-
- /**
- * AgentEvent
- *
- * This structure represents a QMF event coming from the agent to
- * the application.
- */
- struct AgentEvent {
- enum EventKind {
- GET_QUERY = 1,
- START_SYNC = 2,
- END_SYNC = 3,
- METHOD_CALL = 4,
- DECLARE_QUEUE = 5,
- DELETE_QUEUE = 6,
- BIND = 7,
- UNBIND = 8,
- SETUP_COMPLETE = 9
- };
-
- EventKind kind;
- uint32_t sequence; // Protocol sequence (for all kinds)
- char* authUserId; // Authenticated user ID (for all kinds)
- char* authToken; // Authentication token if issued (for all kinds)
- char* name; // Name of the method/sync query
- // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND)
- Object* object; // Object involved in method call (METHOD_CALL)
- ObjectId* objectId; // ObjectId for method call (METHOD_CALL)
- Query* query; // Query parameters (GET_QUERY, START_SYNC)
- Value* arguments; // Method parameters (METHOD_CALL)
- char* exchange; // Exchange for bind (BIND, UNBIND)
- char* bindingKey; // Key for bind (BIND, UNBIND)
- SchemaObjectClass* objectClass; // (METHOD_CALL)
- };
-
- class AgentEngineImpl;
-
- /**
- * AgentEngine - Protocol engine for the QMF agent
- */
- class AgentEngine {
- public:
- AgentEngine(char* label, bool internalStore=true);
- ~AgentEngine();
-
- /**
- * Configure the directory path for storing persistent data.
- *@param path Null-terminated string containing a directory path where files can be
- * created, written, and read. If NULL, no persistent storage will be
- * attempted.
- */
- void setStoreDir(const char* path);
-
- /**
- * Configure the directory path for files transferred over QMF.
- *@param path Null-terminated string containing a directory path where files can be
- * created, deleted, written, and read. If NULL, file transfers shall not
- * be permitted.
- */
- void setTransferDir(const char* path);
-
- /**
- * Pass messages received from the AMQP session to the Agent engine.
- *@param message AMQP messages received on the agent session.
- */
- void handleRcvMessage(Message& message);
-
- /**
- * Get the next message to be sent to the AMQP network.
- *@param item The Message structure describing the message to be produced.
- *@return true if the Message is valid, false if there are no messages to send.
- */
- bool getXmtMessage(Message& item) const;
-
- /**
- * Remove and discard one message from the head of the transmit queue.
- */
- void popXmt();
-
- /**
- * Get the next application event from the agent engine.
- *@param event The event iff the return value is true
- *@return true if event is valid, false if there are no events to process
- */
- bool getEvent(AgentEvent& event) const;
-
- /**
- * Remove and discard one event from the head of the event queue.
- */
- void popEvent();
-
- /**
- * A new AMQP session has been established for Agent communication.
- */
- void newSession();
-
- /**
- * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event
- * is received from the Agent engine.
- */
- void startProtocol();
-
- /**
- * This method is called periodically so the agent can supply a heartbeat.
- */
- void heartbeat();
-
- /**
- * Respond to a method request.
- *@param sequence The sequence number from the method request event.
- *@param status The method's completion status.
- *@param text Status text ("OK" or an error message)
- *@param arguments The list of output arguments from the method call.
- */
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
-
- /**
- * Send a content indication to the QMF bus. This is only needed for objects that are
- * managed by the application. This is *NOT* needed for objects managed by the Agent
- * (inserted using addObject).
- *@param sequence The sequence number of the GET request or the SYNC_START request.
- *@param object The object (annotated with "changed" flags) for publication.
- *@param prop If true, changed object properties are transmitted.
- *@param stat If true, changed object statistics are transmitted.
- */
- void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true);
-
- /**
- * Indicate the completion of a query. This is not used for SYNC_START requests.
- *@param sequence The sequence number of the GET request.
- */
- void queryComplete(uint32_t sequence);
-
- /**
- * Register a schema class with the Agent.
- *@param cls A SchemaObejctClass object that defines data managed by the agent.
- */
- void registerClass(SchemaObjectClass* cls);
-
- /**
- * Register a schema class with the Agent.
- *@param cls A SchemaEventClass object that defines events sent by the agent.
- */
- void registerClass(SchemaEventClass* cls);
-
- /**
- * Give an object to the Agent for storage and management. Once added, the agent takes
- * responsibility for the life cycle of the object.
- *@param obj The object to be managed by the Agent.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId of the managed object.
- */
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- // const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi);
-
- /**
- * Allocate an object-id for an object that will be managed by the application.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId structure for the allocated ID.
- */
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
-
- /**
- * Raise an event into the QMF network..
- *@param event The event object for the event to be raised.
- */
- void raiseEvent(Event& event);
-
- private:
- AgentEngineImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp
deleted file mode 100644
index e7991328ee..0000000000
--- a/qpid/cpp/src/qmf/ConsoleEngine.cpp
+++ /dev/null
@@ -1,1091 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/ConsoleEngine.h"
-#include "qmf/MessageImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/Typecode.h"
-#include "qmf/ObjectImpl.h"
-#include "qmf/ObjectIdImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/ValueImpl.h"
-#include "qmf/Protocol.h"
-#include "qmf/SequenceManager.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/SystemInfo.h>
-#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <vector>
-#include <iostream>
-#include <fstream>
-#include <boost/shared_ptr.hpp>
-
-using namespace std;
-using namespace qmf;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qmf {
-
- struct MethodResponseImpl {
- typedef boost::shared_ptr<MethodResponseImpl> Ptr;
- MethodResponse* envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- auto_ptr<Value> arguments;
-
- MethodResponseImpl(Buffer& buf);
- ~MethodResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
- };
-
- struct QueryResponseImpl {
- typedef boost::shared_ptr<QueryResponseImpl> Ptr;
- QueryResponse *envelope;
- uint32_t status;
- auto_ptr<Value> exception;
- vector<ObjectImpl::Ptr> results;
-
- QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {}
- ~QueryResponseImpl() { delete envelope; }
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- uint32_t getObjectCount() const { return results.size(); }
- const Object* getObject(uint32_t idx) const;
- };
-
- struct ConsoleEventImpl {
- typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
- ConsoleEvent::EventKind kind;
- boost::shared_ptr<AgentProxyImpl> agent;
- string name;
- boost::shared_ptr<SchemaClassKey> classKey;
- Object* object;
- void* context;
- Event* event;
- uint64_t timestamp;
- uint32_t methodHandle;
- MethodResponseImpl::Ptr methodResponse;
-
- ConsoleEventImpl(ConsoleEvent::EventKind k) :
- kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {}
- ~ConsoleEventImpl() {}
- ConsoleEvent copy();
- };
-
- struct BrokerEventImpl {
- typedef boost::shared_ptr<BrokerEventImpl> Ptr;
- BrokerEvent::EventKind kind;
- string name;
- string exchange;
- string bindingKey;
- void* context;
- QueryResponseImpl::Ptr queryResponse;
-
- BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {}
- ~BrokerEventImpl() {}
- BrokerEvent copy();
- };
-
- struct AgentProxyImpl {
- typedef boost::shared_ptr<AgentProxyImpl> Ptr;
- AgentProxy* envelope;
- ConsoleEngineImpl* console;
- BrokerProxyImpl* broker;
- uint32_t agentBank;
- string label;
-
- AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) :
- envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {}
- ~AgentProxyImpl() {}
- const string& getLabel() const { return label; }
- };
-
- class BrokerProxyImpl {
- public:
- typedef boost::shared_ptr<BrokerProxyImpl> Ptr;
-
- BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console);
- ~BrokerProxyImpl() {}
-
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
- bool getEvent(BrokerEvent& event) const;
- void popEvent();
-
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
- void sendQuery(const Query& query, void* context, const AgentProxy* agent);
- void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent);
-
- void addBinding(const string& exchange, const string& key);
- void staticRelease() { decOutstanding(); }
-
- private:
- friend class StaticContext;
- friend class QueryContext;
- mutable Mutex lock;
- BrokerProxy* envelope;
- ConsoleEngineImpl* console;
- string queueName;
- Uuid brokerId;
- SequenceManager seqMgr;
- uint32_t requestsOutstanding;
- bool topicBound;
- vector<AgentProxyImpl::Ptr> agentList;
- deque<MessageImpl::Ptr> xmtQueue;
- deque<BrokerEventImpl::Ptr> eventQueue;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName);
- BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- BrokerEventImpl::Ptr eventSetupComplete();
- BrokerEventImpl::Ptr eventStable();
- BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response);
-
- void handleBrokerResponse(Buffer& inBuffer, uint32_t seq);
- void handlePackageIndication(Buffer& inBuffer, uint32_t seq);
- void handleCommandComplete(Buffer& inBuffer, uint32_t seq);
- void handleClassIndication(Buffer& inBuffer, uint32_t seq);
- void handleMethodResponse(Buffer& inBuffer, uint32_t seq);
- void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq);
- void handleEventIndication(Buffer& inBuffer, uint32_t seq);
- void handleSchemaResponse(Buffer& inBuffer, uint32_t seq);
- ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
- void incOutstandingLH();
- void decOutstanding();
- };
-
- struct StaticContext : public SequenceContext {
- StaticContext(BrokerProxyImpl& b) : broker(b) {}
- ~StaticContext() {}
- void reserve() {}
- void release() { broker.staticRelease(); }
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
- BrokerProxyImpl& broker;
- };
-
- struct QueryContext : public SequenceContext {
- QueryContext(BrokerProxyImpl& b, void* u) :
- broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {}
- ~QueryContext() {}
- void reserve();
- void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer);
-
- mutable Mutex lock;
- BrokerProxyImpl& broker;
- void* userContext;
- uint32_t requestsOutstanding;
- QueryResponseImpl::Ptr queryResponse;
- };
-
- class ConsoleEngineImpl {
- public:
- ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings());
- ~ConsoleEngineImpl();
-
- bool getEvent(ConsoleEvent& event) const;
- void popEvent();
-
- void addConnection(BrokerProxy& broker, void* context);
- void delConnection(BrokerProxy& broker);
-
- uint32_t packageCount() const;
- const string& getPackageName(uint32_t idx) const;
-
- uint32_t classCount(const char* packageName) const;
- const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
- ClassKind getClassKind(const SchemaClassKey* key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
- void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey* key);
- void bindClass(const char* packageName, const char* className);
-
- /*
- void startSync(const Query& query, void* context, SyncQuery& sync);
- void touchSync(SyncQuery& sync);
- void endSync(SyncQuery& sync);
- */
-
- private:
- friend class BrokerProxyImpl;
- ConsoleEngine* envelope;
- const ConsoleSettings& settings;
- mutable Mutex lock;
- deque<ConsoleEventImpl::Ptr> eventQueue;
- vector<BrokerProxyImpl*> brokerList;
- vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
-
- // Declare a compare class for the class maps that compares the dereferenced
- // class key pointers. The default behavior would be to compare the pointer
- // addresses themselves.
- struct KeyCompare {
- bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const {
- return *left < *right;
- }
- };
-
- typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList;
- typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList;
- typedef map<string, pair<ObjectClassList, EventClassList> > PackageList;
-
- PackageList packages;
-
- void learnPackage(const string& packageName);
- void learnClass(SchemaObjectClassImpl::Ptr cls);
- void learnClass(SchemaEventClassImpl::Ptr cls);
- bool haveClass(const SchemaClassKeyImpl& key) const;
- SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const;
- };
-}
-
-namespace {
- const char* QMF_EXCHANGE = "qpid.management";
- const char* DIR_EXCHANGE = "amq.direct";
- const char* BROKER_KEY = "broker";
- const char* BROKER_PACKAGE = "org.apache.qpid.broker";
- const char* AGENT_CLASS = "agent";
- const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
- vector<ObjectImpl::Ptr>::const_iterator iter = results.begin();
-
- while (idx > 0) {
- if (iter == results.end())
- return 0;
- iter++;
- idx--;
- }
-
- return (*iter)->envelope;
-}
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-ConsoleEvent ConsoleEventImpl::copy()
-{
- ConsoleEvent item;
-
- ::memset(&item, 0, sizeof(ConsoleEvent));
- item.kind = kind;
- item.agent = agent.get() ? agent->envelope : 0;
- item.classKey = classKey.get();
- item.object = object;
- item.context = context;
- item.event = event;
- item.timestamp = timestamp;
- item.methodHandle = methodHandle;
- item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0;
-
- STRING_REF(name);
-
- return item;
-}
-
-BrokerEvent BrokerEventImpl::copy()
-{
- BrokerEvent item;
-
- ::memset(&item, 0, sizeof(BrokerEvent));
- item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
- item.context = context;
- item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0;
-
- return item;
-}
-
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) :
- envelope(e), console(_console.impl)
-{
- stringstream qn;
- qpid::TcpAddress addr;
-
- SystemInfo::getLocalHostname(addr);
- qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
- queueName = qn.str();
-
- seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker")));
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++)
- if (idx-- == 0)
- return (*iter)->envelope;
- return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
- SequenceContext::Ptr queryContext(new QueryContext(*this, context));
- Mutex::ScopedLock _lock(lock);
- if (agent != 0) {
- sendGetRequestLH(queryContext, query, agent->impl);
- } else {
- // TODO (optimization) only send queries to agents that have the requested class+package
- for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++) {
- sendGetRequestLH(queryContext, query, (*iter).get());
- }
- }
-}
-
-void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent)
-{
- stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(queryContext));
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- query.impl->encode(outBuffer);
- key << "agent.1." << agent->agentBank;
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
- eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
- event->name = queueName;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
- event->context = context;
- event->queryResponse = response;
- return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
- brokerId.decode(inBuffer);
- QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
- string package;
-
- inBuffer.getShortString(package);
- QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console->learnPackage(package);
-
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
- outBuffer.putShortString(package);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
- string text;
- uint32_t code = inBuffer.getLong();
- inBuffer.getShortString(text);
- QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
- uint8_t kind = inBuffer.getOctet();
- SchemaClassKeyImpl classKey(inBuffer);
-
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str());
-
- if (!console->haveClass(classKey)) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str());
- }
-}
-
-void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
-{
- // TODO
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
- SchemaObjectClassImpl::Ptr oClassPtr;
- SchemaEventClassImpl::Ptr eClassPtr;
- uint8_t kind = inBuffer.getOctet();
- const SchemaClassKeyImpl* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr.reset(new SchemaObjectClassImpl(inBuffer));
- console->learnClass(oClassPtr);
- key = oClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr.reset(new SchemaEventClassImpl(inBuffer));
- console->learnClass(eClassPtr);
- key = eClassPtr->getClassKey()->impl;
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
-}
-
-ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
- SchemaClassKeyImpl classKey(inBuffer);
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str());
-
- SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey);
- if (schema.get() == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str());
- return ObjectImpl::Ptr();
- }
-
- return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true));
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
- requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding--;
- if (requestsOutstanding == 0 && !topicBound) {
- topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin();
- iter != console->bindingList.end(); iter++) {
- string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
- string key(iter->second);
- eventQueue.push_back(eventBind(exchange, queueName, key));
- }
- eventQueue.push_back(eventStable());
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this))
-{
- string text;
-
- status = buf.getLong();
- buf.getMediumString(text);
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-
- // TODO: Parse schema-specific output arguments.
- arguments.reset(new Value(TYPE_MAP));
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- if (opcode == Protocol::OP_BROKER_RESPONSE) {
- broker.handleBrokerResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
- broker.handleSchemaResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_PACKAGE_INDICATION)
- broker.handlePackageIndication(buffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION)
- broker.handleClassIndication(buffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence);
- else if (opcode == Protocol::OP_EVENT_INDICATION)
- broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION)
- broker.handleObjectIndication(buffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, true);
- else {
- QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void QueryContext::reserve()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
- Mutex::ScopedLock _lock(lock);
- if (--requestsOutstanding == 0) {
- broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
- }
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
-{
- bool completeContext = false;
- ObjectImpl::Ptr object;
-
- if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- if (object.get() != 0)
- queryResponse->results.push_back(object);
- }
- else {
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) :
- envelope(e), settings(s)
-{
- bindingList.push_back(pair<string, string>(string(), "schema.#"));
- if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
- bindingList.push_back(pair<string, string>(string(), "console.#"));
- } else {
- if (settings.rcvObjects && !settings.userBindings)
- bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
- else
- bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
- if (settings.rcvEvents)
- bindingList.push_back(pair<string, string>(string(), "console.event.#"));
- if (settings.rcvHeartbeats)
- bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
- }
-}
-
-ConsoleEngineImpl::~ConsoleEngineImpl()
-{
- // This function intentionally left blank.
-}
-
-bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void ConsoleEngineImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/)
-{
- Mutex::ScopedLock _lock(lock);
- brokerList.push_back(broker.impl);
-}
-
-void ConsoleEngineImpl::delConnection(BrokerProxy& broker)
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- if (*iter == broker.impl) {
- brokerList.erase(iter);
- break;
- }
-}
-
-uint32_t ConsoleEngineImpl::packageCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return packages.size();
-}
-
-const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const
-{
- const static string empty;
-
- Mutex::ScopedLock _lock(lock);
- if (idx >= packages.size())
- return empty;
-
- PackageList::const_iterator iter = packages.begin();
- for (uint32_t i = 0; i < idx; i++) iter++;
- return iter->first;
-}
-
-uint32_t ConsoleEngineImpl::classCount(const char* packageName) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.size() + eList.size();
-}
-
-const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
- uint32_t count = 0;
-
- for (ObjectClassList::const_iterator oIter = oList.begin();
- oIter != oList.end(); oIter++) {
- if (count == idx)
- return oIter->second->getClassKey();
- count++;
- }
-
- for (EventClassList::const_iterator eIter = eList.begin();
- eIter != eList.end(); eIter++) {
- if (count == idx)
- return eIter->second->getClassKey();
- count++;
- }
-
- return 0;
-}
-
-ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return CLASS_OBJECT;
-
- const EventClassList& eList = pIter->second.second;
- if (eList.find(key->impl) != eList.end())
- return CLASS_EVENT;
- return CLASS_OBJECT;
-}
-
-const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(key->impl);
- if (iter == oList.end())
- return 0;
- return iter->second->envelope;
-}
-
-const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const EventClassList& eList = pIter->second.second;
- EventClassList::const_iterator iter = eList.find(key->impl);
- if (iter == eList.end())
- return 0;
- return iter->second->envelope;
-}
-
-void ConsoleEngineImpl::bindPackage(const char* packageName)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey)
-{
- stringstream key;
- key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleEngineImpl::bindClass(const char* packageName, const char* className)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << "." << className << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-/*
-void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync)
-{
-}
-
-void ConsoleEngineImpl::touchSync(SyncQuery& sync)
-{
-}
-
-void ConsoleEngineImpl::endSync(SyncQuery& sync)
-{
-}
-*/
-
-void ConsoleEngineImpl::learnPackage(const string& packageName)
-{
- Mutex::ScopedLock _lock(lock);
- if (packages.find(packageName) == packages.end())
- packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
- (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
-}
-
-void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- ObjectClassList& list = pIter->second.first;
- if (list.find(key->impl) == list.end())
- list[key->impl] = cls;
-}
-
-void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- EventClassList& list = pIter->second.second;
- if (list.find(key->impl) == list.end())
- list[key->impl] = cls;
-}
-
-bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key.getPackageName());
- if (pIter == packages.end())
- return false;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.find(&key) != oList.end() || eList.find(&key) != eList.end();
-}
-
-SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key.getPackageName());
- if (pIter == packages.end())
- return SchemaObjectClassImpl::Ptr();
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(&key);
- if (iter == oList.end())
- return SchemaObjectClassImpl::Ptr();
-
- return iter->second;
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-
-BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-
-ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {}
-ConsoleEngine::~ConsoleEngine() { delete impl; }
-bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
-void ConsoleEngine::popEvent() { impl->popEvent(); }
-void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
-void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
-uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); }
-const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
-uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); }
-const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
-ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
-const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
-const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
-void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
-void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
-void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
-//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
-//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); }
-
-
diff --git a/qpid/cpp/src/qmf/ConsoleEngine.h b/qpid/cpp/src/qmf/ConsoleEngine.h
deleted file mode 100644
index 457e83ad58..0000000000
--- a/qpid/cpp/src/qmf/ConsoleEngine.h
+++ /dev/null
@@ -1,222 +0,0 @@
-#ifndef _QmfConsoleEngine_
-#define _QmfConsoleEngine_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/ResilientConnection.h>
-#include <qmf/Schema.h>
-#include <qmf/ObjectId.h>
-#include <qmf/Object.h>
-#include <qmf/Event.h>
-#include <qmf/Query.h>
-#include <qmf/Value.h>
-#include <qmf/Message.h>
-
-namespace qmf {
-
- class ConsoleEngine;
- class ConsoleEngineImpl;
- class BrokerProxyImpl;
- class AgentProxy;
- class AgentProxyImpl;
- class MethodResponseImpl;
- class QueryResponseImpl;
- class QueryContext;
-
- /**
- *
- */
- class MethodResponse {
- public:
- MethodResponse(MethodResponseImpl* impl);
- ~MethodResponse();
- uint32_t getStatus() const;
- const Value* getException() const;
- const Value* getArgs() const;
-
- private:
- friend class ConsoleEngineImpl;
- MethodResponseImpl* impl;
- };
-
- /**
- *
- */
- class QueryResponse {
- public:
- QueryResponse(QueryResponseImpl* impl);
- ~QueryResponse();
- uint32_t getStatus() const;
- const Value* getException() const;
- uint32_t getObjectCount() const;
- const Object* getObject(uint32_t idx) const;
-
- private:
- friend class QueryContext;
- QueryResponseImpl *impl;
- };
-
- /**
- *
- */
- struct ConsoleEvent {
- enum EventKind {
- AGENT_ADDED = 1,
- AGENT_DELETED = 2,
- NEW_PACKAGE = 3,
- NEW_CLASS = 4,
- OBJECT_UPDATE = 5,
- EVENT_RECEIVED = 7,
- AGENT_HEARTBEAT = 8,
- METHOD_RESPONSE = 9
- };
-
- EventKind kind;
- AgentProxy* agent; // (AGENT_[ADDED|DELETED|HEARTBEAT])
- char* name; // (NEW_PACKAGE)
- SchemaClassKey* classKey; // (NEW_CLASS)
- Object* object; // (OBJECT_UPDATE)
- void* context; // (OBJECT_UPDATE)
- Event* event; // (EVENT_RECEIVED)
- uint64_t timestamp; // (AGENT_HEARTBEAT)
- uint32_t methodHandle; // (METHOD_RESPONSE)
- MethodResponse* methodResponse; // (METHOD_RESPONSE)
- QueryResponse* queryResponse; // (QUERY_COMPLETE)
- };
-
- /**
- *
- */
- struct BrokerEvent {
- enum EventKind {
- BROKER_INFO = 10,
- DECLARE_QUEUE = 11,
- DELETE_QUEUE = 12,
- BIND = 13,
- UNBIND = 14,
- SETUP_COMPLETE = 15,
- STABLE = 16,
- QUERY_COMPLETE = 17
- };
-
- EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
- void* context; // (QUERY_COMPLETE)
- QueryResponse* queryResponse; // (QUERY_COMPLETE)
- };
-
- /**
- *
- */
- class AgentProxy {
- public:
- AgentProxy(AgentProxyImpl* impl);
- ~AgentProxy();
- const char* getLabel() const;
-
- private:
- friend class BrokerProxyImpl;
- AgentProxyImpl* impl;
- };
-
- /**
- *
- */
- class BrokerProxy {
- public:
- BrokerProxy(ConsoleEngine& console);
- ~BrokerProxy();
-
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
- bool getEvent(BrokerEvent& event) const;
- void popEvent();
-
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
- void sendQuery(const Query& query, void* context, const AgentProxy* agent = 0);
-
- private:
- friend class ConsoleEngineImpl;
- BrokerProxyImpl* impl;
- };
-
- // TODO - move this to a public header
- struct ConsoleSettings {
- bool rcvObjects;
- bool rcvEvents;
- bool rcvHeartbeats;
- bool userBindings;
-
- ConsoleSettings() :
- rcvObjects(true),
- rcvEvents(true),
- rcvHeartbeats(true),
- userBindings(false) {}
- };
-
- class ConsoleEngine {
- public:
- ConsoleEngine(const ConsoleSettings& settings = ConsoleSettings());
- ~ConsoleEngine();
-
- bool getEvent(ConsoleEvent& event) const;
- void popEvent();
-
- void addConnection(BrokerProxy& broker, void* context);
- void delConnection(BrokerProxy& broker);
-
- uint32_t packageCount() const;
- const char* getPackageName(uint32_t idx) const;
-
- uint32_t classCount(const char* packageName) const;
- const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
- ClassKind getClassKind(const SchemaClassKey* key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
- void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey* key);
- void bindClass(const char* packageName, const char* className);
-
- /*
- void startSync(const Query& query, void* context, SyncQuery& sync);
- void touchSync(SyncQuery& sync);
- void endSync(SyncQuery& sync);
- */
-
- private:
- friend class BrokerProxyImpl;
- friend class AgentProxyImpl;
- ConsoleEngineImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Event.h b/qpid/cpp/src/qmf/Event.h
deleted file mode 100644
index f20c6d2fb1..0000000000
--- a/qpid/cpp/src/qmf/Event.h
+++ /dev/null
@@ -1,30 +0,0 @@
-#ifndef _QmfEvent_
-#define _QmfEvent_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace qmf {
-
- class Event {
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Message.h b/qpid/cpp/src/qmf/Message.h
deleted file mode 100644
index 52b8ba72d3..0000000000
--- a/qpid/cpp/src/qmf/Message.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#ifndef _QmfMessage_
-#define _QmfMessage_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qmf {
-
- struct Message {
- char* body;
- uint32_t length;
- char* destination;
- char* routingKey;
- char* replyExchange;
- char* replyKey;
- char* userId;
- };
-
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/Object.h b/qpid/cpp/src/qmf/Object.h
deleted file mode 100644
index 9cb3224d9b..0000000000
--- a/qpid/cpp/src/qmf/Object.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef _QmfObject_
-#define _QmfObject_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/Schema.h>
-#include <qmf/ObjectId.h>
-#include <qmf/Value.h>
-
-namespace qmf {
-
- struct ObjectImpl;
- class Object {
- public:
- Object(const SchemaObjectClass* type);
- Object(ObjectImpl* impl);
- Object(const Object& from);
- virtual ~Object();
-
- void destroy();
- const ObjectId* getObjectId() const;
- void setObjectId(ObjectId* oid);
- const SchemaObjectClass* getClass() const;
- Value* getValue(char* key) const;
-
- ObjectImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/ObjectId.h b/qpid/cpp/src/qmf/ObjectId.h
deleted file mode 100644
index e894e0b39c..0000000000
--- a/qpid/cpp/src/qmf/ObjectId.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#ifndef _QmfObjectId_
-#define _QmfObjectId_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qpid/sys/IntegerTypes.h>
-
-namespace qmf {
-
- // TODO: Add to/from string and << operator
-
- struct ObjectIdImpl;
- class ObjectId {
- public:
- ObjectId();
- ObjectId(const ObjectId& from);
- ObjectId(ObjectIdImpl* impl);
- ~ObjectId();
-
- uint64_t getObjectNum() const;
- uint32_t getObjectNumHi() const;
- uint32_t getObjectNumLo() const;
- bool isDurable() const;
-
- bool operator==(const ObjectId& other) const;
- bool operator<(const ObjectId& other) const;
- bool operator>(const ObjectId& other) const;
- bool operator<=(const ObjectId& other) const;
- bool operator>=(const ObjectId& other) const;
-
- ObjectIdImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Query.h b/qpid/cpp/src/qmf/Query.h
deleted file mode 100644
index 875749862e..0000000000
--- a/qpid/cpp/src/qmf/Query.h
+++ /dev/null
@@ -1,104 +0,0 @@
-#ifndef _QmfQuery_
-#define _QmfQuery_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/ObjectId.h>
-#include <qmf/Value.h>
-
-namespace qmf {
-
- struct Object;
- struct QueryElementImpl;
- struct QueryImpl;
- struct QueryExpressionImpl;
- struct SchemaClassKey;
-
- enum ValueOper {
- O_EQ = 1,
- O_NE = 2,
- O_LT = 3,
- O_LE = 4,
- O_GT = 5,
- O_GE = 6,
- O_RE_MATCH = 7,
- O_RE_NOMATCH = 8
- };
-
- struct QueryOperand {
- virtual ~QueryOperand() {}
- virtual bool evaluate(const Object* object) const = 0;
- };
-
- struct QueryElement : public QueryOperand {
- QueryElement(const char* attrName, const Value* value, ValueOper oper);
- QueryElement(QueryElementImpl* impl);
- virtual ~QueryElement();
- bool evaluate(const Object* object) const;
-
- QueryElementImpl* impl;
- };
-
- enum ExprOper {
- E_NOT = 1,
- E_AND = 2,
- E_OR = 3,
- E_XOR = 4
- };
-
- struct QueryExpression : public QueryOperand {
- QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
- QueryExpression(QueryExpressionImpl* impl);
- virtual ~QueryExpression();
- bool evaluate(const Object* object) const;
-
- QueryExpressionImpl* impl;
- };
-
- class Query {
- public:
- Query(const char* className, const char* packageName);
- Query(const SchemaClassKey* key);
- Query(const ObjectId* oid);
- Query(QueryImpl* impl);
- ~Query();
-
- void setSelect(const QueryOperand* criterion);
- void setLimit(uint32_t maxResults);
- void setOrderBy(const char* attrName, bool decreasing);
-
- const char* getPackage() const;
- const char* getClass() const;
- const ObjectId* getObjectId() const;
-
- bool haveSelect() const;
- bool haveLimit() const;
- bool haveOrderBy() const;
- const QueryOperand* getSelect() const;
- uint32_t getLimit() const;
- const char* getOrderBy() const;
- bool getDecreasing() const;
-
- QueryImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/ResilientConnection.h b/qpid/cpp/src/qmf/ResilientConnection.h
deleted file mode 100644
index 03f1b9c0d5..0000000000
--- a/qpid/cpp/src/qmf/ResilientConnection.h
+++ /dev/null
@@ -1,163 +0,0 @@
-#ifndef _QmfResilientConnection_
-#define _QmfResilientConnection_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/Message.h>
-#include <qmf/ConnectionSettings.h>
-#include <string>
-
-namespace qmf {
-
- class ResilientConnectionImpl;
-
- /**
- * Represents events that occur, unsolicited, from ResilientConnection.
- */
- struct ResilientConnectionEvent {
- enum EventKind {
- CONNECTED = 1,
- DISCONNECTED = 2,
- SESSION_CLOSED = 3,
- RECV = 4
- };
-
- EventKind kind;
- void* sessionContext; // SESSION_CLOSED, RECV
- char* errorText; // DISCONNECTED, SESSION_CLOSED
- Message message; // RECV
- };
-
- class SessionHandle {
- friend class ResilientConnectionImpl;
- void* impl;
- };
-
- /**
- * ResilientConnection represents a Qpid connection that is resilient.
- *
- * Upon creation, ResilientConnection attempts to establish a connection to the
- * messaging broker. If it fails, it will continue to retry at an interval that
- * increases over time (to a maximum interval). If an extablished connection is
- * dropped, a reconnect will be attempted.
- */
- class ResilientConnection {
- public:
-
- /**
- * Create a new resilient connection.
- *@param settings Settings that define how the connection is to be made.
- *@param delayMin Minimum delay (in seconds) between retries.
- *@param delayMax Maximum delay (in seconds) between retries.
- *@param delayFactor Factor to multiply retry delay by after each failure.
- */
- ResilientConnection(const ConnectionSettings& settings);
- ~ResilientConnection();
-
- /**
- * Get the connected status of the resilient connection.
- *@return true iff the connection is established.
- */
- bool isConnected() const;
-
- /**
- * Get the next event (if present) from the connection.
- *@param event Returned event if one is available.
- *@return true if event is valid, false if there are no more events to handle.
- */
- bool getEvent(ResilientConnectionEvent& event);
-
- /**
- * Discard the event on the front of the queue. This should be invoked after processing
- * the event from getEvent.
- */
- void popEvent();
-
- /**
- * Create a new AMQP session.
- *@param name Unique name for the session.
- *@param sessionContext Optional user-context value that will be provided in events
- * pertaining to this session.
- *@param handle Output handle to be stored and used in subsequent calls pertaining to
- * this session.
- *@return true iff the session was successfully created.
- */
- bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
-
- /**
- * Destroy a created session.
- *@param handle SessionHandle returned by createSession.
- */
- void destroySession(SessionHandle handle);
-
- /**
- * Send a message into the AMQP broker via a session.
- *@param handle The session handle of the session to transmit through.
- *@param message The QMF message to transmit.
- */
- void sendMessage(SessionHandle handle, Message& message);
-
- /**
- * Declare an exclusive, auto-delete queue for a session.
- *@param handle The session handle for the owner of the queue.
- *@param queue The name of the queue.
- */
- void declareQueue(SessionHandle handle, char* queue);
-
- /**
- * Delete a queue.
- *@param handle The session handle for the owner of the queue.
- *@param queue The name of the queue.
- */
- void deleteQueue(SessionHandle handle, char* queue);
-
- /**
- * Bind a queue to an exchange.
- *@param handle The session handle of the session to use for binding.
- *@param exchange The name of the exchange for binding.
- *@param queue The name of the queue for binding.
- *@param key The binding key.
- */
- void bind(SessionHandle handle, char* exchange, char* queue, char* key);
-
- /**
- * Remove a binding.
- *@param handle The session handle of the session to use for un-binding.
- *@param exchange The name of the exchange.
- *@param queue The name of the queue.
- *@param key The binding key.
- */
- void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
-
- /**
- * Establish a file descriptor for event notification.
- *@param fd A file descriptor into which the connection shall write a character each
- * time an event is enqueued. This fd may be in a pair, the other fd of which
- * is used in a select loop to control execution.
- */
- void setNotifyFd(int fd);
-
- private:
- ResilientConnectionImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Schema.h b/qpid/cpp/src/qmf/Schema.h
deleted file mode 100644
index 1123acc3b8..0000000000
--- a/qpid/cpp/src/qmf/Schema.h
+++ /dev/null
@@ -1,171 +0,0 @@
-#ifndef _QmfSchema_
-#define _QmfSchema_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/Typecode.h>
-#include <qpid/sys/IntegerTypes.h>
-
-namespace qmf {
-
- enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 };
- enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 };
- enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 };
-
- struct SchemaArgumentImpl;
- struct SchemaMethodImpl;
- struct SchemaPropertyImpl;
- struct SchemaStatisticImpl;
- struct SchemaObjectClassImpl;
- struct SchemaEventClassImpl;
- struct SchemaClassKeyImpl;
-
- /**
- */
- class SchemaArgument {
- public:
- SchemaArgument(const char* name, Typecode typecode);
- SchemaArgument(SchemaArgumentImpl* impl);
- ~SchemaArgument();
- void setDirection(Direction dir);
- void setUnit(const char* val);
- void setDesc(const char* desc);
- const char* getName() const;
- Typecode getType() const;
- Direction getDirection() const;
- const char* getUnit() const;
- const char* getDesc() const;
-
- SchemaArgumentImpl* impl;
- };
-
- /**
- */
- class SchemaMethod {
- public:
- SchemaMethod(const char* name);
- SchemaMethod(SchemaMethodImpl* impl);
- ~SchemaMethod();
- void addArgument(const SchemaArgument& argument);
- void setDesc(const char* desc);
- const char* getName() const;
- const char* getDesc() const;
- int getArgumentCount() const;
- const SchemaArgument* getArgument(int idx) const;
-
- SchemaMethodImpl* impl;
- };
-
- /**
- */
- class SchemaProperty {
- public:
- SchemaProperty(const char* name, Typecode typecode);
- SchemaProperty(SchemaPropertyImpl* impl);
- ~SchemaProperty();
- void setAccess(Access access);
- void setIndex(bool val);
- void setOptional(bool val);
- void setUnit(const char* val);
- void setDesc(const char* desc);
- const char* getName() const;
- Typecode getType() const;
- Access getAccess() const;
- bool isIndex() const;
- bool isOptional() const;
- const char* getUnit() const;
- const char* getDesc() const;
-
- SchemaPropertyImpl* impl;
- };
-
- /**
- */
- class SchemaStatistic {
- public:
- SchemaStatistic(const char* name, Typecode typecode);
- SchemaStatistic(SchemaStatisticImpl* impl);
- ~SchemaStatistic();
- void setUnit(const char* val);
- void setDesc(const char* desc);
- const char* getName() const;
- Typecode getType() const;
- const char* getUnit() const;
- const char* getDesc() const;
-
- SchemaStatisticImpl* impl;
- };
-
- /**
- */
- class SchemaClassKey {
- public:
- SchemaClassKey(SchemaClassKeyImpl* impl);
- ~SchemaClassKey();
-
- const char* getPackageName() const;
- const char* getClassName() const;
- const uint8_t* getHash() const;
-
- SchemaClassKeyImpl* impl;
- };
-
- /**
- */
- class SchemaObjectClass {
- public:
- SchemaObjectClass(const char* package, const char* name);
- SchemaObjectClass(SchemaObjectClassImpl* impl);
- ~SchemaObjectClass();
- void addProperty(const SchemaProperty& property);
- void addStatistic(const SchemaStatistic& statistic);
- void addMethod(const SchemaMethod& method);
-
- const SchemaClassKey* getClassKey() const;
- int getPropertyCount() const;
- int getStatisticCount() const;
- int getMethodCount() const;
- const SchemaProperty* getProperty(int idx) const;
- const SchemaStatistic* getStatistic(int idx) const;
- const SchemaMethod* getMethod(int idx) const;
-
- SchemaObjectClassImpl* impl;
- };
-
- /**
- */
- class SchemaEventClass {
- public:
- SchemaEventClass(const char* package, const char* name);
- SchemaEventClass(SchemaEventClassImpl* impl);
- ~SchemaEventClass();
- void addArgument(const SchemaArgument& argument);
- void setDesc(const char* desc);
-
- const SchemaClassKey* getClassKey() const;
- int getArgumentCount() const;
- const SchemaArgument* getArgument(int idx) const;
-
- SchemaEventClassImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Typecode.h b/qpid/cpp/src/qmf/Typecode.h
deleted file mode 100644
index 94614d2977..0000000000
--- a/qpid/cpp/src/qmf/Typecode.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef _QmfTypecode_
-#define _QmfTypecode_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace qmf {
-
- enum Typecode {
- TYPE_UINT8 = 1,
- TYPE_UINT16 = 2,
- TYPE_UINT32 = 3,
- TYPE_UINT64 = 4,
- TYPE_SSTR = 6,
- TYPE_LSTR = 7,
- TYPE_ABSTIME = 8,
- TYPE_DELTATIME = 9,
- TYPE_REF = 10,
- TYPE_BOOL = 11,
- TYPE_FLOAT = 12,
- TYPE_DOUBLE = 13,
- TYPE_UUID = 14,
- TYPE_MAP = 15,
- TYPE_INT8 = 16,
- TYPE_INT16 = 17,
- TYPE_INT32 = 18,
- TYPE_INT64 = 19,
- TYPE_OBJECT = 20,
- TYPE_LIST = 21,
- TYPE_ARRAY = 22
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/Value.h b/qpid/cpp/src/qmf/Value.h
deleted file mode 100644
index bb946d31d3..0000000000
--- a/qpid/cpp/src/qmf/Value.h
+++ /dev/null
@@ -1,113 +0,0 @@
-#ifndef _QmfValue_
-#define _QmfValue_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/ObjectId.h>
-#include <qmf/Typecode.h>
-
-namespace qmf {
-
- class Object;
- struct ValueImpl;
-
- class Value {
- public:
- // Value();
- Value(Typecode t, Typecode arrayType = TYPE_UINT8);
- Value(ValueImpl* impl);
- ~Value();
-
- Typecode getType() const;
- bool isNull() const;
- void setNull();
-
- bool isObjectId() const;
- const ObjectId& asObjectId() const;
- void setObjectId(const ObjectId& oid);
-
- bool isUint() const;
- uint32_t asUint() const;
- void setUint(uint32_t val);
-
- bool isInt() const;
- int32_t asInt() const;
- void setInt(int32_t val);
-
- bool isUint64() const;
- uint64_t asUint64() const;
- void setUint64(uint64_t val);
-
- bool isInt64() const;
- int64_t asInt64() const;
- void setInt64(int64_t val);
-
- bool isString() const;
- const char* asString() const;
- void setString(const char* val);
-
- bool isBool() const;
- bool asBool() const;
- void setBool(bool val);
-
- bool isFloat() const;
- float asFloat() const;
- void setFloat(float val);
-
- bool isDouble() const;
- double asDouble() const;
- void setDouble(double val);
-
- bool isUuid() const;
- const uint8_t* asUuid() const;
- void setUuid(const uint8_t* val);
-
- bool isObject() const;
- Object* asObject() const;
- void setObject(Object* val);
-
- bool isMap() const;
- bool keyInMap(const char* key) const;
- Value* byKey(const char* key);
- const Value* byKey(const char* key) const;
- void deleteKey(const char* key);
- void insert(const char* key, Value* val);
- uint32_t keyCount() const;
- const char* key(uint32_t idx) const;
-
- bool isList() const;
- uint32_t listItemCount() const;
- Value* listItem(uint32_t idx);
- void appendToList(Value* val);
- void deleteListItem(uint32_t idx);
-
- bool isArray() const;
- Typecode arrayType() const;
- uint32_t arrayItemCount() const;
- Value* arrayItem(uint32_t idx);
- void appendToArray(Value* val);
- void deleteArrayItem(uint32_t idx);
-
- ValueImpl* impl;
- };
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 9ea3be5907..c5d1bff2e0 100644
--- a/qpid/cpp/src/qmf/AgentEngine.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -17,15 +17,15 @@
* under the License.
*/
-#include "qmf/AgentEngine.h"
-#include "qmf/MessageImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/Typecode.h"
-#include "qmf/ObjectImpl.h"
-#include "qmf/ObjectIdImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/ValueImpl.h"
-#include "qmf/Protocol.h"
+#include "qmf/engine/Agent.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
#include <qpid/framing/FieldTable.h>
@@ -40,13 +40,15 @@
#include <iostream>
#include <fstream>
#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid::framing;
using namespace qpid::sys;
namespace qmf {
+namespace engine {
struct AgentEventImpl {
typedef boost::shared_ptr<AgentEventImpl> Ptr;
@@ -61,7 +63,7 @@ namespace qmf {
boost::shared_ptr<Value> arguments;
string exchange;
string bindingKey;
- SchemaObjectClass* objectClass;
+ const SchemaObjectClass* objectClass;
AgentEventImpl(AgentEvent::EventKind k) :
kind(k), sequence(0), object(0), objectClass(0) {}
@@ -74,14 +76,14 @@ namespace qmf {
uint32_t sequence;
string exchange;
string key;
- SchemaMethodImpl* schemaMethod;
+ const SchemaMethod* schemaMethod;
AgentQueryContext() : schemaMethod(0) {}
};
- class AgentEngineImpl {
+ class AgentImpl : public boost::noncopyable {
public:
- AgentEngineImpl(char* label, bool internalStore);
- ~AgentEngineImpl();
+ AgentImpl(char* label, bool internalStore);
+ ~AgentImpl();
void setStoreDir(const char* path);
void setTransferDir(const char* path);
@@ -163,8 +165,8 @@ namespace qmf {
}
};
- typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap;
- typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap;
+ typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap;
+ typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap;
struct ClassMaps {
ObjectClassMap objectClasses;
@@ -180,7 +182,7 @@ namespace qmf {
boost::shared_ptr<ObjectId> oid);
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- SchemaObjectClass* objectClass);
+ const SchemaObjectClass* objectClass);
void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
void sendPackageIndicationLH(const string& packageName);
@@ -198,10 +200,11 @@ namespace qmf {
void handleConsoleAddedIndication();
};
}
+}
-const char* AgentEngineImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentEngineImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentEngineImpl::BROKER_KEY = "broker";
+const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
+const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
+const char* AgentImpl::BROKER_KEY = "broker";
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
@@ -227,7 +230,7 @@ AgentEvent AgentEventImpl::copy()
return item;
}
-AgentEngineImpl::AgentEngineImpl(char* _label, bool i) :
+AgentImpl::AgentImpl(char* _label, bool i) :
label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0),
@@ -236,11 +239,11 @@ AgentEngineImpl::AgentEngineImpl(char* _label, bool i) :
queueName += label;
}
-AgentEngineImpl::~AgentEngineImpl()
+AgentImpl::~AgentImpl()
{
}
-void AgentEngineImpl::setStoreDir(const char* path)
+void AgentImpl::setStoreDir(const char* path)
{
Mutex::ScopedLock _lock(lock);
if (path)
@@ -249,7 +252,7 @@ void AgentEngineImpl::setStoreDir(const char* path)
storeDir.clear();
}
-void AgentEngineImpl::setTransferDir(const char* path)
+void AgentImpl::setTransferDir(const char* path)
{
Mutex::ScopedLock _lock(lock);
if (path)
@@ -258,7 +261,7 @@ void AgentEngineImpl::setTransferDir(const char* path)
transferDir.clear();
}
-void AgentEngineImpl::handleRcvMessage(Message& message)
+void AgentImpl::handleRcvMessage(Message& message)
{
Buffer inBuffer(message.body, message.length);
uint8_t opcode;
@@ -274,13 +277,13 @@ void AgentEngineImpl::handleRcvMessage(Message& message)
else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
else {
- QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode);
+ QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
break;
}
}
}
-bool AgentEngineImpl::getXmtMessage(Message& item) const
+bool AgentImpl::getXmtMessage(Message& item) const
{
Mutex::ScopedLock _lock(lock);
if (xmtQueue.empty())
@@ -289,14 +292,14 @@ bool AgentEngineImpl::getXmtMessage(Message& item) const
return true;
}
-void AgentEngineImpl::popXmt()
+void AgentImpl::popXmt()
{
Mutex::ScopedLock _lock(lock);
if (!xmtQueue.empty())
xmtQueue.pop_front();
}
-bool AgentEngineImpl::getEvent(AgentEvent& event) const
+bool AgentImpl::getEvent(AgentEvent& event) const
{
Mutex::ScopedLock _lock(lock);
if (eventQueue.empty())
@@ -305,14 +308,14 @@ bool AgentEngineImpl::getEvent(AgentEvent& event) const
return true;
}
-void AgentEngineImpl::popEvent()
+void AgentImpl::popEvent()
{
Mutex::ScopedLock _lock(lock);
if (!eventQueue.empty())
eventQueue.pop_front();
}
-void AgentEngineImpl::newSession()
+void AgentImpl::newSession()
{
Mutex::ScopedLock _lock(lock);
eventQueue.clear();
@@ -322,7 +325,7 @@ void AgentEngineImpl::newSession()
eventQueue.push_back(eventSetupComplete());
}
-void AgentEngineImpl::startProtocol()
+void AgentImpl::startProtocol()
{
Mutex::ScopedLock _lock(lock);
char rawbuffer[512];
@@ -338,7 +341,7 @@ void AgentEngineImpl::startProtocol()
" reqAgent=" << requestedAgentBank);
}
-void AgentEngineImpl::heartbeat()
+void AgentImpl::heartbeat()
{
Mutex::ScopedLock _lock(lock);
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
@@ -351,7 +354,7 @@ void AgentEngineImpl::heartbeat()
QPID_LOG(trace, "SENT HeartbeatIndication");
}
-void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
const Value& argMap)
{
Mutex::ScopedLock _lock(lock);
@@ -366,15 +369,15 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
buffer.putLong(status);
buffer.putMediumString(text);
if (status == 0) {
- for (vector<SchemaArgumentImpl*>::const_iterator aIter = context->schemaMethod->arguments.begin();
- aIter != context->schemaMethod->arguments.end(); aIter++) {
- const SchemaArgumentImpl* schemaArg = *aIter;
- if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->name.c_str())) {
- const Value* val = argMap.byKey(schemaArg->name.c_str());
+ for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
+ aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
+ if (argMap.keyInMap(schemaArg->getName())) {
+ const Value* val = argMap.byKey(schemaArg->getName());
val->impl->encode(buffer);
} else {
- Value val(schemaArg->typecode);
+ Value val(schemaArg->getType());
val.impl->encode(buffer);
}
}
@@ -384,7 +387,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t
QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
-void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -406,7 +409,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop
QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
-void AgentEngineImpl::queryComplete(uint32_t sequence)
+void AgentImpl::queryComplete(uint32_t sequence)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -418,69 +421,67 @@ void AgentEngineImpl::queryComplete(uint32_t sequence)
sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
}
-void AgentEngineImpl::registerClass(SchemaObjectClass* cls)
+void AgentImpl::registerClass(SchemaObjectClass* cls)
{
Mutex::ScopedLock _lock(lock);
- SchemaObjectClassImpl* impl = cls->impl;
- map<string, ClassMaps>::iterator iter = packages.find(impl->package);
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
if (iter == packages.end()) {
- packages[impl->package] = ClassMaps();
- iter = packages.find(impl->getClassKey()->getPackageName());
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
- iter->second.objectClasses[key] = impl;
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.objectClasses[key] = cls;
// TODO: Indicate this schema if connected.
}
-void AgentEngineImpl::registerClass(SchemaEventClass* cls)
+void AgentImpl::registerClass(SchemaEventClass* cls)
{
Mutex::ScopedLock _lock(lock);
- SchemaEventClassImpl* impl = cls->impl;
- map<string, ClassMaps>::iterator iter = packages.find(impl->package);
+ map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
if (iter == packages.end()) {
- packages[impl->package] = ClassMaps();
- iter = packages.find(impl->getClassKey()->getPackageName());
+ packages[cls->getClassKey()->getPackageName()] = ClassMaps();
+ iter = packages.find(cls->getClassKey()->getPackageName());
// TODO: Indicate this package if connected
}
- AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash());
- iter->second.eventClasses[key] = impl;
+ AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
+ iter->second.eventClasses[key] = cls;
// TODO: Indicate this schema if connected.
}
-const ObjectId* AgentEngineImpl::addObject(Object&, uint64_t)
+const ObjectId* AgentImpl::addObject(Object&, uint64_t)
{
Mutex::ScopedLock _lock(lock);
return 0;
}
-const ObjectId* AgentEngineImpl::allocObjectId(uint64_t persistId)
+const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
{
Mutex::ScopedLock _lock(lock);
uint16_t sequence = persistId ? 0 : bootSequence;
uint64_t objectNum = persistId ? persistId : nextObjectId++;
- ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum);
- return oid->envelope;
+ ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
+ return oid;
}
-const ObjectId* AgentEngineImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
{
return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
}
-void AgentEngineImpl::raiseEvent(Event&)
+void AgentImpl::raiseEvent(Event&)
{
Mutex::ScopedLock _lock(lock);
}
-AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
+AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
event->name = name;
@@ -488,7 +489,7 @@ AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name)
return event;
}
-AgentEventImpl::Ptr AgentEngineImpl::eventBind(const string& exchange, const string& queue,
+AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
@@ -499,13 +500,13 @@ AgentEventImpl::Ptr AgentEngineImpl::eventBind(const string& exchange, const str
return event;
}
-AgentEventImpl::Ptr AgentEngineImpl::eventSetupComplete()
+AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
return event;
}
-AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& userId, const string& package,
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
const string& cls, boost::shared_ptr<ObjectId> oid)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -518,9 +519,9 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user
return event;
}
-AgentEventImpl::Ptr AgentEngineImpl::eventMethod(uint32_t num, const string& userId, const string& method,
+AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- SchemaObjectClass* objectClass)
+ const SchemaObjectClass* objectClass)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
event->sequence = num;
@@ -532,7 +533,7 @@ AgentEventImpl::Ptr AgentEngineImpl::eventMethod(uint32_t num, const string& use
return event;
}
-void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
{
uint32_t length = buf.getPosition();
MessageImpl::Ptr message(new MessageImpl);
@@ -547,7 +548,7 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const
xmtQueue.push_back(message);
}
-void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
+void AgentImpl::sendPackageIndicationLH(const string& packageName)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
@@ -556,7 +557,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName)
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
}
-void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
@@ -568,7 +569,7 @@ void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packag
QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
}
-void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
+void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
uint32_t sequence, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
@@ -579,7 +580,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string
QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
}
-void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
+void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
{
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
@@ -605,7 +606,7 @@ void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, ui
QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
}
-void AgentEngineImpl::handleAttachResponse(Buffer& inBuffer)
+void AgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock _lock(lock);
@@ -652,17 +653,17 @@ void AgentEngineImpl::handleAttachResponse(Buffer& inBuffer)
}
}
-void AgentEngineImpl::handlePackageRequest(Buffer&)
+void AgentImpl::handlePackageRequest(Buffer&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentEngineImpl::handleClassQuery(Buffer&)
+void AgentImpl::handleClassQuery(Buffer&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
const string& replyExchange, const string& replyKey)
{
Mutex::ScopedLock _lock(lock);
@@ -688,10 +689,10 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
ClassMaps cMap = pIter->second;
ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
if (ocIter != cMap.objectClasses.end()) {
- SchemaObjectClassImpl* oImpl = ocIter->second;
+ SchemaObjectClass* oImpl = ocIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- oImpl->encode(buffer);
+ oImpl->impl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
return;
@@ -699,10 +700,10 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
if (ecIter != cMap.eventClasses.end()) {
- SchemaEventClassImpl* eImpl = ecIter->second;
+ SchemaEventClass* eImpl = ecIter->second;
Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- eImpl->encode(buffer);
+ eImpl->impl->encode(buffer);
sendBufferLH(buffer, rExchange, rKey);
QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
return;
@@ -711,7 +712,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
}
-void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
{
Mutex::ScopedLock _lock(lock);
FieldTable ft;
@@ -763,13 +764,12 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const
eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
}
-void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
{
Mutex::ScopedLock _lock(lock);
string pname;
string method;
- ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer);
- boost::shared_ptr<ObjectId> oid(oidImpl->envelope);
+ boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
buffer.getShortString(pname);
AgentClassKey classKey(buffer);
buffer.getShortString(method);
@@ -788,29 +788,29 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
return;
}
- const SchemaObjectClassImpl* schema = cIter->second;
- vector<SchemaMethodImpl*>::const_iterator mIter = schema->methods.begin();
- for (; mIter != schema->methods.end(); mIter++) {
- if ((*mIter)->name == method)
+ const SchemaObjectClass* schema = cIter->second;
+ vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
+ for (; mIter != schema->impl->methods.end(); mIter++) {
+ if ((*mIter)->getName() == method)
break;
}
- if (mIter == schema->methods.end()) {
+ if (mIter == schema->impl->methods.end()) {
sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
return;
}
- SchemaMethodImpl* schemaMethod = *mIter;
+ const SchemaMethod* schemaMethod = *mIter;
boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
- ValueImpl* value;
- for (vector<SchemaArgumentImpl*>::const_iterator aIter = schemaMethod->arguments.begin();
- aIter != schemaMethod->arguments.end(); aIter++) {
- const SchemaArgumentImpl* schemaArg = *aIter;
- if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT)
- value = new ValueImpl(schemaArg->typecode, buffer);
+ Value* value;
+ for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
+ aIter != schemaMethod->impl->arguments.end(); aIter++) {
+ const SchemaArgument* schemaArg = *aIter;
+ if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
+ value = ValueImpl::factory(schemaArg->getType(), buffer);
else
- value = new ValueImpl(schemaArg->typecode);
- argMap->insert(schemaArg->name.c_str(), value->envelope);
+ value = ValueImpl::factory(schemaArg->getType());
+ argMap->insert(schemaArg->getName(), value);
}
AgentQueryContext::Ptr context(new AgentQueryContext);
@@ -821,10 +821,10 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con
context->schemaMethod = schemaMethod;
contextMap[contextNum] = context;
- eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope));
+ eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
}
-void AgentEngineImpl::handleConsoleAddedIndication()
+void AgentImpl::handleConsoleAddedIndication()
{
Mutex::ScopedLock _lock(lock);
}
@@ -833,25 +833,25 @@ void AgentEngineImpl::handleConsoleAddedIndication()
// Wrappers
//==================================================================
-AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); }
-AgentEngine::~AgentEngine() { delete impl; }
-void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); }
-void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); }
-void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void AgentEngine::popXmt() { impl->popXmt(); }
-bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
-void AgentEngine::popEvent() { impl->popEvent(); }
-void AgentEngine::newSession() { impl->newSession(); }
-void AgentEngine::startProtocol() { impl->startProtocol(); }
-void AgentEngine::heartbeat() { impl->heartbeat(); }
-void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
-void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
-void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
-void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
-const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
-const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
-const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
-void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); }
+Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::~Agent() { delete impl; }
+void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
+void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
+void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void Agent::popXmt() { impl->popXmt(); }
+bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
+void Agent::popEvent() { impl->popEvent(); }
+void Agent::newSession() { impl->newSession(); }
+void Agent::startProtocol() { impl->startProtocol(); }
+void Agent::heartbeat() { impl->heartbeat(); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
+void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
+void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
+const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
+const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
+const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
new file mode 100644
index 0000000000..1a2b3e6555
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/BrokerProxyImpl.h"
+#include "qmf/engine/ConsoleImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qpid/Address.h"
+#include "qpid/sys/SystemInfo.h"
+#include <qpid/log/Statement.h>
+#include <qpid/StringUtils.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+ const char* DIR_EXCHANGE = "amq.direct";
+ const char* BROKER_KEY = "broker";
+ const char* BROKER_PACKAGE = "org.apache.qpid.broker";
+ const char* AGENT_CLASS = "agent";
+ const char* BROKER_AGENT_KEY = "agent.1.0";
+}
+
+const Object* QueryResponseImpl::getObject(uint32_t idx) const
+{
+ vector<ObjectPtr>::const_iterator iter = results.begin();
+
+ while (idx > 0) {
+ if (iter == results.end())
+ return 0;
+ iter++;
+ idx--;
+ }
+
+ return iter->get();
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+BrokerEvent BrokerEventImpl::copy()
+{
+ BrokerEvent item;
+
+ ::memset(&item, 0, sizeof(BrokerEvent));
+ item.kind = kind;
+
+ STRING_REF(name);
+ STRING_REF(exchange);
+ STRING_REF(bindingKey);
+ item.context = context;
+ item.queryResponse = queryResponse.get();
+ item.methodResponse = methodResponse.get();
+
+ return item;
+}
+
+BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console)
+{
+ stringstream qn;
+ qpid::TcpAddress addr;
+
+ SystemInfo::getLocalHostname(addr);
+ qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
+ queueName = qn.str();
+
+ seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
+}
+
+void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+ eventQueue.push_back(eventDeclareQueue(queueName));
+ eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
+ eventQueue.push_back(eventSetupComplete());
+
+ // TODO: Store session handle
+}
+
+void BrokerProxyImpl::sessionClosed()
+{
+ Mutex::ScopedLock _lock(lock);
+ agentList.clear();
+ eventQueue.clear();
+ xmtQueue.clear();
+}
+
+void BrokerProxyImpl::startProtocol()
+{
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ {
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ agentList[0] = agent;
+
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ }
+
+ console.impl->eventAgentAdded(agent);
+}
+
+void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+{
+ uint32_t length = buf.getPosition();
+ MessageImpl::Ptr message(new MessageImpl);
+
+ buf.reset();
+ buf.getRawData(message->body, length);
+ message->destination = destination;
+ message->routingKey = routingKey;
+ message->replyExchange = DIR_EXCHANGE;
+ message->replyKey = queueName;
+
+ xmtQueue.push_back(message);
+}
+
+void BrokerProxyImpl::handleRcvMessage(Message& message)
+{
+ Buffer inBuffer(message.body, message.length);
+ uint8_t opcode;
+ uint32_t sequence;
+
+ while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
+ seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
+}
+
+bool BrokerProxyImpl::getXmtMessage(Message& item) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (xmtQueue.empty())
+ return false;
+ item = xmtQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popXmt()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!xmtQueue.empty())
+ xmtQueue.pop_front();
+}
+
+bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void BrokerProxyImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+uint32_t BrokerProxyImpl::agentCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return agentList.size();
+}
+
+const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++)
+ if (idx-- == 0)
+ return iter->second.get();
+ return 0;
+}
+
+void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
+{
+ SequenceContext::Ptr queryContext(new QueryContext(*this, context));
+ Mutex::ScopedLock _lock(lock);
+ if (agent != 0) {
+ sendGetRequestLH(queryContext, query, agent);
+ } else {
+ // TODO (optimization) only send queries to agents that have the requested class+package
+ for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
+ iter != agentList.end(); iter++) {
+ sendGetRequestLH(queryContext, query, iter->second.get());
+ }
+ }
+}
+
+void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
+{
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(queryContext));
+ agent->impl->addSequence(sequence);
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ query.impl->encode(outBuffer);
+ key << "agent.1." << agent->impl->agentBank;
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
+}
+
+string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
+{
+ int argCount = schema->getArgumentCount();
+
+ if (argmap == 0 || !argmap->isMap())
+ return string("Arguments must be in a map value");
+
+ for (int aIdx = 0; aIdx < argCount; aIdx++) {
+ const SchemaArgument* arg(schema->getArgument(aIdx));
+ if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
+ if (argmap->keyInMap(arg->getName())) {
+ const Value* argVal(argmap->byKey(arg->getName()));
+ if (argVal->getType() != arg->getType())
+ return string("Argument is the wrong type: ") + arg->getName();
+ argVal->impl->encode(buffer);
+ } else {
+ Value defaultValue(arg->getType());
+ defaultValue.impl->encode(buffer);
+ }
+ }
+ }
+
+ return string();
+}
+
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
+ const string& methodName, const Value* args, void* userContext)
+{
+ int methodCount = cls->getMethodCount();
+ int idx;
+ for (idx = 0; idx < methodCount; idx++) {
+ const SchemaMethod* method = cls->getMethod(idx);
+ if (string(method->getName()) == methodName) {
+ Mutex::ScopedLock _lock(lock);
+ SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
+ stringstream key;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve(methodContext));
+
+ Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
+ oid->impl->encode(outBuffer);
+ cls->getClassKey()->impl->encode(outBuffer);
+ outBuffer.putShortString(methodName);
+
+ string argErrorString = encodeMethodArguments(method, args, outBuffer);
+ if (argErrorString.empty()) {
+ key << "agent.1." << oid->impl->getAgentBank();
+ sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
+ QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
+ } else {
+ MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
+ eventQueue.push_back(eventMethodResponse(userContext, argError));
+ }
+ return;
+ }
+ }
+
+ MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventMethodResponse(userContext, error));
+}
+
+void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
+{
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
+ event->name = queueName;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
+ event->name = queue;
+ event->exchange = exchange;
+ event->bindingKey = key;
+
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
+{
+ QPID_LOG(trace, "Console Link to Broker Stable");
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
+ event->context = context;
+ event->queryResponse = response;
+ return event;
+}
+
+BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
+{
+ BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
+ event->context = context;
+ event->methodResponse = response;
+ return event;
+}
+
+void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
+{
+ brokerId.decode(inBuffer);
+ QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
+}
+
+void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
+{
+ string package;
+
+ inBuffer.getShortString(package);
+ QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
+ console.impl->learnPackage(package);
+
+ Mutex::ScopedLock _lock(lock);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ incOutstandingLH();
+ Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
+ outBuffer.putShortString(package);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
+}
+
+void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
+{
+ string text;
+ uint32_t code = inBuffer.getLong();
+ inBuffer.getShortString(text);
+ QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
+}
+
+void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
+{
+ uint8_t kind = inBuffer.getOctet();
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
+
+ QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());
+
+ if (!console.impl->haveClass(classKey.get())) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
+ classKey->impl->encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str());
+ }
+}
+
+MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
+{
+ MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema));
+
+ QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
+ response->getException()->asString());
+
+ return response;
+}
+
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
+{
+ vector<string> tokens = qpid::split(routingKey, ".");
+ uint32_t agentBank;
+ uint64_t timestamp;
+
+ if (routingKey.empty() || tokens.size() != 4)
+ agentBank = 0;
+ else
+ agentBank = ::atoi(tokens[3].c_str());
+
+ timestamp = inBuffer.getLongLong();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ console.impl->eventAgentHeartbeat(iter->second, timestamp);
+ }
+ QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
+}
+
+void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+
+void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
+{
+ SchemaObjectClass* oClassPtr;
+ SchemaEventClass* eClassPtr;
+ uint8_t kind = inBuffer.getOctet();
+ const SchemaClassKey* key;
+ if (kind == CLASS_OBJECT) {
+ oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
+ console.impl->learnClass(oClassPtr);
+ key = oClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
+
+ //
+ // If we have just learned about the org.apache.qpid.broker:agent class, send a get
+ // request for the current list of agents so we can have it on-hand before we declare
+ // this session "stable".
+ //
+ if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
+ Mutex::ScopedLock _lock(lock);
+ incOutstandingLH();
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
+ FieldTable ft;
+ ft.setString("_class", AGENT_CLASS);
+ ft.setString("_package", BROKER_PACKAGE);
+ ft.encode(outBuffer);
+ sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
+ QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
+ }
+ } else if (kind == CLASS_EVENT) {
+ eClassPtr = SchemaEventClassImpl::factory(inBuffer);
+ console.impl->learnClass(eClassPtr);
+ key = eClassPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
+ }
+ else {
+ QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
+ }
+}
+
+ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
+{
+ auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
+ QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
+
+ SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+ if (schema == 0) {
+ QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
+ return ObjectPtr();
+ }
+
+ ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
+ if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) {
+ //
+ // We've intercepted information about a remote agent... update the agent list accordingly
+ //
+ updateAgentList(optr);
+ }
+ return optr;
+}
+
+void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
+{
+ Value* value = obj->getValue("agentBank");
+ Mutex::ScopedLock _lock(lock);
+ if (value != 0 && value->isUint()) {
+ uint32_t agentBank = value->asUint();
+ if (obj->isDeleted()) {
+ map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ AgentProxyPtr agent(iter->second);
+ console.impl->eventAgentDeleted(agent);
+ agentList.erase(agentBank);
+ QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+
+ //
+ // Release all sequence numbers for requests in-flight to this agent.
+ // Since the agent is no longer connected, these requests would not
+ // otherwise complete.
+ //
+ agent->impl->releaseInFlight(seqMgr);
+ }
+ } else {
+ Value* str = obj->getValue("label");
+ string label;
+ if (str != 0 && str->isString())
+ label = str->asString();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter == agentList.end()) {
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ agentList[agentBank] = agent;
+ console.impl->eventAgentAdded(agent);
+ QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
+ }
+ }
+ }
+}
+
+void BrokerProxyImpl::incOutstandingLH()
+{
+ requestsOutstanding++;
+}
+
+void BrokerProxyImpl::decOutstanding()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding--;
+ if (requestsOutstanding == 0 && !topicBound) {
+ topicBound = true;
+ for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
+ iter != console.impl->bindingList.end(); iter++) {
+ string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
+ string key(iter->second);
+ eventQueue.push_back(eventBind(exchange, queueName, key));
+ }
+ eventQueue.push_back(eventStable());
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
+ status(from.status), schema(from.schema)
+{
+ if (from.exception.get())
+ exception.reset(new Value(*(from.exception)));
+ if (from.arguments.get())
+ arguments.reset(new Value(*(from.arguments)));
+}
+
+MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
+{
+ string text;
+
+ status = buf.getLong();
+ buf.getMediumString(text);
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+
+ if (status != 0)
+ return;
+
+ arguments.reset(new Value(TYPE_MAP));
+ int argCount(schema->getArgumentCount());
+ for (int idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = schema->getArgument(idx);
+ if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
+ Value* value(ValueImpl::factory(arg->getType(), buf));
+ arguments->insert(arg->getName(), value);
+ }
+ }
+}
+
+MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0)
+{
+ status = s;
+ exception.reset(new Value(TYPE_LSTR));
+ exception->setString(text.c_str());
+}
+
+MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(buf, schema));
+ return new MethodResponse(impl);
+}
+
+MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
+{
+ MethodResponseImpl* impl(new MethodResponseImpl(status, text));
+ return new MethodResponse(impl);
+}
+
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
+{
+ ObjectPtr object;
+ bool completeContext = false;
+
+ if (opcode == Protocol::OP_BROKER_RESPONSE) {
+ broker.handleBrokerResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
+ broker.handleSchemaResponse(buffer, sequence);
+ completeContext = true;
+ }
+ else if (opcode == Protocol::OP_PACKAGE_INDICATION)
+ broker.handlePackageIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_CLASS_INDICATION)
+ broker.handleClassIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
+ broker.handleHeartbeatIndication(buffer, sequence, routingKey);
+ else if (opcode == Protocol::OP_EVENT_INDICATION)
+ broker.handleEventIndication(buffer, sequence);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, false);
+ broker.console.impl->eventObjectUpdate(object, true, false);
+ }
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, false, true);
+ broker.console.impl->eventObjectUpdate(object, false, true);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ broker.console.impl->eventObjectUpdate(object, true, true);
+ }
+ else {
+ QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void QueryContext::reserve()
+{
+ Mutex::ScopedLock _lock(lock);
+ requestsOutstanding++;
+}
+
+void QueryContext::release()
+{
+ {
+ Mutex::ScopedLock _lock(lock);
+ if (--requestsOutstanding > 0)
+ return;
+ }
+
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
+}
+
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
+{
+ bool completeContext = false;
+ ObjectPtr object;
+
+ if (opcode == Protocol::OP_COMMAND_COMPLETE) {
+ broker.handleCommandComplete(buffer, sequence);
+ completeContext = true;
+
+ //
+ // Visit each agent and remove the sequence from that agent's in-flight list.
+ // This could be made more efficient because only one agent will have this sequence
+ // in its list.
+ //
+ map<uint32_t, AgentProxyPtr> copy;
+ {
+ Mutex::ScopedLock _block(broker.lock);
+ copy = broker.agentList;
+ }
+ for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
+ iter->second->impl->delSequence(sequence);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ if (object.get() != 0)
+ queryResponse->impl->results.push_back(object);
+ }
+ else {
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+ completeContext = true;
+ }
+
+ return completeContext;
+}
+
+void MethodContext::release()
+{
+ Mutex::ScopedLock _block(broker.lock);
+ broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
+}
+
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
+{
+ if (opcode == Protocol::OP_METHOD_RESPONSE)
+ methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
+ else
+ QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
+
+ return true;
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
+AgentProxy::~AgentProxy() { delete impl; }
+const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
+uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); }
+uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
+
+BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
+BrokerProxy::~BrokerProxy() { delete impl; }
+void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
+void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
+void BrokerProxy::startProtocol() { impl->startProtocol(); }
+void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
+bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
+void BrokerProxy::popXmt() { impl->popXmt(); }
+bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
+void BrokerProxy::popEvent() { impl->popEvent(); }
+uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
+const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
+void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
+
+MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
+MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
+MethodResponse::~MethodResponse() {}
+uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
+const Value* MethodResponse::getException() const { return impl->getException(); }
+const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
+
+QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
+QueryResponse::~QueryResponse() {}
+uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
+const Value* QueryResponse::getException() const { return impl->getException(); }
+uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
+const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
+
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
new file mode 100644
index 0000000000..798a5fdc76
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -0,0 +1,239 @@
+#ifndef _QmfEngineBrokerProxyImpl_
+#define _QmfEngineBrokerProxyImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Console.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+#include "boost/noncopyable.hpp"
+#include <memory>
+#include <string>
+#include <deque>
+#include <map>
+#include <set>
+#include <vector>
+
+namespace qmf {
+namespace engine {
+
+ typedef boost::shared_ptr<MethodResponse> MethodResponsePtr;
+ struct MethodResponseImpl {
+ uint32_t status;
+ const SchemaMethod* schema;
+ std::auto_ptr<Value> exception;
+ std::auto_ptr<Value> arguments;
+
+ MethodResponseImpl(const MethodResponseImpl& from);
+ MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
+ MethodResponseImpl(uint32_t status, const std::string& text);
+ static MethodResponse* factory(qpid::framing::Buffer& buf, const SchemaMethod* schema);
+ static MethodResponse* factory(uint32_t status, const std::string& text);
+ ~MethodResponseImpl() {}
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ const Value* getArgs() const { return arguments.get(); }
+ };
+
+ typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
+ struct QueryResponseImpl {
+ uint32_t status;
+ std::auto_ptr<Value> exception;
+ std::vector<ObjectPtr> results;
+
+ QueryResponseImpl() : status(0) {}
+ static QueryResponse* factory() {
+ QueryResponseImpl* impl(new QueryResponseImpl());
+ return new QueryResponse(impl);
+ }
+ ~QueryResponseImpl() {}
+ uint32_t getStatus() const { return status; }
+ const Value* getException() const { return exception.get(); }
+ uint32_t getObjectCount() const { return results.size(); }
+ const Object* getObject(uint32_t idx) const;
+ };
+
+ struct BrokerEventImpl {
+ typedef boost::shared_ptr<BrokerEventImpl> Ptr;
+ BrokerEvent::EventKind kind;
+ std::string name;
+ std::string exchange;
+ std::string bindingKey;
+ void* context;
+ QueryResponsePtr queryResponse;
+ MethodResponsePtr methodResponse;
+
+ BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
+ ~BrokerEventImpl() {}
+ BrokerEvent copy();
+ };
+
+ typedef boost::shared_ptr<AgentProxy> AgentProxyPtr;
+ struct AgentProxyImpl {
+ Console& console;
+ BrokerProxy& broker;
+ uint32_t agentBank;
+ std::string label;
+ std::set<uint32_t> inFlightSequences;
+
+ AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {}
+ static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) {
+ AgentProxyImpl* impl(new AgentProxyImpl(c, b, ab, l));
+ return new AgentProxy(impl);
+ }
+ ~AgentProxyImpl() {}
+ const std::string& getLabel() const { return label; }
+ uint32_t getBrokerBank() const { return 1; }
+ uint32_t getAgentBank() const { return agentBank; }
+ void addSequence(uint32_t seq) { inFlightSequences.insert(seq); }
+ void delSequence(uint32_t seq) { inFlightSequences.erase(seq); }
+ void releaseInFlight(SequenceManager& seqMgr) {
+ for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++)
+ seqMgr.release(*iter);
+ inFlightSequences.clear();
+ }
+ };
+
+ class BrokerProxyImpl : public boost::noncopyable {
+ public:
+ BrokerProxyImpl(BrokerProxy& pub, Console& _console);
+ ~BrokerProxyImpl() {}
+
+ void sessionOpened(SessionHandle& sh);
+ void sessionClosed();
+ void startProtocol();
+
+ void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
+ void handleRcvMessage(Message& message);
+ bool getXmtMessage(Message& item) const;
+ void popXmt();
+
+ bool getEvent(BrokerEvent& event) const;
+ void popEvent();
+
+ uint32_t agentCount() const;
+ const AgentProxy* getAgent(uint32_t idx) const;
+ void sendQuery(const Query& query, void* context, const AgentProxy* agent);
+ void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
+ std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+
+ void addBinding(const std::string& exchange, const std::string& key);
+ void staticRelease() { decOutstanding(); }
+
+ private:
+ friend struct StaticContext;
+ friend struct QueryContext;
+ friend struct MethodContext;
+ BrokerProxy& publicObject;
+ mutable qpid::sys::Mutex lock;
+ Console& console;
+ std::string queueName;
+ qpid::framing::Uuid brokerId;
+ SequenceManager seqMgr;
+ uint32_t requestsOutstanding;
+ bool topicBound;
+ std::map<uint32_t, AgentProxyPtr> agentList;
+ std::deque<MessageImpl::Ptr> xmtQueue;
+ std::deque<BrokerEventImpl::Ptr> eventQueue;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName);
+ BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
+ BrokerEventImpl::Ptr eventSetupComplete();
+ BrokerEventImpl::Ptr eventStable();
+ BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponsePtr response);
+ BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponsePtr response);
+
+ void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema);
+ void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey);
+ void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
+ ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
+ void updateAgentList(ObjectPtr obj);
+ void incOutstandingLH();
+ void decOutstanding();
+ };
+
+ //
+ // StaticContext is used to handle:
+ //
+ // 1) Responses to console-level requests (for schema info, etc.)
+ // 2) Unsolicited messages from agents (events, published updates, etc.)
+ //
+ struct StaticContext : public SequenceContext {
+ StaticContext(BrokerProxyImpl& b) : broker(b) {}
+ virtual ~StaticContext() {}
+ void reserve() {}
+ void release() { broker.staticRelease(); }
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+ BrokerProxyImpl& broker;
+ };
+
+ //
+ // QueryContext is used to track and handle responses associated with a single Get Query
+ //
+ struct QueryContext : public SequenceContext {
+ QueryContext(BrokerProxyImpl& b, void* u) :
+ broker(b), userContext(u), requestsOutstanding(0), queryResponse(QueryResponseImpl::factory()) {}
+ virtual ~QueryContext() {}
+ void reserve();
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+
+ mutable qpid::sys::Mutex lock;
+ BrokerProxyImpl& broker;
+ void* userContext;
+ uint32_t requestsOutstanding;
+ QueryResponsePtr queryResponse;
+ };
+
+ struct MethodContext : public SequenceContext {
+ MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
+ virtual ~MethodContext() {}
+ void reserve() {}
+ void release();
+ bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
+
+ BrokerProxyImpl& broker;
+ void* userContext;
+ const SchemaMethod* schema;
+ MethodResponsePtr methodResponse;
+ };
+
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
index 034ab18395..2cd6af10f8 100644
--- a/qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
@@ -17,11 +17,11 @@
* under the License.
*/
-#include "qmf/ConnectionSettingsImpl.h"
-#include "qmf/Typecode.h"
+#include "qmf/engine/ConnectionSettingsImpl.h"
+#include "qmf/engine/Typecode.h"
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid;
const string attrProtocol("protocol");
@@ -43,19 +43,20 @@ const string attrMaxSsf("maxSsf");
const string attrRetryDelayMin("retryDelayMin");
const string attrRetryDelayMax("retryDelayMax");
const string attrRetryDelayFactor("retryDelayFactor");
+const string attrSendUserId("sendUserId");
-ConnectionSettingsImpl::ConnectionSettingsImpl(ConnectionSettings* e) :
- envelope(e), retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2)
+ConnectionSettingsImpl::ConnectionSettingsImpl() :
+ retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
{
}
-ConnectionSettingsImpl::ConnectionSettingsImpl(ConnectionSettings* e, const string& /*url*/) :
- envelope(e), retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2)
+ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) :
+ retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
{
// TODO: Parse the URL
}
-void ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
+bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
{
if (key == attrProtocol) clientSettings.protocol = value.asString();
else if (key == attrHost) clientSettings.host = value.asString();
@@ -77,6 +78,10 @@ void ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
else if (key == attrRetryDelayMin) retryDelayMin = value.asUint();
else if (key == attrRetryDelayMax) retryDelayMax = value.asUint();
else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint();
+ else if (key == attrSendUserId) sendUserId = value.asBool();
+ else
+ return false;
+ return true;
}
Value ConnectionSettingsImpl::getAttr(const string& key) const
@@ -251,73 +256,18 @@ void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) c
// Wrappers
//==================================================================
-ConnectionSettings::ConnectionSettings(const ConnectionSettings& from)
-{
- impl = new ConnectionSettingsImpl(*from.impl);
-}
-
-ConnectionSettings::ConnectionSettings()
-{
- impl = new ConnectionSettingsImpl(this);
-}
-
-ConnectionSettings::ConnectionSettings(const char* url)
-{
- impl = new ConnectionSettingsImpl(this, url);
-}
-
-ConnectionSettings::~ConnectionSettings()
-{
- delete impl;
-}
-
-void ConnectionSettings::setAttr(const char* key, const Value& value)
-{
- impl->setAttr(key, value);
-}
-
-Value ConnectionSettings::getAttr(const char* key) const
-{
- return impl->getAttr(key);
-}
-
-const char* ConnectionSettings::getAttrString() const
-{
- return impl->getAttrString().c_str();
-}
-
-void ConnectionSettings::transportTcp(uint16_t port)
-{
- impl->transportTcp(port);
-}
-
-void ConnectionSettings::transportSsl(uint16_t port)
-{
- impl->transportSsl(port);
-}
-
-void ConnectionSettings::transportRdma(uint16_t port)
-{
- impl->transportRdma(port);
-}
-
-void ConnectionSettings::authAnonymous(const char* username)
-{
- impl->authAnonymous(username);
-}
-
-void ConnectionSettings::authPlain(const char* username, const char* password)
-{
- impl->authPlain(username, password);
-}
-
-void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf)
-{
- impl->authGssapi(serviceName, minSsf, maxSsf);
-}
-
-void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor)
-{
- impl->setRetry(delayMin, delayMax, delayFactor);
-}
+ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); }
+ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); }
+ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); }
+ConnectionSettings::~ConnectionSettings() { delete impl; }
+bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); }
+Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); }
+const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); }
+void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); }
+void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); }
+void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); }
+void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); }
+void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); }
+void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); }
+void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); }
diff --git a/qpid/cpp/src/qmf/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
index a177233cf3..98bf87868b 100644
--- a/qpid/cpp/src/qmf/ConnectionSettingsImpl.h
+++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfConnectionSettingsImpl_
-#define _QmfConnectionSettingsImpl_
+#ifndef _QmfEngineConnectionSettingsImpl_
+#define _QmfEngineConnectionSettingsImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,27 +20,28 @@
* under the License.
*/
-#include "qmf/ConnectionSettings.h"
-#include "qmf/Value.h"
+#include "qmf/engine/ConnectionSettings.h"
+#include "qmf/engine/Value.h"
#include "qpid/client/ConnectionSettings.h"
#include <string>
#include <map>
namespace qmf {
+namespace engine {
class ConnectionSettingsImpl {
- ConnectionSettings* envelope;
qpid::client::ConnectionSettings clientSettings;
mutable std::string attrString;
int retryDelayMin;
int retryDelayMax;
int retryDelayFactor;
+ bool sendUserId;
public:
- ConnectionSettingsImpl(ConnectionSettings* e);
- ConnectionSettingsImpl(ConnectionSettings* e, const std::string& url);
+ ConnectionSettingsImpl();
+ ConnectionSettingsImpl(const std::string& url);
~ConnectionSettingsImpl() {}
- void setAttr(const std::string& key, const Value& value);
+ bool setAttr(const std::string& key, const Value& value);
Value getAttr(const std::string& key) const;
const std::string& getAttrString() const;
void transportTcp(uint16_t port);
@@ -53,8 +54,10 @@ namespace qmf {
const qpid::client::ConnectionSettings& getClientSettings() const;
void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const;
+ bool getSendUserId() const { return sendUserId; }
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
new file mode 100644
index 0000000000..c2d1f51f2b
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ConsoleImpl.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/log/Statement.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace {
+ const char* QMF_EXCHANGE = "qpid.management";
+}
+
+#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
+
+ConsoleEvent ConsoleEventImpl::copy()
+{
+ ConsoleEvent item;
+
+ ::memset(&item, 0, sizeof(ConsoleEvent));
+ item.kind = kind;
+ item.agent = agent.get();
+ item.classKey = classKey;
+ item.object = object.get();
+ item.context = context;
+ item.event = event;
+ item.timestamp = timestamp;
+ item.hasProps = hasProps;
+ item.hasStats = hasStats;
+
+ STRING_REF(name);
+
+ return item;
+}
+
+ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s)
+{
+ bindingList.push_back(pair<string, string>(string(), "schema.#"));
+ if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
+ bindingList.push_back(pair<string, string>(string(), "console.#"));
+ } else {
+ if (settings.rcvObjects && !settings.userBindings)
+ bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
+ else
+ bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
+ if (settings.rcvEvents)
+ bindingList.push_back(pair<string, string>(string(), "console.event.#"));
+ if (settings.rcvHeartbeats)
+ bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
+ }
+}
+
+ConsoleImpl::~ConsoleImpl()
+{
+ // This function intentionally left blank.
+}
+
+bool ConsoleImpl::getEvent(ConsoleEvent& event) const
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front()->copy();
+ return true;
+}
+
+void ConsoleImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/)
+{
+ Mutex::ScopedLock _lock(lock);
+ brokerList.push_back(broker.impl);
+}
+
+void ConsoleImpl::delConnection(BrokerProxy& broker)
+{
+ Mutex::ScopedLock _lock(lock);
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ if (*iter == broker.impl) {
+ brokerList.erase(iter);
+ break;
+ }
+}
+
+uint32_t ConsoleImpl::packageCount() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return packages.size();
+}
+
+const string& ConsoleImpl::getPackageName(uint32_t idx) const
+{
+ const static string empty;
+
+ Mutex::ScopedLock _lock(lock);
+ if (idx >= packages.size())
+ return empty;
+
+ PackageList::const_iterator iter = packages.begin();
+ for (uint32_t i = 0; i < idx; i++) iter++;
+ return iter->first;
+}
+
+uint32_t ConsoleImpl::classCount(const char* packageName) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.size() + eList.size();
+}
+
+const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(packageName);
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+ uint32_t count = 0;
+
+ for (ObjectClassList::const_iterator oIter = oList.begin();
+ oIter != oList.end(); oIter++) {
+ if (count == idx)
+ return oIter->second->getClassKey();
+ count++;
+ }
+
+ for (EventClassList::const_iterator eIter = eList.begin();
+ eIter != eList.end(); eIter++) {
+ if (count == idx)
+ return eIter->second->getClassKey();
+ count++;
+ }
+
+ return 0;
+}
+
+ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return CLASS_OBJECT;
+
+ const EventClassList& eList = pIter->second.second;
+ if (eList.find(key) != eList.end())
+ return CLASS_EVENT;
+ return CLASS_OBJECT;
+}
+
+const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+ return iter->second;
+}
+
+const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const EventClassList& eList = pIter->second.second;
+ EventClassList::const_iterator iter = eList.find(key);
+ if (iter == eList.end())
+ return 0;
+ return iter->second;
+}
+
+void ConsoleImpl::bindPackage(const char* packageName)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const SchemaClassKey* classKey)
+{
+ stringstream key;
+ key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+void ConsoleImpl::bindClass(const char* packageName, const char* className)
+{
+ stringstream key;
+ key << "console.obj.*.*." << packageName << "." << className << ".#";
+ Mutex::ScopedLock _lock(lock);
+ bindingList.push_back(pair<string, string>(string(), key.str()));
+ for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
+ iter != brokerList.end(); iter++)
+ (*iter)->addBinding(QMF_EXCHANGE, key.str());
+}
+
+/*
+void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::touchSync(SyncQuery& sync)
+{
+}
+
+void ConsoleImpl::endSync(SyncQuery& sync)
+{
+}
+*/
+
+void ConsoleImpl::learnPackage(const string& packageName)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (packages.find(packageName) == packages.end()) {
+ packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
+ (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
+ eventNewPackage(packageName);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaObjectClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ ObjectClassList& list = pIter->second.first;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+void ConsoleImpl::learnClass(SchemaEventClass* cls)
+{
+ Mutex::ScopedLock _lock(lock);
+ const SchemaClassKey* key = cls->getClassKey();
+ PackageList::iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return;
+
+ EventClassList& list = pIter->second.second;
+ if (list.find(key) == list.end()) {
+ list[key] = cls;
+ eventNewClass(key);
+ }
+}
+
+bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return false;
+
+ const ObjectClassList& oList = pIter->second.first;
+ const EventClassList& eList = pIter->second.second;
+
+ return oList.find(key) != oList.end() || eList.find(key) != eList.end();
+}
+
+SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const
+{
+ Mutex::ScopedLock _lock(lock);
+ PackageList::const_iterator pIter = packages.find(key->getPackageName());
+ if (pIter == packages.end())
+ return 0;
+
+ const ObjectClassList& oList = pIter->second.first;
+ ObjectClassList::const_iterator iter = oList.find(key);
+ if (iter == oList.end())
+ return 0;
+
+ return iter->second;
+}
+
+void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
+ event->agent = agent;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewPackage(const string& packageName)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
+ event->name = packageName;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
+ event->classKey = key;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
+ event->object = object;
+ event->hasProps = prop;
+ event->hasStats = stat;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
+{
+ ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
+ event->agent = agent;
+ event->timestamp = timestamp;
+ Mutex::ScopedLock _lock(lock);
+ eventQueue.push_back(event);
+}
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {}
+Console::~Console() { delete impl; }
+bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
+void Console::popEvent() { impl->popEvent(); }
+void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
+void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
+uint32_t Console::packageCount() const { return impl->packageCount(); }
+const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
+uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); }
+const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
+ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
+const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
+const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
+void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
+void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
+void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
+//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
+//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
+//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); }
+
+
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
new file mode 100644
index 0000000000..8f99c5e6b9
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
@@ -0,0 +1,145 @@
+#ifndef _QmfEngineConsoleEngineImpl_
+#define _QmfEngineConsoleEngineImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/Console.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/SchemaImpl.h"
+#include "qmf/engine/Typecode.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/Protocol.h"
+#include "qmf/engine/SequenceManager.h"
+#include "qmf/engine/BrokerProxyImpl.h"
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/Uuid.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/SystemInfo.h>
+#include <string.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qmf {
+namespace engine {
+
+ struct ConsoleEventImpl {
+ typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
+ ConsoleEvent::EventKind kind;
+ boost::shared_ptr<AgentProxy> agent;
+ std::string name;
+ const SchemaClassKey* classKey;
+ boost::shared_ptr<Object> object;
+ void* context;
+ Event* event;
+ uint64_t timestamp;
+ bool hasProps;
+ bool hasStats;
+
+ ConsoleEventImpl(ConsoleEvent::EventKind k) :
+ kind(k), classKey(0), context(0), event(0), timestamp(0) {}
+ ~ConsoleEventImpl() {}
+ ConsoleEvent copy();
+ };
+
+ class ConsoleImpl : public boost::noncopyable {
+ public:
+ ConsoleImpl(const ConsoleSettings& settings = ConsoleSettings());
+ ~ConsoleImpl();
+
+ bool getEvent(ConsoleEvent& event) const;
+ void popEvent();
+
+ void addConnection(BrokerProxy& broker, void* context);
+ void delConnection(BrokerProxy& broker);
+
+ uint32_t packageCount() const;
+ const std::string& getPackageName(uint32_t idx) const;
+
+ uint32_t classCount(const char* packageName) const;
+ const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
+
+ ClassKind getClassKind(const SchemaClassKey* key) const;
+ const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
+ const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
+
+ void bindPackage(const char* packageName);
+ void bindClass(const SchemaClassKey* key);
+ void bindClass(const char* packageName, const char* className);
+
+ /*
+ void startSync(const Query& query, void* context, SyncQuery& sync);
+ void touchSync(SyncQuery& sync);
+ void endSync(SyncQuery& sync);
+ */
+
+ private:
+ friend class BrokerProxyImpl;
+ friend struct StaticContext;
+ const ConsoleSettings& settings;
+ mutable qpid::sys::Mutex lock;
+ std::deque<ConsoleEventImpl::Ptr> eventQueue;
+ std::vector<BrokerProxyImpl*> brokerList;
+ std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
+
+ // Declare a compare class for the class maps that compares the dereferenced
+ // class key pointers. The default behavior would be to compare the pointer
+ // addresses themselves.
+ struct KeyCompare {
+ bool operator()(const SchemaClassKey* left, const SchemaClassKey* right) const {
+ return *left < *right;
+ }
+ };
+
+ typedef std::map<const SchemaClassKey*, SchemaObjectClass*, KeyCompare> ObjectClassList;
+ typedef std::map<const SchemaClassKey*, SchemaEventClass*, KeyCompare> EventClassList;
+ typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList;
+
+ PackageList packages;
+
+ void learnPackage(const std::string& packageName);
+ void learnClass(SchemaObjectClass* cls);
+ void learnClass(SchemaEventClass* cls);
+ bool haveClass(const SchemaClassKey* key) const;
+ SchemaObjectClass* getSchema(const SchemaClassKey* key) const;
+
+ void eventAgentAdded(boost::shared_ptr<AgentProxy> agent);
+ void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent);
+ void eventNewPackage(const std::string& packageName);
+ void eventNewClass(const SchemaClassKey* key);
+ void eventObjectUpdate(ObjectPtr object, bool prop, bool stat);
+ void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp);
+ };
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
index f2625c7202..0047d3eb9d 100644
--- a/qpid/cpp/src/qmf/MessageImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
@@ -17,11 +17,11 @@
* under the License.
*/
-#include "qmf/MessageImpl.h"
+#include "qmf/engine/MessageImpl.h"
#include <string.h>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
diff --git a/qpid/cpp/src/qmf/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h
index 137f435699..b91291d2e4 100644
--- a/qpid/cpp/src/qmf/MessageImpl.h
+++ b/qpid/cpp/src/qmf/engine/MessageImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfMessageImpl_
-#define _QmfMessageImpl_
+#ifndef _QmfEngineMessageImpl_
+#define _QmfEngineMessageImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +20,12 @@
* under the License.
*/
-#include "qmf/Message.h"
+#include "qmf/engine/Message.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace qmf {
+namespace engine {
struct MessageImpl {
typedef boost::shared_ptr<MessageImpl> Ptr;
@@ -38,5 +39,6 @@ namespace qmf {
Message copy();
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
index c0618ccc49..b08ae2756c 100644
--- a/qpid/cpp/src/qmf/ObjectIdImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
@@ -17,11 +17,11 @@
* under the License.
*/
-#include "qmf/ObjectIdImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
#include <stdlib.h>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using qpid::framing::Buffer;
@@ -32,13 +32,12 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t agent)
((uint64_t) (agent & 0x0fffffff));
}
-ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : envelope(new ObjectId(this)), agent(0)
+ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0)
{
decode(buffer);
}
-ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) :
- envelope(new ObjectId(this)), agent(a)
+ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a)
{
first =
((uint64_t) (flags & 0x0f)) << 60 |
@@ -46,6 +45,18 @@ ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint
second = object;
}
+ObjectId* ObjectIdImpl::factory(Buffer& buffer)
+{
+ ObjectIdImpl* impl(new ObjectIdImpl(buffer));
+ return new ObjectId(impl);
+}
+
+ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object)
+{
+ ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object));
+ return new ObjectId(impl);
+}
+
void ObjectIdImpl::decode(Buffer& buffer)
{
first = buffer.getLongLong();
@@ -100,13 +111,14 @@ void ObjectIdImpl::fromString(const std::string& repr)
agent = 0;
}
-std::string ObjectIdImpl::asString() const
+const string& ObjectIdImpl::asString() const
{
stringstream val;
- val << getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
+ val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
getAgentBank() << "-" << getObjectNum();
- return val.str();
+ repr = val.str();
+ return repr;
}
bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
@@ -135,58 +147,22 @@ bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
// Wrappers
//==================================================================
-ObjectId::ObjectId() : impl(new ObjectIdImpl(this)) {}
-
+ObjectId::ObjectId() : impl(new ObjectIdImpl()) {}
ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
-
ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
+ObjectId::~ObjectId() { delete impl; }
+uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); }
+uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
+uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
+bool ObjectId::isDurable() const { return impl->isDurable(); }
+const char* ObjectId::str() const { return impl->asString().c_str(); }
+uint8_t ObjectId::getFlags() const { return impl->getFlags(); }
+uint16_t ObjectId::getSequence() const { return impl->getSequence(); }
+uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); }
+uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); }
+bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
+bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
+bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }
+bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); }
+bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); }
-ObjectId::~ObjectId()
-{
- delete impl;
-}
-
-uint64_t ObjectId::getObjectNum() const
-{
- return impl->getObjectNum();
-}
-
-uint32_t ObjectId::getObjectNumHi() const
-{
- return impl->getObjectNumHi();
-}
-
-uint32_t ObjectId::getObjectNumLo() const
-{
- return impl->getObjectNumLo();
-}
-
-bool ObjectId::isDurable() const
-{
- return impl->isDurable();
-}
-
-bool ObjectId::operator==(const ObjectId& other) const
-{
- return *impl == *other.impl;
-}
-
-bool ObjectId::operator<(const ObjectId& other) const
-{
- return *impl < *other.impl;
-}
-
-bool ObjectId::operator>(const ObjectId& other) const
-{
- return *impl > *other.impl;
-}
-
-bool ObjectId::operator<=(const ObjectId& other) const
-{
- return !(*impl > *other.impl);
-}
-
-bool ObjectId::operator>=(const ObjectId& other) const
-{
- return !(*impl < *other.impl);
-}
diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
index 38d231237f..d9871ac217 100644
--- a/qpid/cpp/src/qmf/ObjectIdImpl.h
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfObjectIdImpl_
-#define _QmfObjectIdImpl_
+#ifndef _QmfEngineObjectIdImpl_
+#define _QmfEngineObjectIdImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,10 +20,11 @@
* under the License.
*/
-#include <qmf/ObjectId.h>
+#include <qmf/engine/ObjectId.h>
#include <qpid/framing/Buffer.h>
namespace qmf {
+namespace engine {
struct AgentAttachment {
uint64_t first;
@@ -34,19 +35,22 @@ namespace qmf {
};
struct ObjectIdImpl {
- ObjectId* envelope;
AgentAttachment* agent;
uint64_t first;
uint64_t second;
+ mutable std::string repr;
- ObjectIdImpl(ObjectId* e) : envelope(e), agent(0), first(0), second(0) {}
+ ObjectIdImpl() : agent(0), first(0), second(0) {}
ObjectIdImpl(qpid::framing::Buffer& buffer);
ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
+ static ObjectId* factory(qpid::framing::Buffer& buffer);
+ static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
+
void decode(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void fromString(const std::string& repr);
- std::string asString() const;
+ const std::string& asString() const;
uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; }
uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; }
uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; }
@@ -62,6 +66,7 @@ namespace qmf {
bool operator>(const ObjectIdImpl& other) const;
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
index 1ea2d54527..cae0e0da68 100644
--- a/qpid/cpp/src/qmf/ObjectImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
@@ -17,18 +17,17 @@
* under the License.
*/
-#include "qmf/ObjectImpl.h"
-#include "qmf/ValueImpl.h"
+#include "qmf/engine/ObjectImpl.h"
+#include "qmf/engine/ValueImpl.h"
+#include "qmf/engine/BrokerProxyImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid::sys;
using qpid::framing::Buffer;
-ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
- envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))),
- destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
int propCount = objectClass->getPropertyCount();
int statCount = objectClass->getStatisticCount();
@@ -45,8 +44,8 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) :
}
}
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) :
- envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0)
+ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
+ objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
{
int idx;
@@ -54,7 +53,7 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop,
lastUpdatedTime = buffer.getLongLong();
createTime = buffer.getLongLong();
destroyTime = buffer.getLongLong();
- objectId.reset(new ObjectIdImpl(buffer));
+ objectId.reset(ObjectIdImpl::factory(buffer));
}
if (prop) {
@@ -66,8 +65,8 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop,
if (excludes.count(prop->getName()) != 0) {
properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
} else {
- ValueImpl* pval = new ValueImpl(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval->envelope);
+ Value* pval = ValueImpl::factory(prop->getType(), buffer);
+ properties[prop->getName()] = ValuePtr(pval);
}
}
}
@@ -76,12 +75,18 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop,
int statCount = objectClass->getStatisticCount();
for (idx = 0; idx < statCount; idx++) {
const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValueImpl* sval = new ValueImpl(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval->envelope);
+ Value* sval = ValueImpl::factory(stat->getType(), buffer);
+ statistics[stat->getName()] = ValuePtr(sval);
}
}
}
+Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
+{
+ ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
+ return new Object(impl);
+}
+
ObjectImpl::~ObjectImpl()
{
}
@@ -107,6 +112,22 @@ Value* ObjectImpl::getValue(const string& key) const
return 0;
}
+void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
+{
+ if (broker != 0 && objectId.get() != 0)
+ broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
+}
+
+void ObjectImpl::merge(const Object& from)
+{
+ for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
+ piter != from.impl->properties.end(); piter++)
+ properties[piter->first] = piter->second;
+ for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
+ siter != from.impl->statistics.end(); siter++)
+ statistics[siter->first] = siter->second;
+}
+
void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
{
int propCount = objectClass->getPropertyCount();
@@ -143,7 +164,7 @@ void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
buffer.putLongLong(lastUpdatedTime);
buffer.putLongLong(createTime);
buffer.putLongLong(destroyTime);
- objectId->encode(buffer);
+ objectId->impl->encode(buffer);
}
void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
@@ -196,7 +217,7 @@ void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
// Wrappers
//==================================================================
-Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {}
+Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
Object::Object(ObjectImpl* i) : impl(i) {}
Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
Object::~Object() { delete impl; }
@@ -204,5 +225,8 @@ void Object::destroy() { impl->destroy(); }
const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
-Value* Object::getValue(char* key) const { return impl->getValue(key); }
+Value* Object::getValue(const char* key) const { return impl->getValue(key); }
+void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
+bool Object::isDeleted() const { return impl->isDeleted(); }
+void Object::merge(const Object& from) { impl->merge(from); }
diff --git a/qpid/cpp/src/qmf/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h
index d69979e0da..ddd20bfea2 100644
--- a/qpid/cpp/src/qmf/ObjectImpl.h
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfObjectImpl_
-#define _QmfObjectImpl_
+#ifndef _QmfEngineObjectImpl_
+#define _QmfEngineObjectImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,8 +20,8 @@
* under the License.
*/
-#include <qmf/Object.h>
-#include <qmf/ObjectIdImpl.h>
+#include <qmf/engine/Object.h>
+#include <qmf/engine/ObjectIdImpl.h>
#include <map>
#include <set>
#include <string>
@@ -30,28 +30,38 @@
#include <qpid/sys/Mutex.h>
namespace qmf {
+namespace engine {
+
+ class BrokerProxyImpl;
+
+ typedef boost::shared_ptr<Object> ObjectPtr;
struct ObjectImpl {
- typedef boost::shared_ptr<ObjectImpl> Ptr;
typedef boost::shared_ptr<Value> ValuePtr;
- Object* envelope;
const SchemaObjectClass* objectClass;
- boost::shared_ptr<ObjectIdImpl> objectId;
+ BrokerProxyImpl* broker;
+ boost::shared_ptr<ObjectId> objectId;
uint64_t createTime;
uint64_t destroyTime;
uint64_t lastUpdatedTime;
mutable std::map<std::string, ValuePtr> properties;
mutable std::map<std::string, ValuePtr> statistics;
- ObjectImpl(Object* e, const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed);
+ ObjectImpl(const SchemaObjectClass* type);
+ ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
+ static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
+ bool prop, bool stat, bool managed);
~ObjectImpl();
void destroy();
- const ObjectId* getObjectId() const { return objectId.get() ? objectId->envelope : 0; }
- void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); }
+ const ObjectId* getObjectId() const { return objectId.get(); }
+ void setObjectId(ObjectId* oid) { objectId.reset(oid); }
const SchemaObjectClass* getClass() const { return objectClass; }
Value* getValue(const std::string& key) const;
+ void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
+ bool isDeleted() const { return destroyTime != 0; }
+ void merge(const Object& from);
void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
@@ -60,6 +70,7 @@ namespace qmf {
void encodeStatistics(qpid::framing::Buffer& buffer) const;
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp
index 0a3beeb276..6061b70a8d 100644
--- a/qpid/cpp/src/qmf/Protocol.cpp
+++ b/qpid/cpp/src/qmf/engine/Protocol.cpp
@@ -17,11 +17,11 @@
* under the License.
*/
-#include "qmf/Protocol.h"
+#include "qmf/engine/Protocol.h"
#include "qpid/framing/Buffer.h"
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid::framing;
diff --git a/qpid/cpp/src/qmf/Protocol.h b/qpid/cpp/src/qmf/engine/Protocol.h
index d5da08c1db..1cdfa60c84 100644
--- a/qpid/cpp/src/qmf/Protocol.h
+++ b/qpid/cpp/src/qmf/engine/Protocol.h
@@ -1,5 +1,5 @@
-#ifndef _QmfProtocol_
-#define _QmfProtocol_
+#ifndef _QmfEngineProtocol_
+#define _QmfEngineProtocol_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -29,6 +29,7 @@ namespace qpid {
}
namespace qmf {
+namespace engine {
class Protocol {
public:
@@ -62,6 +63,7 @@ namespace qmf {
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
index f75a9aa5d5..6f2beeee87 100644
--- a/qpid/cpp/src/qmf/QueryImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
@@ -17,13 +17,13 @@
* under the License.
*/
-#include "qmf/QueryImpl.h"
-#include "qmf/ObjectIdImpl.h"
+#include "qmf/engine/QueryImpl.h"
+#include "qmf/engine/ObjectIdImpl.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FieldTable.h"
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid::framing;
bool QueryElementImpl::evaluate(const Object* /*object*/) const
@@ -45,6 +45,12 @@ QueryImpl::QueryImpl(Buffer& buffer)
// TODO
}
+Query* QueryImpl::factory(Buffer& buffer)
+{
+ QueryImpl* impl(new QueryImpl(buffer));
+ return new Query(impl);
+}
+
void QueryImpl::encode(Buffer& buffer) const
{
FieldTable ft;
@@ -69,14 +75,17 @@ QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper o
QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
QueryElement::~QueryElement() { delete impl; }
bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
+
QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
QueryExpression::~QueryExpression() { delete impl; }
bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
+
Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
Query::Query(QueryImpl* i) : impl(i) {}
+Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
Query::~Query() { delete impl; }
void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h
index 4a56a457c0..2c64c6739c 100644
--- a/qpid/cpp/src/qmf/QueryImpl.h
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfQueryImpl_
-#define _QmfQueryImpl_
+#ifndef _QmfEngineQueryImpl_
+#define _QmfEngineQueryImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,8 +20,8 @@
* under the License.
*/
-#include "qmf/Query.h"
-#include "qmf/Schema.h"
+#include "qmf/engine/Query.h"
+#include "qmf/engine/Schema.h"
#include <string>
#include <boost/shared_ptr.hpp>
@@ -32,41 +32,39 @@ namespace qpid {
}
namespace qmf {
+namespace engine {
struct QueryElementImpl {
- QueryElementImpl(const std::string& a, const Value* v, ValueOper o) :
- envelope(new QueryElement(this)), attrName(a), value(v), oper(o) {}
+ QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {}
~QueryElementImpl() {}
bool evaluate(const Object* object) const;
- QueryElement* envelope;
std::string attrName;
const Value* value;
ValueOper oper;
};
struct QueryExpressionImpl {
- QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) :
- envelope(new QueryExpression(this)), oper(o), left(operand1), right(operand2) {}
+ QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {}
~QueryExpressionImpl() {}
bool evaluate(const Object* object) const;
- QueryExpression* envelope;
ExprOper oper;
const QueryOperand* left;
const QueryOperand* right;
};
struct QueryImpl {
- QueryImpl(Query* e) : envelope(e), select(0) {}
- QueryImpl(const std::string& c, const std::string& p) :
- envelope(new Query(this)), packageName(p), className(c) {}
- QueryImpl(const SchemaClassKey* key) :
- envelope(new Query(this)), packageName(key->getPackageName()), className(key->getClassName()) {}
- QueryImpl(const ObjectId* oid) :
- envelope(new Query(this)), oid(new ObjectId(*oid)) {}
+ // Constructors mapped to public
+ QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {}
+ QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {}
+ QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {}
+
+ // Factory constructors
QueryImpl(qpid::framing::Buffer& buffer);
+
~QueryImpl() {};
+ static Query* factory(qpid::framing::Buffer& buffer);
void setSelect(const QueryOperand* criterion) { select = criterion; }
void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
@@ -88,7 +86,6 @@ namespace qmf {
void encode(qpid::framing::Buffer& buffer) const;
- Query* envelope;
std::string packageName;
std::string className;
boost::shared_ptr<ObjectId> oid;
@@ -98,5 +95,6 @@ namespace qmf {
bool orderDecreasing;
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
index 7ec03cf4da..9502130288 100644
--- a/qpid/cpp/src/qmf/ResilientConnection.cpp
+++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -17,9 +17,9 @@
* under the License.
*/
-#include "qmf/ResilientConnection.h"
-#include "qmf/MessageImpl.h"
-#include "qmf/ConnectionSettingsImpl.h"
+#include "qmf/engine/ResilientConnection.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/ConnectionSettingsImpl.h"
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
@@ -38,13 +38,15 @@
#include <vector>
#include <set>
#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid;
using qpid::sys::Mutex;
namespace qmf {
+namespace engine {
struct ResilientConnectionEventImpl {
ResilientConnectionEvent::EventKind kind;
void* sessionContext;
@@ -64,20 +66,19 @@ namespace qmf {
client::Connection& connection;
client::Session session;
client::SubscriptionManager* subscriptions;
+ string userId;
void* userContext;
vector<string> dests;
qpid::sys::Thread thread;
- RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
- connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
- subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) {}
+ RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
~RCSession();
void received(client::Message& msg);
void run();
void stop();
};
- class ResilientConnectionImpl : public qpid::sys::Runnable {
+ class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
public:
ResilientConnectionImpl(const ConnectionSettings& settings);
~ResilientConnectionImpl();
@@ -87,7 +88,7 @@ namespace qmf {
void popEvent();
bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
void destroySession(SessionHandle handle);
- void sendMessage(SessionHandle handle, qmf::Message& message);
+ void sendMessage(SessionHandle handle, qmf::engine::Message& message);
void declareQueue(SessionHandle handle, char* queue);
void deleteQueue(SessionHandle handle, char* queue);
void bind(SessionHandle handle, char* exchange, char* queue, char* key);
@@ -120,6 +121,7 @@ namespace qmf {
set<RCSession::Ptr> sessions;
};
}
+}
ResilientConnectionEvent ResilientConnectionEventImpl::copy()
{
@@ -134,6 +136,14 @@ ResilientConnectionEvent ResilientConnectionEventImpl::copy()
return item;
}
+RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
+ connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
+ subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
+{
+ const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
+ userId = operSettings.username;
+}
+
RCSession::~RCSession()
{
subscriptions->stop();
@@ -158,18 +168,23 @@ void RCSession::stop()
void RCSession::received(client::Message& msg)
{
- qmf::MessageImpl qmsg;
+ MessageImpl qmsg;
qmsg.body = msg.getData();
- qpid::framing::MessageProperties p = msg.getMessageProperties();
- if (p.hasReplyTo()) {
- const qpid::framing::ReplyTo& rt = p.getReplyTo();
+ qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
+ if (dp.hasRoutingKey()) {
+ qmsg.routingKey = dp.getRoutingKey();
+ }
+
+ qpid::framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const qpid::framing::ReplyTo& rt = mp.getReplyTo();
qmsg.replyExchange = rt.getExchange();
qmsg.replyKey = rt.getRoutingKey();
}
- if (p.hasUserId()) {
- qmsg.userId = p.getUserId();
+ if (mp.hasUserId()) {
+ qmsg.userId = mp.getUserId();
}
connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
@@ -244,7 +259,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle)
}
}
-void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message)
+void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
{
Mutex::ScopedLock _lock(lock);
RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
@@ -253,6 +268,8 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me
string data(message.body, message.length);
msg.getDeliveryProperties().setRoutingKey(message.routingKey);
msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
+ if (settings.impl->getSendUserId())
+ msg.getMessageProperties().setUserId(sess->userId);
msg.setData(data);
try {
@@ -383,7 +400,7 @@ void ResilientConnectionImpl::sessionClosed(RCSession*)
void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
void* sessionContext,
- const qmf::MessageImpl& message,
+ const MessageImpl& message,
const string& errorText)
{
Mutex::ScopedLock _lock(lock);
@@ -440,7 +457,7 @@ void ResilientConnection::destroySession(SessionHandle handle)
impl->destroySession(handle);
}
-void ResilientConnection::sendMessage(SessionHandle handle, qmf::Message& message)
+void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
{
impl->sendMessage(handle, message);
}
diff --git a/qpid/cpp/src/qmf/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index 3eb14c3952..e366a66826 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "qmf/SchemaImpl.h"
+#include "qmf/engine/SchemaImpl.h"
#include <qpid/framing/Buffer.h>
#include <qpid/framing/FieldTable.h>
#include <qpid/framing/Uuid.h>
@@ -26,7 +26,7 @@
#include <vector>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
@@ -81,7 +81,7 @@ bool SchemaHash::operator>(const SchemaHash& other) const
return ::memcmp(&hash, &other.hash, 16) > 0;
}
-SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgument(this))
+SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer)
{
FieldTable map;
map.decode(buffer);
@@ -99,6 +99,12 @@ SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgu
dir = DIR_IN_OUT;
}
+SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer)
+{
+ SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer));
+ return new SchemaArgument(impl);
+}
+
void SchemaArgumentImpl::encode(Buffer& buffer) const
{
FieldTable map;
@@ -128,7 +134,7 @@ void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(this))
+SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer)
{
FieldTable map;
int argCount;
@@ -139,11 +145,17 @@ SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(t
description = map.getAsString("desc");
for (int idx = 0; idx < argCount; idx++) {
- SchemaArgumentImpl* arg = new SchemaArgumentImpl(buffer);
- addArgument(*arg->envelope);
+ SchemaArgument* arg = SchemaArgumentImpl::factory(buffer);
+ addArgument(arg);
}
}
+SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer)
+{
+ SchemaMethodImpl* impl(new SchemaMethodImpl(buffer));
+ return new SchemaMethod(impl);
+}
+
void SchemaMethodImpl::encode(Buffer& buffer) const
{
FieldTable map;
@@ -154,23 +166,23 @@ void SchemaMethodImpl::encode(Buffer& buffer) const
map.setString("desc", description);
map.encode(buffer);
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++)
- (*iter)->encode(buffer);
+ (*iter)->impl->encode(buffer);
}
-void SchemaMethodImpl::addArgument(const SchemaArgument& argument)
+void SchemaMethodImpl::addArgument(const SchemaArgument* argument)
{
- arguments.push_back(argument.impl);
+ arguments.push_back(argument);
}
const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const
{
int count = 0;
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++, count++)
if (idx == count)
- return (*iter)->envelope;
+ return (*iter);
return 0;
}
@@ -178,12 +190,12 @@ void SchemaMethodImpl::updateHash(SchemaHash& hash) const
{
hash.update(name);
hash.update(description);
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++)
- (*iter)->updateHash(hash);
+ (*iter)->impl->updateHash(hash);
}
-SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProperty(this))
+SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer)
{
FieldTable map;
map.decode(buffer);
@@ -197,6 +209,12 @@ SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProp
description = map.getAsString("desc");
}
+SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer)
+{
+ SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer));
+ return new SchemaProperty(impl);
+}
+
void SchemaPropertyImpl::encode(Buffer& buffer) const
{
FieldTable map;
@@ -225,7 +243,7 @@ void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaStatistic(this))
+SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer)
{
FieldTable map;
map.decode(buffer);
@@ -236,6 +254,12 @@ SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaSt
description = map.getAsString("desc");
}
+SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer)
+{
+ SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer));
+ return new SchemaStatistic(impl);
+}
+
void SchemaStatisticImpl::encode(Buffer& buffer) const
{
FieldTable map;
@@ -258,16 +282,26 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) :
- envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {}
+SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {}
-SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) :
- envelope(new SchemaClassKey(this)), package(packageContainer), name(nameContainer), hash(hashContainer)
+SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer)
{
buffer.getShortString(packageContainer);
buffer.getShortString(nameContainer);
hashContainer.decode(buffer);
-}
+}
+
+SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash)
+{
+ SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(package, name, hash));
+ return new SchemaClassKey(impl);
+}
+
+SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer)
+{
+ SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer));
+ return new SchemaClassKey(impl);
+}
void SchemaClassKeyImpl::encode(Buffer& buffer) const
{
@@ -292,16 +326,16 @@ bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const
return hash < other.hash;
}
-string SchemaClassKeyImpl::str() const
+const string& SchemaClassKeyImpl::str() const
{
Uuid printableHash(hash.get());
stringstream str;
str << package << ":" << name << "(" << printableHash << ")";
- return str.str();
+ repr = str.str();
+ return repr;
}
-SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) :
- envelope(new SchemaObjectClass(this)), hasHash(true), classKey(package, name, hash)
+SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
{
buffer.getShortString(package);
buffer.getShortString(name);
@@ -313,21 +347,27 @@ SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) :
uint16_t methodCount = buffer.getShort();
for (uint16_t idx = 0; idx < propCount; idx++) {
- SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer);
- addProperty(*property->envelope);
+ const SchemaProperty* property = SchemaPropertyImpl::factory(buffer);
+ addProperty(property);
}
for (uint16_t idx = 0; idx < statCount; idx++) {
- SchemaStatisticImpl* statistic = new SchemaStatisticImpl(buffer);
- addStatistic(*statistic->envelope);
+ const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer);
+ addStatistic(statistic);
}
for (uint16_t idx = 0; idx < methodCount; idx++) {
- SchemaMethodImpl* method = new SchemaMethodImpl(buffer);
- addMethod(*method->envelope);
+ SchemaMethod* method = SchemaMethodImpl::factory(buffer);
+ addMethod(method);
}
}
+SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer)
+{
+ SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer));
+ return new SchemaObjectClass(impl);
+}
+
void SchemaObjectClassImpl::encode(Buffer& buffer) const
{
buffer.putOctet((uint8_t) CLASS_OBJECT);
@@ -339,15 +379,15 @@ void SchemaObjectClassImpl::encode(Buffer& buffer) const
buffer.putShort((uint16_t) statistics.size());
buffer.putShort((uint16_t) methods.size());
- for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
iter != properties.end(); iter++)
- (*iter)->encode(buffer);
- for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ (*iter)->impl->encode(buffer);
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
iter != statistics.end(); iter++)
- (*iter)->encode(buffer);
- for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ (*iter)->impl->encode(buffer);
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
iter != methods.end(); iter++)
- (*iter)->encode(buffer);
+ (*iter)->impl->encode(buffer);
}
const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const
@@ -356,67 +396,66 @@ const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const
hasHash = true;
hash.update(package);
hash.update(name);
- for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
iter != properties.end(); iter++)
- (*iter)->updateHash(hash);
- for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ (*iter)->impl->updateHash(hash);
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
iter != statistics.end(); iter++)
- (*iter)->updateHash(hash);
- for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ (*iter)->impl->updateHash(hash);
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
iter != methods.end(); iter++)
- (*iter)->updateHash(hash);
+ (*iter)->impl->updateHash(hash);
}
- return classKey.envelope;
+ return classKey.get();
}
-void SchemaObjectClassImpl::addProperty(const SchemaProperty& property)
+void SchemaObjectClassImpl::addProperty(const SchemaProperty* property)
{
- properties.push_back(property.impl);
+ properties.push_back(property);
}
-void SchemaObjectClassImpl::addStatistic(const SchemaStatistic& statistic)
+void SchemaObjectClassImpl::addStatistic(const SchemaStatistic* statistic)
{
- statistics.push_back(statistic.impl);
+ statistics.push_back(statistic);
}
-void SchemaObjectClassImpl::addMethod(const SchemaMethod& method)
+void SchemaObjectClassImpl::addMethod(const SchemaMethod* method)
{
- methods.push_back(method.impl);
+ methods.push_back(method);
}
const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const
{
int count = 0;
- for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin();
+ for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
iter != properties.end(); iter++, count++)
if (idx == count)
- return (*iter)->envelope;
+ return *iter;
return 0;
}
const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const
{
int count = 0;
- for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin();
+ for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
iter != statistics.end(); iter++, count++)
if (idx == count)
- return (*iter)->envelope;
+ return *iter;
return 0;
}
const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
{
int count = 0;
- for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin();
+ for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
iter != methods.end(); iter++, count++)
if (idx == count)
- return (*iter)->envelope;
+ return *iter;
return 0;
}
-SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) :
- envelope(new SchemaEventClass(this)), hasHash(true), classKey(package, name, hash)
+SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
{
buffer.getShortString(package);
buffer.getShortString(name);
@@ -426,11 +465,17 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) :
uint16_t argCount = buffer.getShort();
for (uint16_t idx = 0; idx < argCount; idx++) {
- SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer);
- addArgument(*argument->envelope);
+ SchemaArgument* argument = SchemaArgumentImpl::factory(buffer);
+ addArgument(argument);
}
}
+SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer)
+{
+ SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer));
+ return new SchemaEventClass(impl);
+}
+
void SchemaEventClassImpl::encode(Buffer& buffer) const
{
buffer.putOctet((uint8_t) CLASS_EVENT);
@@ -439,9 +484,9 @@ void SchemaEventClassImpl::encode(Buffer& buffer) const
hash.encode(buffer);
buffer.putShort((uint16_t) arguments.size());
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++)
- (*iter)->encode(buffer);
+ (*iter)->impl->encode(buffer);
}
const SchemaClassKey* SchemaEventClassImpl::getClassKey() const
@@ -450,25 +495,25 @@ const SchemaClassKey* SchemaEventClassImpl::getClassKey() const
hasHash = true;
hash.update(package);
hash.update(name);
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++)
- (*iter)->updateHash(hash);
+ (*iter)->impl->updateHash(hash);
}
- return classKey.envelope;
+ return classKey.get();
}
-void SchemaEventClassImpl::addArgument(const SchemaArgument& argument)
+void SchemaEventClassImpl::addArgument(const SchemaArgument* argument)
{
- arguments.push_back(argument.impl);
+ arguments.push_back(argument);
}
const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
{
int count = 0;
- for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin();
+ for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++, count++)
if (idx == count)
- return (*iter)->envelope;
+ return (*iter);
return 0;
}
@@ -477,8 +522,9 @@ const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
// Wrappers
//==================================================================
-SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(this, name, typecode); }
+SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(name, typecode); }
SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {}
+SchemaArgument::SchemaArgument(const SchemaArgument& from) : impl(new SchemaArgumentImpl(*(from.impl))) {}
SchemaArgument::~SchemaArgument() { delete impl; }
void SchemaArgument::setDirection(Direction dir) { impl->setDirection(dir); }
void SchemaArgument::setUnit(const char* val) { impl->setUnit(val); }
@@ -488,17 +534,21 @@ Typecode SchemaArgument::getType() const { return impl->getType(); }
Direction SchemaArgument::getDirection() const { return impl->getDirection(); }
const char* SchemaArgument::getUnit() const { return impl->getUnit().c_str(); }
const char* SchemaArgument::getDesc() const { return impl->getDesc().c_str(); }
-SchemaMethod::SchemaMethod(const char* name) { impl = new SchemaMethodImpl(this, name); }
+
+SchemaMethod::SchemaMethod(const char* name) : impl(new SchemaMethodImpl(name)) {}
SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {}
+SchemaMethod::SchemaMethod(const SchemaMethod& from) : impl(new SchemaMethodImpl(*(from.impl))) {}
SchemaMethod::~SchemaMethod() { delete impl; }
-void SchemaMethod::addArgument(const SchemaArgument& argument) { impl->addArgument(argument); }
+void SchemaMethod::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
void SchemaMethod::setDesc(const char* desc) { impl->setDesc(desc); }
const char* SchemaMethod::getName() const { return impl->getName().c_str(); }
const char* SchemaMethod::getDesc() const { return impl->getDesc().c_str(); }
int SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); }
const SchemaArgument* SchemaMethod::getArgument(int idx) const { return impl->getArgument(idx); }
-SchemaProperty::SchemaProperty(const char* name, Typecode typecode) { impl = new SchemaPropertyImpl(this, name, typecode); }
+
+SchemaProperty::SchemaProperty(const char* name, Typecode typecode) : impl(new SchemaPropertyImpl(name, typecode)) {}
SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {}
+SchemaProperty::SchemaProperty(const SchemaProperty& from) : impl(new SchemaPropertyImpl(*(from.impl))) {}
SchemaProperty::~SchemaProperty() { delete impl; }
void SchemaProperty::setAccess(Access access) { impl->setAccess(access); }
void SchemaProperty::setIndex(bool val) { impl->setIndex(val); }
@@ -512,8 +562,10 @@ bool SchemaProperty::isIndex() const { return impl->isIndex(); }
bool SchemaProperty::isOptional() const { return impl->isOptional(); }
const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); }
const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); }
-SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) { impl = new SchemaStatisticImpl(this, name, typecode); }
+
+SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {}
SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {}
+SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {}
SchemaStatistic::~SchemaStatistic() { delete impl; }
void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); }
void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); }
@@ -521,17 +573,24 @@ const char* SchemaStatistic::getName() const { return impl->getName().c_str(); }
Typecode SchemaStatistic::getType() const { return impl->getType(); }
const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); }
const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); }
+
SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {}
+SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {}
SchemaClassKey::~SchemaClassKey() { delete impl; }
const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); }
const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); }
const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); }
-SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) { impl = new SchemaObjectClassImpl(this, package, name); }
+const char* SchemaClassKey::asString() const { return impl->str().c_str(); }
+bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); }
+bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); }
+
+SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) : impl(new SchemaObjectClassImpl(package, name)) {}
SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {}
+SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {}
SchemaObjectClass::~SchemaObjectClass() { delete impl; }
-void SchemaObjectClass::addProperty(const SchemaProperty& property) { impl->addProperty(property); }
-void SchemaObjectClass::addStatistic(const SchemaStatistic& statistic) { impl->addStatistic(statistic); }
-void SchemaObjectClass::addMethod(const SchemaMethod& method) { impl->addMethod(method); }
+void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); }
+void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); }
+void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); }
const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); }
int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); }
int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); }
@@ -539,10 +598,12 @@ int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); }
const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); }
const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); }
const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); }
-SchemaEventClass::SchemaEventClass(const char* package, const char* name) { impl = new SchemaEventClassImpl(this, package, name); }
+
+SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {}
SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
+SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {}
SchemaEventClass::~SchemaEventClass() { delete impl; }
-void SchemaEventClass::addArgument(const SchemaArgument& argument) { impl->addArgument(argument); }
+void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); }
const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); }
int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); }
diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index 035d99aecd..af3a1d98e4 100644
--- a/qpid/cpp/src/qmf/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfSchemaImpl_
-#define _QmfSchemaImpl_
+#ifndef _QmfEngineSchemaImpl_
+#define _QmfEngineSchemaImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,13 +20,13 @@
* under the License.
*/
-#include "qmf/Schema.h"
-#include <boost/shared_ptr.hpp>
+#include "qmf/engine/Schema.h"
#include <string>
#include <vector>
#include <qpid/framing/Buffer.h>
namespace qmf {
+namespace engine {
// TODO: Destructors for schema classes
// TODO: Add "frozen" attribute for schema classes so they can't be modified after
@@ -52,16 +52,15 @@ namespace qmf {
};
struct SchemaArgumentImpl {
- SchemaArgument* envelope;
std::string name;
Typecode typecode;
Direction dir;
std::string unit;
std::string description;
- SchemaArgumentImpl(SchemaArgument* e, const char* n, Typecode t) :
- envelope(e), name(n), typecode(t), dir(DIR_IN) {}
+ SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {}
SchemaArgumentImpl(qpid::framing::Buffer& buffer);
+ static SchemaArgument* factory(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void setDirection(Direction d) { dir = d; }
void setUnit(const char* val) { unit = val; }
@@ -75,15 +74,15 @@ namespace qmf {
};
struct SchemaMethodImpl {
- SchemaMethod* envelope;
std::string name;
std::string description;
- std::vector<SchemaArgumentImpl*> arguments;
+ std::vector<const SchemaArgument*> arguments;
- SchemaMethodImpl(SchemaMethod* e, const char* n) : envelope(e), name(n) {}
+ SchemaMethodImpl(const char* n) : name(n) {}
SchemaMethodImpl(qpid::framing::Buffer& buffer);
+ static SchemaMethod* factory(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
- void addArgument(const SchemaArgument& argument);
+ void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc) { description = desc; }
const std::string& getName() const { return name; }
const std::string& getDesc() const { return description; }
@@ -93,7 +92,6 @@ namespace qmf {
};
struct SchemaPropertyImpl {
- SchemaProperty* envelope;
std::string name;
Typecode typecode;
Access access;
@@ -102,10 +100,9 @@ namespace qmf {
std::string unit;
std::string description;
- SchemaPropertyImpl(SchemaProperty* e, const char* n, Typecode t) :
- envelope(e), name(n), typecode(t), access(ACCESS_READ_ONLY),
- index(false), optional(false) {}
+ SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {}
SchemaPropertyImpl(qpid::framing::Buffer& buffer);
+ static SchemaProperty* factory(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void setAccess(Access a) { access = a; }
void setIndex(bool val) { index = val; }
@@ -123,15 +120,14 @@ namespace qmf {
};
struct SchemaStatisticImpl {
- SchemaStatistic* envelope;
std::string name;
Typecode typecode;
std::string unit;
std::string description;
- SchemaStatisticImpl(SchemaStatistic* e, const char* n, Typecode t) :
- envelope(e), name(n), typecode(t) {}
+ SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {}
SchemaStatisticImpl(qpid::framing::Buffer& buffer);
+ static SchemaStatistic* factory(qpid::framing::Buffer& buffer);
void encode(qpid::framing::Buffer& buffer) const;
void setUnit(const char* val) { unit = val; }
void setDesc(const char* desc) { description = desc; }
@@ -143,10 +139,10 @@ namespace qmf {
};
struct SchemaClassKeyImpl {
- const SchemaClassKey* envelope;
const std::string& package;
const std::string& name;
const SchemaHash& hash;
+ mutable std::string repr;
// The *Container elements are only used if there isn't an external place to
// store these values.
@@ -156,6 +152,8 @@ namespace qmf {
SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
+ static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash);
+ static SchemaClassKey* factory(qpid::framing::Buffer& buffer);
const std::string& getPackageName() const { return package; }
const std::string& getClassName() const { return name; }
@@ -164,28 +162,28 @@ namespace qmf {
void encode(qpid::framing::Buffer& buffer) const;
bool operator==(const SchemaClassKeyImpl& other) const;
bool operator<(const SchemaClassKeyImpl& other) const;
- std::string str() const;
+ const std::string& str() const;
};
struct SchemaObjectClassImpl {
- typedef boost::shared_ptr<SchemaObjectClassImpl> Ptr;
- SchemaObjectClass* envelope;
std::string package;
std::string name;
mutable SchemaHash hash;
mutable bool hasHash;
- SchemaClassKeyImpl classKey;
- std::vector<SchemaPropertyImpl*> properties;
- std::vector<SchemaStatisticImpl*> statistics;
- std::vector<SchemaMethodImpl*> methods;
+ std::auto_ptr<SchemaClassKey> classKey;
+ std::vector<const SchemaProperty*> properties;
+ std::vector<const SchemaStatistic*> statistics;
+ std::vector<const SchemaMethod*> methods;
- SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) :
- envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {}
+ SchemaObjectClassImpl(const char* p, const char* n) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
+ static SchemaObjectClass* factory(qpid::framing::Buffer& buffer);
+
void encode(qpid::framing::Buffer& buffer) const;
- void addProperty(const SchemaProperty& property);
- void addStatistic(const SchemaStatistic& statistic);
- void addMethod(const SchemaMethod& method);
+ void addProperty(const SchemaProperty* property);
+ void addStatistic(const SchemaStatistic* statistic);
+ void addMethod(const SchemaMethod* method);
const SchemaClassKey* getClassKey() const;
int getPropertyCount() const { return properties.size(); }
@@ -197,21 +195,21 @@ namespace qmf {
};
struct SchemaEventClassImpl {
- typedef boost::shared_ptr<SchemaEventClassImpl> Ptr;
- SchemaEventClass* envelope;
std::string package;
std::string name;
mutable SchemaHash hash;
mutable bool hasHash;
- SchemaClassKeyImpl classKey;
+ std::auto_ptr<SchemaClassKey> classKey;
std::string description;
- std::vector<SchemaArgumentImpl*> arguments;
+ std::vector<const SchemaArgument*> arguments;
- SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) :
- envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {}
+ SchemaEventClassImpl(const char* p, const char* n) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
SchemaEventClassImpl(qpid::framing::Buffer& buffer);
+ static SchemaEventClass* factory(qpid::framing::Buffer& buffer);
+
void encode(qpid::framing::Buffer& buffer) const;
- void addArgument(const SchemaArgument& argument);
+ void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc) { description = desc; }
const SchemaClassKey* getClassKey() const;
@@ -219,6 +217,7 @@ namespace qmf {
const SchemaArgument* getArgument(int idx) const;
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/SequenceManager.cpp b/qpid/cpp/src/qmf/engine/SequenceManager.cpp
index 3171e66fac..4a4644a8b9 100644
--- a/qpid/cpp/src/qmf/SequenceManager.cpp
+++ b/qpid/cpp/src/qmf/engine/SequenceManager.cpp
@@ -17,10 +17,10 @@
* under the License.
*/
-#include "qmf/SequenceManager.h"
+#include "qmf/engine/SequenceManager.h"
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using namespace qpid::sys;
SequenceManager::SequenceManager() : nextSequence(1) {}
@@ -68,14 +68,14 @@ void SequenceManager::releaseAll()
contextMap.clear();
}
-void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer)
+void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer)
{
Mutex::ScopedLock _lock(lock);
bool done;
if (sequence == 0) {
if (unsolicitedContext.get() != 0) {
- done = unsolicitedContext->handleMessage(opcode, sequence, buffer);
+ done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer);
if (done)
unsolicitedContext->release();
}
@@ -85,7 +85,7 @@ void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing:
map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
if (iter != contextMap.end()) {
if (iter->second != 0) {
- done = iter->second->handleMessage(opcode, sequence, buffer);
+ done = iter->second->handleMessage(opcode, sequence, routingKey, buffer);
if (done) {
iter->second->release();
contextMap.erase(iter);
diff --git a/qpid/cpp/src/qmf/SequenceManager.h b/qpid/cpp/src/qmf/engine/SequenceManager.h
index bbfd0728a7..9e47e38610 100644
--- a/qpid/cpp/src/qmf/SequenceManager.h
+++ b/qpid/cpp/src/qmf/engine/SequenceManager.h
@@ -1,5 +1,5 @@
-#ifndef _QmfSequenceManager_
-#define _QmfSequenceManager_
+#ifndef _QmfEngineSequenceManager_
+#define _QmfEngineSequenceManager_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -31,6 +31,7 @@ namespace qpid {
}
namespace qmf {
+namespace engine {
class SequenceContext {
public:
@@ -39,7 +40,7 @@ namespace qmf {
virtual ~SequenceContext() {}
virtual void reserve() = 0;
- virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0;
+ virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0;
virtual void release() = 0;
};
@@ -51,7 +52,7 @@ namespace qmf {
uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
void release(uint32_t sequence);
void releaseAll();
- void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer);
+ void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
private:
mutable qpid::sys::Mutex lock;
@@ -61,6 +62,7 @@ namespace qmf {
};
}
+}
#endif
diff --git a/qpid/cpp/src/qmf/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
index f42c85eb33..f80bdab866 100644
--- a/qpid/cpp/src/qmf/ValueImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
@@ -17,14 +17,14 @@
* under the License.
*/
-#include "qmf/ValueImpl.h"
+#include "qmf/engine/ValueImpl.h"
#include <qpid/framing/FieldTable.h>
using namespace std;
-using namespace qmf;
+using namespace qmf::engine;
using qpid::framing::Buffer;
-ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typecode(t)
+ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t)
{
uint64_t first;
uint64_t second;
@@ -42,8 +42,8 @@ ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typec
case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break;
case TYPE_FLOAT : value.floatVal = buf.getFloat(); break;
case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break;
- case TYPE_INT8 : value.s32 = (int32_t) buf.getOctet(); break;
- case TYPE_INT16 : value.s32 = (int32_t) buf.getShort(); break;
+ case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break;
+ case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break;
case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break;
case TYPE_INT64 : value.s64 = buf.getLongLong(); break;
case TYPE_UUID : buf.getBin128(value.uuidVal); break;
@@ -67,11 +67,27 @@ ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typec
}
}
-ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t)
+ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at)
+{
+}
+
+ValueImpl::ValueImpl(Typecode t) : typecode(t)
{
::memset(&value, 0, sizeof(value));
}
+Value* ValueImpl::factory(Typecode t, Buffer& b)
+{
+ ValueImpl* impl(new ValueImpl(t, b));
+ return new Value(impl);
+}
+
+Value* ValueImpl::factory(Typecode t)
+{
+ ValueImpl* impl(new ValueImpl(t));
+ return new Value(impl);
+}
+
ValueImpl::~ValueImpl()
{
}
@@ -113,9 +129,9 @@ bool ValueImpl::keyInMap(const char* key) const
Value* ValueImpl::byKey(const char* key)
{
if (keyInMap(key)) {
- map<std::string, VPtr>::iterator iter = mapVal.find(key);
+ map<std::string, Value>::iterator iter = mapVal.find(key);
if (iter != mapVal.end())
- return iter->second.get();
+ return &iter->second;
}
return 0;
}
@@ -123,9 +139,9 @@ Value* ValueImpl::byKey(const char* key)
const Value* ValueImpl::byKey(const char* key) const
{
if (keyInMap(key)) {
- map<std::string, VPtr>::const_iterator iter = mapVal.find(key);
+ map<std::string, Value>::const_iterator iter = mapVal.find(key);
if (iter != mapVal.end())
- return iter->second.get();
+ return &iter->second;
}
return 0;
}
@@ -137,12 +153,13 @@ void ValueImpl::deleteKey(const char* key)
void ValueImpl::insert(const char* key, Value* val)
{
- mapVal[key] = VPtr(val);
+ pair<string, Value> entry(key, *val);
+ mapVal.insert(entry);
}
const char* ValueImpl::key(uint32_t idx) const
{
- map<std::string, VPtr>::const_iterator iter = mapVal.begin();
+ map<std::string, Value>::const_iterator iter = mapVal.begin();
for (uint32_t i = 0; i < idx; i++) {
if (iter == mapVal.end())
break;
@@ -186,293 +203,64 @@ void ValueImpl::deleteArrayItem(uint32_t)
// Wrappers
//==================================================================
-Value::Value(Typecode t, Typecode at)
-{
- impl = new ValueImpl(this, t, at);
-}
-
-Value::Value(ValueImpl* i)
-{
- impl = i;
-}
-
-Value::~Value()
-{
- delete impl;
-}
-
-Typecode Value::getType() const
-{
- return impl->getType();
-}
-
-bool Value::isNull() const
-{
- return impl->isNull();
-}
-
-void Value::setNull()
-{
- impl->setNull();
-}
-
-bool Value::isObjectId() const
-{
- return impl->isObjectId();
-}
-
-const ObjectId& Value::asObjectId() const
-{
- return impl->asObjectId();
-}
-
-void Value::setObjectId(const ObjectId& oid)
-{
- impl->setObjectId(oid);
-}
-
-bool Value::isUint() const
-{
- return impl->isUint();
-}
-
-uint32_t Value::asUint() const
-{
- return impl->asUint();
-}
-
-void Value::setUint(uint32_t val)
-{
- impl->setUint(val);
-}
-
-bool Value::isInt() const
-{
- return impl->isInt();
-}
-
-int32_t Value::asInt() const
-{
- return impl->asInt();
-}
-
-void Value::setInt(int32_t val)
-{
- impl->setInt(val);
-}
-
-bool Value::isUint64() const
-{
- return impl->isUint64();
-}
-
-uint64_t Value::asUint64() const
-{
- return impl->asUint64();
-}
-
-void Value::setUint64(uint64_t val)
-{
- impl->setUint64(val);
-}
-
-bool Value::isInt64() const
-{
- return impl->isInt64();
-}
-
-int64_t Value::asInt64() const
-{
- return impl->asInt64();
-}
-
-void Value::setInt64(int64_t val)
-{
- impl->setInt64(val);
-}
-
-bool Value::isString() const
-{
- return impl->isString();
-}
-
-const char* Value::asString() const
-{
- return impl->asString();
-}
-
-void Value::setString(const char* val)
-{
- impl->setString(val);
-}
-
-bool Value::isBool() const
-{
- return impl->isBool();
-}
-
-bool Value::asBool() const
-{
- return impl->asBool();
-}
-
-void Value::setBool(bool val)
-{
- impl->setBool(val);
-}
-
-bool Value::isFloat() const
-{
- return impl->isFloat();
-}
-
-float Value::asFloat() const
-{
- return impl->asFloat();
-}
-
-void Value::setFloat(float val)
-{
- impl->setFloat(val);
-}
-
-bool Value::isDouble() const
-{
- return impl->isDouble();
-}
-
-double Value::asDouble() const
-{
- return impl->asDouble();
-}
-
-void Value::setDouble(double val)
-{
- impl->setDouble(val);
-}
-
-bool Value::isUuid() const
-{
- return impl->isUuid();
-}
-
-const uint8_t* Value::asUuid() const
-{
- return impl->asUuid();
-}
-
-void Value::setUuid(const uint8_t* val)
-{
- impl->setUuid(val);
-}
-
-bool Value::isObject() const
-{
- return impl->isObject();
-}
-
-Object* Value::asObject() const
-{
- return impl->asObject();
-}
-
-void Value::setObject(Object* val)
-{
- impl->setObject(val);
-}
-
-bool Value::isMap() const
-{
- return impl->isMap();
-}
-
-bool Value::keyInMap(const char* key) const
-{
- return impl->keyInMap(key);
-}
-
-Value* Value::byKey(const char* key)
-{
- return impl->byKey(key);
-}
-
-const Value* Value::byKey(const char* key) const
-{
- return impl->byKey(key);
-}
-
-void Value::deleteKey(const char* key)
-{
- impl->deleteKey(key);
-}
-
-void Value::insert(const char* key, Value* val)
-{
- impl->insert(key, val);
-}
-
-uint32_t Value::keyCount() const
-{
- return impl->keyCount();
-}
-
-const char* Value::key(uint32_t idx) const
-{
- return impl->key(idx);
-}
-
-bool Value::isList() const
-{
- return impl->isList();
-}
-
-uint32_t Value::listItemCount() const
-{
- return impl->listItemCount();
-}
-
-Value* Value::listItem(uint32_t idx)
-{
- return impl->listItem(idx);
-}
-
-void Value::appendToList(Value* val)
-{
- impl->appendToList(val);
-}
-
-void Value::deleteListItem(uint32_t idx)
-{
- impl->deleteListItem(idx);
-}
-
-bool Value::isArray() const
-{
- return impl->isArray();
-}
-
-Typecode Value::arrayType() const
-{
- return impl->arrayType();
-}
-
-uint32_t Value::arrayItemCount() const
-{
- return impl->arrayItemCount();
-}
-
-Value* Value::arrayItem(uint32_t idx)
-{
- return impl->arrayItem(idx);
-}
-
-void Value::appendToArray(Value* val)
-{
- impl->appendToArray(val);
-}
-
-void Value::deleteArrayItem(uint32_t idx)
-{
- impl->deleteArrayItem(idx);
-}
+Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
+Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {}
+Value::Value(ValueImpl* i) : impl(i) {}
+Value::~Value() { delete impl;}
+
+Typecode Value::getType() const { return impl->getType(); }
+bool Value::isNull() const { return impl->isNull(); }
+void Value::setNull() { impl->setNull(); }
+bool Value::isObjectId() const { return impl->isObjectId(); }
+const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
+void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
+bool Value::isUint() const { return impl->isUint(); }
+uint32_t Value::asUint() const { return impl->asUint(); }
+void Value::setUint(uint32_t val) { impl->setUint(val); }
+bool Value::isInt() const { return impl->isInt(); }
+int32_t Value::asInt() const { return impl->asInt(); }
+void Value::setInt(int32_t val) { impl->setInt(val); }
+bool Value::isUint64() const { return impl->isUint64(); }
+uint64_t Value::asUint64() const { return impl->asUint64(); }
+void Value::setUint64(uint64_t val) { impl->setUint64(val); }
+bool Value::isInt64() const { return impl->isInt64(); }
+int64_t Value::asInt64() const { return impl->asInt64(); }
+void Value::setInt64(int64_t val) { impl->setInt64(val); }
+bool Value::isString() const { return impl->isString(); }
+const char* Value::asString() const { return impl->asString(); }
+void Value::setString(const char* val) { impl->setString(val); }
+bool Value::isBool() const { return impl->isBool(); }
+bool Value::asBool() const { return impl->asBool(); }
+void Value::setBool(bool val) { impl->setBool(val); }
+bool Value::isFloat() const { return impl->isFloat(); }
+float Value::asFloat() const { return impl->asFloat(); }
+void Value::setFloat(float val) { impl->setFloat(val); }
+bool Value::isDouble() const { return impl->isDouble(); }
+double Value::asDouble() const { return impl->asDouble(); }
+void Value::setDouble(double val) { impl->setDouble(val); }
+bool Value::isUuid() const { return impl->isUuid(); }
+const uint8_t* Value::asUuid() const { return impl->asUuid(); }
+void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
+bool Value::isObject() const { return impl->isObject(); }
+const Object* Value::asObject() const { return impl->asObject(); }
+void Value::setObject(Object* val) { impl->setObject(val); }
+bool Value::isMap() const { return impl->isMap(); }
+bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
+Value* Value::byKey(const char* key) { return impl->byKey(key); }
+const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
+void Value::deleteKey(const char* key) { impl->deleteKey(key); }
+void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
+uint32_t Value::keyCount() const { return impl->keyCount(); }
+const char* Value::key(uint32_t idx) const { return impl->key(idx); }
+bool Value::isList() const { return impl->isList(); }
+uint32_t Value::listItemCount() const { return impl->listItemCount(); }
+Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
+void Value::appendToList(Value* val) { impl->appendToList(val); }
+void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
+bool Value::isArray() const { return impl->isArray(); }
+Typecode Value::arrayType() const { return impl->arrayType(); }
+uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
+Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
+void Value::appendToArray(Value* val) { impl->appendToArray(val); }
+void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
diff --git a/qpid/cpp/src/qmf/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h
index cf33035bf7..b6adae5d93 100644
--- a/qpid/cpp/src/qmf/ValueImpl.h
+++ b/qpid/cpp/src/qmf/engine/ValueImpl.h
@@ -1,5 +1,5 @@
-#ifndef _QmfValueImpl_
-#define _QmfValueImpl_
+#ifndef _QmfEngineValueImpl_
+#define _QmfEngineValueImpl_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,9 +20,9 @@
* under the License.
*/
-#include <qmf/Value.h>
-#include <qmf/ObjectIdImpl.h>
-#include <qmf/Object.h>
+#include <qmf/engine/Value.h>
+#include <qmf/engine/ObjectIdImpl.h>
+#include <qmf/engine/Object.h>
#include <qpid/framing/Buffer.h>
#include <string>
#include <string.h>
@@ -31,22 +31,20 @@
#include <boost/shared_ptr.hpp>
namespace qmf {
+namespace engine {
// TODO: set valid flag on all value settors
// TODO: add a modified flag and accessors
struct ValueImpl {
- typedef boost::shared_ptr<Value> VPtr;
- typedef boost::shared_ptr<Object> OPtr;
- Value* envelope;
const Typecode typecode;
bool valid;
ObjectId refVal;
std::string stringVal;
- OPtr objectVal;
- std::map<std::string, VPtr> mapVal;
- std::vector<VPtr> vectorVal;
+ std::auto_ptr<Object> objectVal;
+ std::map<std::string, Value> mapVal;
+ std::vector<Value> vectorVal;
Typecode arrayTypecode;
union {
@@ -60,10 +58,17 @@ namespace qmf {
uint8_t uuidVal[16];
} value;
- ValueImpl(Value* e, Typecode t, Typecode at) :
- envelope(e), typecode(t), valid(false), arrayTypecode(at) {}
+ ValueImpl(const ValueImpl& from) :
+ typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal),
+ objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0),
+ mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode),
+ value(from.value) {}
+
+ ValueImpl(Typecode t, Typecode at);
ValueImpl(Typecode t, qpid::framing::Buffer& b);
ValueImpl(Typecode t);
+ static Value* factory(Typecode t, qpid::framing::Buffer& b);
+ static Value* factory(Typecode t);
~ValueImpl();
void encode(qpid::framing::Buffer& b) const;
@@ -139,6 +144,7 @@ namespace qmf {
void deleteArrayItem(uint32_t idx);
};
}
+}
#endif