diff options
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r-- | qpid/cpp/src/qmf/AgentEngine.h | 207 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleEngine.cpp | 1091 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleEngine.h | 222 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Event.h | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Message.h | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Object.h | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ObjectId.h | 53 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Query.h | 104 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ResilientConnection.h | 163 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Schema.h | 171 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Typecode.h | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Value.h | 113 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp (renamed from qpid/cpp/src/qmf/AgentEngine.cpp) | 248 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp | 763 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.h | 239 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp (renamed from qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp) | 104 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h (renamed from qpid/cpp/src/qmf/ConnectionSettingsImpl.h) | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 419 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.h | 145 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/MessageImpl.cpp (renamed from qpid/cpp/src/qmf/MessageImpl.cpp) | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/MessageImpl.h (renamed from qpid/cpp/src/qmf/MessageImpl.h) | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp (renamed from qpid/cpp/src/qmf/ObjectIdImpl.cpp) | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ObjectIdImpl.h (renamed from qpid/cpp/src/qmf/ObjectIdImpl.h) | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ObjectImpl.cpp (renamed from qpid/cpp/src/qmf/ObjectImpl.cpp) | 56 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ObjectImpl.h (renamed from qpid/cpp/src/qmf/ObjectImpl.h) | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Protocol.cpp (renamed from qpid/cpp/src/qmf/Protocol.cpp) | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Protocol.h (renamed from qpid/cpp/src/qmf/Protocol.h) | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/QueryImpl.cpp (renamed from qpid/cpp/src/qmf/QueryImpl.cpp) | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/QueryImpl.h (renamed from qpid/cpp/src/qmf/QueryImpl.h) | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ResilientConnection.cpp (renamed from qpid/cpp/src/qmf/ResilientConnection.cpp) | 53 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SchemaImpl.cpp (renamed from qpid/cpp/src/qmf/SchemaImpl.cpp) | 221 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SchemaImpl.h (renamed from qpid/cpp/src/qmf/SchemaImpl.h) | 75 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SequenceManager.cpp (renamed from qpid/cpp/src/qmf/SequenceManager.cpp) | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SequenceManager.h (renamed from qpid/cpp/src/qmf/SequenceManager.h) | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ValueImpl.cpp (renamed from qpid/cpp/src/qmf/ValueImpl.cpp) | 390 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ValueImpl.h (renamed from qpid/cpp/src/qmf/ValueImpl.h) | 32 |
36 files changed, 2210 insertions, 3083 deletions
diff --git a/qpid/cpp/src/qmf/AgentEngine.h b/qpid/cpp/src/qmf/AgentEngine.h deleted file mode 100644 index c88ef33657..0000000000 --- a/qpid/cpp/src/qmf/AgentEngine.h +++ /dev/null @@ -1,207 +0,0 @@ -#ifndef _QmfAgentEngine_ -#define _QmfAgentEngine_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/Schema.h> -#include <qmf/ObjectId.h> -#include <qmf/Object.h> -#include <qmf/Event.h> -#include <qmf/Query.h> -#include <qmf/Value.h> -#include <qmf/Message.h> - -namespace qmf { - - /** - * AgentEvent - * - * This structure represents a QMF event coming from the agent to - * the application. - */ - struct AgentEvent { - enum EventKind { - GET_QUERY = 1, - START_SYNC = 2, - END_SYNC = 3, - METHOD_CALL = 4, - DECLARE_QUEUE = 5, - DELETE_QUEUE = 6, - BIND = 7, - UNBIND = 8, - SETUP_COMPLETE = 9 - }; - - EventKind kind; - uint32_t sequence; // Protocol sequence (for all kinds) - char* authUserId; // Authenticated user ID (for all kinds) - char* authToken; // Authentication token if issued (for all kinds) - char* name; // Name of the method/sync query - // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND) - Object* object; // Object involved in method call (METHOD_CALL) - ObjectId* objectId; // ObjectId for method call (METHOD_CALL) - Query* query; // Query parameters (GET_QUERY, START_SYNC) - Value* arguments; // Method parameters (METHOD_CALL) - char* exchange; // Exchange for bind (BIND, UNBIND) - char* bindingKey; // Key for bind (BIND, UNBIND) - SchemaObjectClass* objectClass; // (METHOD_CALL) - }; - - class AgentEngineImpl; - - /** - * AgentEngine - Protocol engine for the QMF agent - */ - class AgentEngine { - public: - AgentEngine(char* label, bool internalStore=true); - ~AgentEngine(); - - /** - * Configure the directory path for storing persistent data. - *@param path Null-terminated string containing a directory path where files can be - * created, written, and read. If NULL, no persistent storage will be - * attempted. - */ - void setStoreDir(const char* path); - - /** - * Configure the directory path for files transferred over QMF. - *@param path Null-terminated string containing a directory path where files can be - * created, deleted, written, and read. If NULL, file transfers shall not - * be permitted. - */ - void setTransferDir(const char* path); - - /** - * Pass messages received from the AMQP session to the Agent engine. - *@param message AMQP messages received on the agent session. - */ - void handleRcvMessage(Message& message); - - /** - * Get the next message to be sent to the AMQP network. - *@param item The Message structure describing the message to be produced. - *@return true if the Message is valid, false if there are no messages to send. - */ - bool getXmtMessage(Message& item) const; - - /** - * Remove and discard one message from the head of the transmit queue. - */ - void popXmt(); - - /** - * Get the next application event from the agent engine. - *@param event The event iff the return value is true - *@return true if event is valid, false if there are no events to process - */ - bool getEvent(AgentEvent& event) const; - - /** - * Remove and discard one event from the head of the event queue. - */ - void popEvent(); - - /** - * A new AMQP session has been established for Agent communication. - */ - void newSession(); - - /** - * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event - * is received from the Agent engine. - */ - void startProtocol(); - - /** - * This method is called periodically so the agent can supply a heartbeat. - */ - void heartbeat(); - - /** - * Respond to a method request. - *@param sequence The sequence number from the method request event. - *@param status The method's completion status. - *@param text Status text ("OK" or an error message) - *@param arguments The list of output arguments from the method call. - */ - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); - - /** - * Send a content indication to the QMF bus. This is only needed for objects that are - * managed by the application. This is *NOT* needed for objects managed by the Agent - * (inserted using addObject). - *@param sequence The sequence number of the GET request or the SYNC_START request. - *@param object The object (annotated with "changed" flags) for publication. - *@param prop If true, changed object properties are transmitted. - *@param stat If true, changed object statistics are transmitted. - */ - void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true); - - /** - * Indicate the completion of a query. This is not used for SYNC_START requests. - *@param sequence The sequence number of the GET request. - */ - void queryComplete(uint32_t sequence); - - /** - * Register a schema class with the Agent. - *@param cls A SchemaObejctClass object that defines data managed by the agent. - */ - void registerClass(SchemaObjectClass* cls); - - /** - * Register a schema class with the Agent. - *@param cls A SchemaEventClass object that defines events sent by the agent. - */ - void registerClass(SchemaEventClass* cls); - - /** - * Give an object to the Agent for storage and management. Once added, the agent takes - * responsibility for the life cycle of the object. - *@param obj The object to be managed by the Agent. - *@param persistId A unique non-zero value if the object-id is to be persistent. - *@return The objectId of the managed object. - */ - const ObjectId* addObject(Object& obj, uint64_t persistId); - // const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi); - - /** - * Allocate an object-id for an object that will be managed by the application. - *@param persistId A unique non-zero value if the object-id is to be persistent. - *@return The objectId structure for the allocated ID. - */ - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); - - /** - * Raise an event into the QMF network.. - *@param event The event object for the event to be raised. - */ - void raiseEvent(Event& event); - - private: - AgentEngineImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/ConsoleEngine.cpp b/qpid/cpp/src/qmf/ConsoleEngine.cpp deleted file mode 100644 index e7991328ee..0000000000 --- a/qpid/cpp/src/qmf/ConsoleEngine.cpp +++ /dev/null @@ -1,1091 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/ConsoleEngine.h" -#include "qmf/MessageImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/Typecode.h" -#include "qmf/ObjectImpl.h" -#include "qmf/ObjectIdImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/ValueImpl.h" -#include "qmf/Protocol.h" -#include "qmf/SequenceManager.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> -#include <qpid/sys/Mutex.h> -#include <qpid/log/Statement.h> -#include <qpid/sys/Time.h> -#include <qpid/sys/SystemInfo.h> -#include <string.h> -#include <string> -#include <deque> -#include <map> -#include <vector> -#include <iostream> -#include <fstream> -#include <boost/shared_ptr.hpp> - -using namespace std; -using namespace qmf; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qmf { - - struct MethodResponseImpl { - typedef boost::shared_ptr<MethodResponseImpl> Ptr; - MethodResponse* envelope; - uint32_t status; - auto_ptr<Value> exception; - auto_ptr<Value> arguments; - - MethodResponseImpl(Buffer& buf); - ~MethodResponseImpl() { delete envelope; } - uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } - const Value* getArgs() const { return arguments.get(); } - }; - - struct QueryResponseImpl { - typedef boost::shared_ptr<QueryResponseImpl> Ptr; - QueryResponse *envelope; - uint32_t status; - auto_ptr<Value> exception; - vector<ObjectImpl::Ptr> results; - - QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {} - ~QueryResponseImpl() { delete envelope; } - uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } - uint32_t getObjectCount() const { return results.size(); } - const Object* getObject(uint32_t idx) const; - }; - - struct ConsoleEventImpl { - typedef boost::shared_ptr<ConsoleEventImpl> Ptr; - ConsoleEvent::EventKind kind; - boost::shared_ptr<AgentProxyImpl> agent; - string name; - boost::shared_ptr<SchemaClassKey> classKey; - Object* object; - void* context; - Event* event; - uint64_t timestamp; - uint32_t methodHandle; - MethodResponseImpl::Ptr methodResponse; - - ConsoleEventImpl(ConsoleEvent::EventKind k) : - kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {} - ~ConsoleEventImpl() {} - ConsoleEvent copy(); - }; - - struct BrokerEventImpl { - typedef boost::shared_ptr<BrokerEventImpl> Ptr; - BrokerEvent::EventKind kind; - string name; - string exchange; - string bindingKey; - void* context; - QueryResponseImpl::Ptr queryResponse; - - BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {} - ~BrokerEventImpl() {} - BrokerEvent copy(); - }; - - struct AgentProxyImpl { - typedef boost::shared_ptr<AgentProxyImpl> Ptr; - AgentProxy* envelope; - ConsoleEngineImpl* console; - BrokerProxyImpl* broker; - uint32_t agentBank; - string label; - - AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) : - envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {} - ~AgentProxyImpl() {} - const string& getLabel() const { return label; } - }; - - class BrokerProxyImpl { - public: - typedef boost::shared_ptr<BrokerProxyImpl> Ptr; - - BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console); - ~BrokerProxyImpl() {} - - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - - void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - - bool getEvent(BrokerEvent& event) const; - void popEvent(); - - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - void sendQuery(const Query& query, void* context, const AgentProxy* agent); - void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); - - void addBinding(const string& exchange, const string& key); - void staticRelease() { decOutstanding(); } - - private: - friend class StaticContext; - friend class QueryContext; - mutable Mutex lock; - BrokerProxy* envelope; - ConsoleEngineImpl* console; - string queueName; - Uuid brokerId; - SequenceManager seqMgr; - uint32_t requestsOutstanding; - bool topicBound; - vector<AgentProxyImpl::Ptr> agentList; - deque<MessageImpl::Ptr> xmtQueue; - deque<BrokerEventImpl::Ptr> eventQueue; - -# define MA_BUFFER_SIZE 65536 - char outputBuffer[MA_BUFFER_SIZE]; - - BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); - BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); - BrokerEventImpl::Ptr eventSetupComplete(); - BrokerEventImpl::Ptr eventStable(); - BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response); - - void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); - void handlePackageIndication(Buffer& inBuffer, uint32_t seq); - void handleCommandComplete(Buffer& inBuffer, uint32_t seq); - void handleClassIndication(Buffer& inBuffer, uint32_t seq); - void handleMethodResponse(Buffer& inBuffer, uint32_t seq); - void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); - void handleEventIndication(Buffer& inBuffer, uint32_t seq); - void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); - ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); - void incOutstandingLH(); - void decOutstanding(); - }; - - struct StaticContext : public SequenceContext { - StaticContext(BrokerProxyImpl& b) : broker(b) {} - ~StaticContext() {} - void reserve() {} - void release() { broker.staticRelease(); } - bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); - BrokerProxyImpl& broker; - }; - - struct QueryContext : public SequenceContext { - QueryContext(BrokerProxyImpl& b, void* u) : - broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} - ~QueryContext() {} - void reserve(); - void release(); - bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); - - mutable Mutex lock; - BrokerProxyImpl& broker; - void* userContext; - uint32_t requestsOutstanding; - QueryResponseImpl::Ptr queryResponse; - }; - - class ConsoleEngineImpl { - public: - ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); - ~ConsoleEngineImpl(); - - bool getEvent(ConsoleEvent& event) const; - void popEvent(); - - void addConnection(BrokerProxy& broker, void* context); - void delConnection(BrokerProxy& broker); - - uint32_t packageCount() const; - const string& getPackageName(uint32_t idx) const; - - uint32_t classCount(const char* packageName) const; - const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; - - ClassKind getClassKind(const SchemaClassKey* key) const; - const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; - const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; - - void bindPackage(const char* packageName); - void bindClass(const SchemaClassKey* key); - void bindClass(const char* packageName, const char* className); - - /* - void startSync(const Query& query, void* context, SyncQuery& sync); - void touchSync(SyncQuery& sync); - void endSync(SyncQuery& sync); - */ - - private: - friend class BrokerProxyImpl; - ConsoleEngine* envelope; - const ConsoleSettings& settings; - mutable Mutex lock; - deque<ConsoleEventImpl::Ptr> eventQueue; - vector<BrokerProxyImpl*> brokerList; - vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) - - // Declare a compare class for the class maps that compares the dereferenced - // class key pointers. The default behavior would be to compare the pointer - // addresses themselves. - struct KeyCompare { - bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const { - return *left < *right; - } - }; - - typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList; - typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList; - typedef map<string, pair<ObjectClassList, EventClassList> > PackageList; - - PackageList packages; - - void learnPackage(const string& packageName); - void learnClass(SchemaObjectClassImpl::Ptr cls); - void learnClass(SchemaEventClassImpl::Ptr cls); - bool haveClass(const SchemaClassKeyImpl& key) const; - SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const; - }; -} - -namespace { - const char* QMF_EXCHANGE = "qpid.management"; - const char* DIR_EXCHANGE = "amq.direct"; - const char* BROKER_KEY = "broker"; - const char* BROKER_PACKAGE = "org.apache.qpid.broker"; - const char* AGENT_CLASS = "agent"; - const char* BROKER_AGENT_KEY = "agent.1.0"; -} - -const Object* QueryResponseImpl::getObject(uint32_t idx) const -{ - vector<ObjectImpl::Ptr>::const_iterator iter = results.begin(); - - while (idx > 0) { - if (iter == results.end()) - return 0; - iter++; - idx--; - } - - return (*iter)->envelope; -} - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} - -ConsoleEvent ConsoleEventImpl::copy() -{ - ConsoleEvent item; - - ::memset(&item, 0, sizeof(ConsoleEvent)); - item.kind = kind; - item.agent = agent.get() ? agent->envelope : 0; - item.classKey = classKey.get(); - item.object = object; - item.context = context; - item.event = event; - item.timestamp = timestamp; - item.methodHandle = methodHandle; - item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0; - - STRING_REF(name); - - return item; -} - -BrokerEvent BrokerEventImpl::copy() -{ - BrokerEvent item; - - ::memset(&item, 0, sizeof(BrokerEvent)); - item.kind = kind; - - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); - item.context = context; - item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0; - - return item; -} - -BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl) -{ - stringstream qn; - qpid::TcpAddress addr; - - SystemInfo::getLocalHostname(addr); - qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); - queueName = qn.str(); - - seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); -} - -void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); - - // TODO: Store session handle -} - -void BrokerProxyImpl::sessionClosed() -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); -} - -void BrokerProxyImpl::startProtocol() -{ - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker"))); - - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); -} - -void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) -{ - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = DIR_EXCHANGE; - message->replyKey = queueName; - - xmtQueue.push_back(message); -} - -void BrokerProxyImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) - seqMgr.dispatch(opcode, sequence, inBuffer); -} - -bool BrokerProxyImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} - -bool BrokerProxyImpl::getEvent(BrokerEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -uint32_t BrokerProxyImpl::agentCount() const -{ - Mutex::ScopedLock _lock(lock); - return agentList.size(); -} - -const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const -{ - Mutex::ScopedLock _lock(lock); - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) - if (idx-- == 0) - return (*iter)->envelope; - return 0; -} - -void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) -{ - SequenceContext::Ptr queryContext(new QueryContext(*this, context)); - Mutex::ScopedLock _lock(lock); - if (agent != 0) { - sendGetRequestLH(queryContext, query, agent->impl); - } else { - // TODO (optimization) only send queries to agents that have the requested class+package - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) { - sendGetRequestLH(queryContext, query, (*iter).get()); - } - } -} - -void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent) -{ - stringstream key; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(queryContext)); - - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - query.impl->encode(outBuffer); - key << "agent.1." << agent->agentBank; - sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); -} - -void BrokerProxyImpl::addBinding(const string& exchange, const string& key) -{ - eventQueue.push_back(eventBind(exchange, queueName, key)); -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); - event->name = queueName; - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; - - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); - event->context = context; - event->queryResponse = response; - return event; -} - -void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) -{ - brokerId.decode(inBuffer); - QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); -} - -void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) -{ - string package; - - inBuffer.getShortString(package); - QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); - console->learnPackage(package); - - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); - outBuffer.putShortString(package); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package); -} - -void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) -{ - string text; - uint32_t code = inBuffer.getLong(); - inBuffer.getShortString(text); - QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); -} - -void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) -{ - uint8_t kind = inBuffer.getOctet(); - SchemaClassKeyImpl classKey(inBuffer); - - QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); - - if (!console->haveClass(classKey)) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); - classKey.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str()); - } -} - -void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) -{ - SchemaObjectClassImpl::Ptr oClassPtr; - SchemaEventClassImpl::Ptr eClassPtr; - uint8_t kind = inBuffer.getOctet(); - const SchemaClassKeyImpl* key; - if (kind == CLASS_OBJECT) { - oClassPtr.reset(new SchemaObjectClassImpl(inBuffer)); - console->learnClass(oClassPtr); - key = oClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); - - // - // If we have just learned about the org.apache.qpid.broker:agent class, send a get - // request for the current list of agents so we can have it on-hand before we declare - // this session "stable". - // - if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - FieldTable ft; - ft.setString("_class", AGENT_CLASS); - ft.setString("_package", BROKER_PACKAGE); - ft.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); - } - } else if (kind == CLASS_EVENT) { - eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); - console->learnClass(eClassPtr); - key = eClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str()); - } - else { - QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); - } -} - -ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) -{ - SchemaClassKeyImpl classKey(inBuffer); - QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str()); - - SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey); - if (schema.get() == 0) { - QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str()); - return ObjectImpl::Ptr(); - } - - return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true)); -} - -void BrokerProxyImpl::incOutstandingLH() -{ - requestsOutstanding++; -} - -void BrokerProxyImpl::decOutstanding() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding--; - if (requestsOutstanding == 0 && !topicBound) { - topicBound = true; - for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); - iter != console->bindingList.end(); iter++) { - string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); - string key(iter->second); - eventQueue.push_back(eventBind(exchange, queueName, key)); - } - eventQueue.push_back(eventStable()); - } -} - -MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this)) -{ - string text; - - status = buf.getLong(); - buf.getMediumString(text); - exception.reset(new Value(TYPE_LSTR)); - exception->setString(text.c_str()); - - // TODO: Parse schema-specific output arguments. - arguments.reset(new Value(TYPE_MAP)); -} - -bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) -{ - bool completeContext = false; - if (opcode == Protocol::OP_BROKER_RESPONSE) { - broker.handleBrokerResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { - broker.handleSchemaResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_PACKAGE_INDICATION) - broker.handlePackageIndication(buffer, sequence); - else if (opcode == Protocol::OP_CLASS_INDICATION) - broker.handleClassIndication(buffer, sequence); - else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) - broker.handleHeartbeatIndication(buffer, sequence); - else if (opcode == Protocol::OP_EVENT_INDICATION) - broker.handleEventIndication(buffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) - broker.handleObjectIndication(buffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, true); - else { - QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -void QueryContext::reserve() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding++; -} - -void QueryContext::release() -{ - Mutex::ScopedLock _lock(lock); - if (--requestsOutstanding == 0) { - broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); - } -} - -bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) -{ - bool completeContext = false; - ObjectImpl::Ptr object; - - if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_OBJECT_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, true, true); - if (object.get() != 0) - queryResponse->results.push_back(object); - } - else { - QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : - envelope(e), settings(s) -{ - bindingList.push_back(pair<string, string>(string(), "schema.#")); - if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { - bindingList.push_back(pair<string, string>(string(), "console.#")); - } else { - if (settings.rcvObjects && !settings.userBindings) - bindingList.push_back(pair<string, string>(string(), "console.obj.#")); - else - bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); - if (settings.rcvEvents) - bindingList.push_back(pair<string, string>(string(), "console.event.#")); - if (settings.rcvHeartbeats) - bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); - } -} - -ConsoleEngineImpl::~ConsoleEngineImpl() -{ - // This function intentionally left blank. -} - -bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void ConsoleEngineImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/) -{ - Mutex::ScopedLock _lock(lock); - brokerList.push_back(broker.impl); -} - -void ConsoleEngineImpl::delConnection(BrokerProxy& broker) -{ - Mutex::ScopedLock _lock(lock); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - if (*iter == broker.impl) { - brokerList.erase(iter); - break; - } -} - -uint32_t ConsoleEngineImpl::packageCount() const -{ - Mutex::ScopedLock _lock(lock); - return packages.size(); -} - -const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const -{ - const static string empty; - - Mutex::ScopedLock _lock(lock); - if (idx >= packages.size()) - return empty; - - PackageList::const_iterator iter = packages.begin(); - for (uint32_t i = 0; i < idx; i++) iter++; - return iter->first; -} - -uint32_t ConsoleEngineImpl::classCount(const char* packageName) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - - return oList.size() + eList.size(); -} - -const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - uint32_t count = 0; - - for (ObjectClassList::const_iterator oIter = oList.begin(); - oIter != oList.end(); oIter++) { - if (count == idx) - return oIter->second->getClassKey(); - count++; - } - - for (EventClassList::const_iterator eIter = eList.begin(); - eIter != eList.end(); eIter++) { - if (count == idx) - return eIter->second->getClassKey(); - count++; - } - - return 0; -} - -ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return CLASS_OBJECT; - - const EventClassList& eList = pIter->second.second; - if (eList.find(key->impl) != eList.end()) - return CLASS_EVENT; - return CLASS_OBJECT; -} - -const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - ObjectClassList::const_iterator iter = oList.find(key->impl); - if (iter == oList.end()) - return 0; - return iter->second->envelope; -} - -const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return 0; - - const EventClassList& eList = pIter->second.second; - EventClassList::const_iterator iter = eList.find(key->impl); - if (iter == eList.end()) - return 0; - return iter->second->envelope; -} - -void ConsoleEngineImpl::bindPackage(const char* packageName) -{ - stringstream key; - key << "console.obj.*.*." << packageName << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey) -{ - stringstream key; - key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -void ConsoleEngineImpl::bindClass(const char* packageName, const char* className) -{ - stringstream key; - key << "console.obj.*.*." << packageName << "." << className << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -/* -void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync) -{ -} - -void ConsoleEngineImpl::touchSync(SyncQuery& sync) -{ -} - -void ConsoleEngineImpl::endSync(SyncQuery& sync) -{ -} -*/ - -void ConsoleEngineImpl::learnPackage(const string& packageName) -{ - Mutex::ScopedLock _lock(lock); - if (packages.find(packageName) == packages.end()) - packages.insert(pair<string, pair<ObjectClassList, EventClassList> > - (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); -} - -void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls) -{ - Mutex::ScopedLock _lock(lock); - const SchemaClassKey* key = cls->getClassKey(); - PackageList::iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return; - - ObjectClassList& list = pIter->second.first; - if (list.find(key->impl) == list.end()) - list[key->impl] = cls; -} - -void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls) -{ - Mutex::ScopedLock _lock(lock); - const SchemaClassKey* key = cls->getClassKey(); - PackageList::iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return; - - EventClassList& list = pIter->second.second; - if (list.find(key->impl) == list.end()) - list[key->impl] = cls; -} - -bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key.getPackageName()); - if (pIter == packages.end()) - return false; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - - return oList.find(&key) != oList.end() || eList.find(&key) != eList.end(); -} - -SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key.getPackageName()); - if (pIter == packages.end()) - return SchemaObjectClassImpl::Ptr(); - - const ObjectClassList& oList = pIter->second.first; - ObjectClassList::const_iterator iter = oList.find(&key); - if (iter == oList.end()) - return SchemaObjectClassImpl::Ptr(); - - return iter->second; -} - -//================================================================== -// Wrappers -//================================================================== - -AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} -AgentProxy::~AgentProxy() { delete impl; } -const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } - -BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} -BrokerProxy::~BrokerProxy() { delete impl; } -void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } -void BrokerProxy::sessionClosed() { impl->sessionClosed(); } -void BrokerProxy::startProtocol() { impl->startProtocol(); } -void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void BrokerProxy::popXmt() { impl->popXmt(); } -bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } -void BrokerProxy::popEvent() { impl->popEvent(); } -uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } -const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } -void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } - -MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} -MethodResponse::~MethodResponse() {} -uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } -const Value* MethodResponse::getException() const { return impl->getException(); } -const Value* MethodResponse::getArgs() const { return impl->getArgs(); } - -QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} -QueryResponse::~QueryResponse() {} -uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } -const Value* QueryResponse::getException() const { return impl->getException(); } -uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } -const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } - -ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {} -ConsoleEngine::~ConsoleEngine() { delete impl; } -bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } -void ConsoleEngine::popEvent() { impl->popEvent(); } -void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } -void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } -uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); } -const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } -uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); } -const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } -ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } -const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } -const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } -void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } -void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } -void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } -//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } -//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); } -//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); } - - diff --git a/qpid/cpp/src/qmf/ConsoleEngine.h b/qpid/cpp/src/qmf/ConsoleEngine.h deleted file mode 100644 index 457e83ad58..0000000000 --- a/qpid/cpp/src/qmf/ConsoleEngine.h +++ /dev/null @@ -1,222 +0,0 @@ -#ifndef _QmfConsoleEngine_ -#define _QmfConsoleEngine_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/ResilientConnection.h> -#include <qmf/Schema.h> -#include <qmf/ObjectId.h> -#include <qmf/Object.h> -#include <qmf/Event.h> -#include <qmf/Query.h> -#include <qmf/Value.h> -#include <qmf/Message.h> - -namespace qmf { - - class ConsoleEngine; - class ConsoleEngineImpl; - class BrokerProxyImpl; - class AgentProxy; - class AgentProxyImpl; - class MethodResponseImpl; - class QueryResponseImpl; - class QueryContext; - - /** - * - */ - class MethodResponse { - public: - MethodResponse(MethodResponseImpl* impl); - ~MethodResponse(); - uint32_t getStatus() const; - const Value* getException() const; - const Value* getArgs() const; - - private: - friend class ConsoleEngineImpl; - MethodResponseImpl* impl; - }; - - /** - * - */ - class QueryResponse { - public: - QueryResponse(QueryResponseImpl* impl); - ~QueryResponse(); - uint32_t getStatus() const; - const Value* getException() const; - uint32_t getObjectCount() const; - const Object* getObject(uint32_t idx) const; - - private: - friend class QueryContext; - QueryResponseImpl *impl; - }; - - /** - * - */ - struct ConsoleEvent { - enum EventKind { - AGENT_ADDED = 1, - AGENT_DELETED = 2, - NEW_PACKAGE = 3, - NEW_CLASS = 4, - OBJECT_UPDATE = 5, - EVENT_RECEIVED = 7, - AGENT_HEARTBEAT = 8, - METHOD_RESPONSE = 9 - }; - - EventKind kind; - AgentProxy* agent; // (AGENT_[ADDED|DELETED|HEARTBEAT]) - char* name; // (NEW_PACKAGE) - SchemaClassKey* classKey; // (NEW_CLASS) - Object* object; // (OBJECT_UPDATE) - void* context; // (OBJECT_UPDATE) - Event* event; // (EVENT_RECEIVED) - uint64_t timestamp; // (AGENT_HEARTBEAT) - uint32_t methodHandle; // (METHOD_RESPONSE) - MethodResponse* methodResponse; // (METHOD_RESPONSE) - QueryResponse* queryResponse; // (QUERY_COMPLETE) - }; - - /** - * - */ - struct BrokerEvent { - enum EventKind { - BROKER_INFO = 10, - DECLARE_QUEUE = 11, - DELETE_QUEUE = 12, - BIND = 13, - UNBIND = 14, - SETUP_COMPLETE = 15, - STABLE = 16, - QUERY_COMPLETE = 17 - }; - - EventKind kind; - char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND) - char* exchange; // ([UN]BIND) - char* bindingKey; // ([UN]BIND) - void* context; // (QUERY_COMPLETE) - QueryResponse* queryResponse; // (QUERY_COMPLETE) - }; - - /** - * - */ - class AgentProxy { - public: - AgentProxy(AgentProxyImpl* impl); - ~AgentProxy(); - const char* getLabel() const; - - private: - friend class BrokerProxyImpl; - AgentProxyImpl* impl; - }; - - /** - * - */ - class BrokerProxy { - public: - BrokerProxy(ConsoleEngine& console); - ~BrokerProxy(); - - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - - bool getEvent(BrokerEvent& event) const; - void popEvent(); - - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - void sendQuery(const Query& query, void* context, const AgentProxy* agent = 0); - - private: - friend class ConsoleEngineImpl; - BrokerProxyImpl* impl; - }; - - // TODO - move this to a public header - struct ConsoleSettings { - bool rcvObjects; - bool rcvEvents; - bool rcvHeartbeats; - bool userBindings; - - ConsoleSettings() : - rcvObjects(true), - rcvEvents(true), - rcvHeartbeats(true), - userBindings(false) {} - }; - - class ConsoleEngine { - public: - ConsoleEngine(const ConsoleSettings& settings = ConsoleSettings()); - ~ConsoleEngine(); - - bool getEvent(ConsoleEvent& event) const; - void popEvent(); - - void addConnection(BrokerProxy& broker, void* context); - void delConnection(BrokerProxy& broker); - - uint32_t packageCount() const; - const char* getPackageName(uint32_t idx) const; - - uint32_t classCount(const char* packageName) const; - const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; - - ClassKind getClassKind(const SchemaClassKey* key) const; - const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; - const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; - - void bindPackage(const char* packageName); - void bindClass(const SchemaClassKey* key); - void bindClass(const char* packageName, const char* className); - - /* - void startSync(const Query& query, void* context, SyncQuery& sync); - void touchSync(SyncQuery& sync); - void endSync(SyncQuery& sync); - */ - - private: - friend class BrokerProxyImpl; - friend class AgentProxyImpl; - ConsoleEngineImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Event.h b/qpid/cpp/src/qmf/Event.h deleted file mode 100644 index f20c6d2fb1..0000000000 --- a/qpid/cpp/src/qmf/Event.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef _QmfEvent_ -#define _QmfEvent_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace qmf { - - class Event { - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Message.h b/qpid/cpp/src/qmf/Message.h deleted file mode 100644 index 52b8ba72d3..0000000000 --- a/qpid/cpp/src/qmf/Message.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _QmfMessage_ -#define _QmfMessage_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qpid/sys/IntegerTypes.h" - -namespace qmf { - - struct Message { - char* body; - uint32_t length; - char* destination; - char* routingKey; - char* replyExchange; - char* replyKey; - char* userId; - }; - -} - -#endif diff --git a/qpid/cpp/src/qmf/Object.h b/qpid/cpp/src/qmf/Object.h deleted file mode 100644 index 9cb3224d9b..0000000000 --- a/qpid/cpp/src/qmf/Object.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef _QmfObject_ -#define _QmfObject_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/Schema.h> -#include <qmf/ObjectId.h> -#include <qmf/Value.h> - -namespace qmf { - - struct ObjectImpl; - class Object { - public: - Object(const SchemaObjectClass* type); - Object(ObjectImpl* impl); - Object(const Object& from); - virtual ~Object(); - - void destroy(); - const ObjectId* getObjectId() const; - void setObjectId(ObjectId* oid); - const SchemaObjectClass* getClass() const; - Value* getValue(char* key) const; - - ObjectImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/ObjectId.h b/qpid/cpp/src/qmf/ObjectId.h deleted file mode 100644 index e894e0b39c..0000000000 --- a/qpid/cpp/src/qmf/ObjectId.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef _QmfObjectId_ -#define _QmfObjectId_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/sys/IntegerTypes.h> - -namespace qmf { - - // TODO: Add to/from string and << operator - - struct ObjectIdImpl; - class ObjectId { - public: - ObjectId(); - ObjectId(const ObjectId& from); - ObjectId(ObjectIdImpl* impl); - ~ObjectId(); - - uint64_t getObjectNum() const; - uint32_t getObjectNumHi() const; - uint32_t getObjectNumLo() const; - bool isDurable() const; - - bool operator==(const ObjectId& other) const; - bool operator<(const ObjectId& other) const; - bool operator>(const ObjectId& other) const; - bool operator<=(const ObjectId& other) const; - bool operator>=(const ObjectId& other) const; - - ObjectIdImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Query.h b/qpid/cpp/src/qmf/Query.h deleted file mode 100644 index 875749862e..0000000000 --- a/qpid/cpp/src/qmf/Query.h +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef _QmfQuery_ -#define _QmfQuery_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/ObjectId.h> -#include <qmf/Value.h> - -namespace qmf { - - struct Object; - struct QueryElementImpl; - struct QueryImpl; - struct QueryExpressionImpl; - struct SchemaClassKey; - - enum ValueOper { - O_EQ = 1, - O_NE = 2, - O_LT = 3, - O_LE = 4, - O_GT = 5, - O_GE = 6, - O_RE_MATCH = 7, - O_RE_NOMATCH = 8 - }; - - struct QueryOperand { - virtual ~QueryOperand() {} - virtual bool evaluate(const Object* object) const = 0; - }; - - struct QueryElement : public QueryOperand { - QueryElement(const char* attrName, const Value* value, ValueOper oper); - QueryElement(QueryElementImpl* impl); - virtual ~QueryElement(); - bool evaluate(const Object* object) const; - - QueryElementImpl* impl; - }; - - enum ExprOper { - E_NOT = 1, - E_AND = 2, - E_OR = 3, - E_XOR = 4 - }; - - struct QueryExpression : public QueryOperand { - QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2); - QueryExpression(QueryExpressionImpl* impl); - virtual ~QueryExpression(); - bool evaluate(const Object* object) const; - - QueryExpressionImpl* impl; - }; - - class Query { - public: - Query(const char* className, const char* packageName); - Query(const SchemaClassKey* key); - Query(const ObjectId* oid); - Query(QueryImpl* impl); - ~Query(); - - void setSelect(const QueryOperand* criterion); - void setLimit(uint32_t maxResults); - void setOrderBy(const char* attrName, bool decreasing); - - const char* getPackage() const; - const char* getClass() const; - const ObjectId* getObjectId() const; - - bool haveSelect() const; - bool haveLimit() const; - bool haveOrderBy() const; - const QueryOperand* getSelect() const; - uint32_t getLimit() const; - const char* getOrderBy() const; - bool getDecreasing() const; - - QueryImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/ResilientConnection.h b/qpid/cpp/src/qmf/ResilientConnection.h deleted file mode 100644 index 03f1b9c0d5..0000000000 --- a/qpid/cpp/src/qmf/ResilientConnection.h +++ /dev/null @@ -1,163 +0,0 @@ -#ifndef _QmfResilientConnection_ -#define _QmfResilientConnection_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/Message.h> -#include <qmf/ConnectionSettings.h> -#include <string> - -namespace qmf { - - class ResilientConnectionImpl; - - /** - * Represents events that occur, unsolicited, from ResilientConnection. - */ - struct ResilientConnectionEvent { - enum EventKind { - CONNECTED = 1, - DISCONNECTED = 2, - SESSION_CLOSED = 3, - RECV = 4 - }; - - EventKind kind; - void* sessionContext; // SESSION_CLOSED, RECV - char* errorText; // DISCONNECTED, SESSION_CLOSED - Message message; // RECV - }; - - class SessionHandle { - friend class ResilientConnectionImpl; - void* impl; - }; - - /** - * ResilientConnection represents a Qpid connection that is resilient. - * - * Upon creation, ResilientConnection attempts to establish a connection to the - * messaging broker. If it fails, it will continue to retry at an interval that - * increases over time (to a maximum interval). If an extablished connection is - * dropped, a reconnect will be attempted. - */ - class ResilientConnection { - public: - - /** - * Create a new resilient connection. - *@param settings Settings that define how the connection is to be made. - *@param delayMin Minimum delay (in seconds) between retries. - *@param delayMax Maximum delay (in seconds) between retries. - *@param delayFactor Factor to multiply retry delay by after each failure. - */ - ResilientConnection(const ConnectionSettings& settings); - ~ResilientConnection(); - - /** - * Get the connected status of the resilient connection. - *@return true iff the connection is established. - */ - bool isConnected() const; - - /** - * Get the next event (if present) from the connection. - *@param event Returned event if one is available. - *@return true if event is valid, false if there are no more events to handle. - */ - bool getEvent(ResilientConnectionEvent& event); - - /** - * Discard the event on the front of the queue. This should be invoked after processing - * the event from getEvent. - */ - void popEvent(); - - /** - * Create a new AMQP session. - *@param name Unique name for the session. - *@param sessionContext Optional user-context value that will be provided in events - * pertaining to this session. - *@param handle Output handle to be stored and used in subsequent calls pertaining to - * this session. - *@return true iff the session was successfully created. - */ - bool createSession(const char* name, void* sessionContext, SessionHandle& handle); - - /** - * Destroy a created session. - *@param handle SessionHandle returned by createSession. - */ - void destroySession(SessionHandle handle); - - /** - * Send a message into the AMQP broker via a session. - *@param handle The session handle of the session to transmit through. - *@param message The QMF message to transmit. - */ - void sendMessage(SessionHandle handle, Message& message); - - /** - * Declare an exclusive, auto-delete queue for a session. - *@param handle The session handle for the owner of the queue. - *@param queue The name of the queue. - */ - void declareQueue(SessionHandle handle, char* queue); - - /** - * Delete a queue. - *@param handle The session handle for the owner of the queue. - *@param queue The name of the queue. - */ - void deleteQueue(SessionHandle handle, char* queue); - - /** - * Bind a queue to an exchange. - *@param handle The session handle of the session to use for binding. - *@param exchange The name of the exchange for binding. - *@param queue The name of the queue for binding. - *@param key The binding key. - */ - void bind(SessionHandle handle, char* exchange, char* queue, char* key); - - /** - * Remove a binding. - *@param handle The session handle of the session to use for un-binding. - *@param exchange The name of the exchange. - *@param queue The name of the queue. - *@param key The binding key. - */ - void unbind(SessionHandle handle, char* exchange, char* queue, char* key); - - /** - * Establish a file descriptor for event notification. - *@param fd A file descriptor into which the connection shall write a character each - * time an event is enqueued. This fd may be in a pair, the other fd of which - * is used in a select loop to control execution. - */ - void setNotifyFd(int fd); - - private: - ResilientConnectionImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Schema.h b/qpid/cpp/src/qmf/Schema.h deleted file mode 100644 index 1123acc3b8..0000000000 --- a/qpid/cpp/src/qmf/Schema.h +++ /dev/null @@ -1,171 +0,0 @@ -#ifndef _QmfSchema_ -#define _QmfSchema_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/Typecode.h> -#include <qpid/sys/IntegerTypes.h> - -namespace qmf { - - enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 }; - enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 }; - enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 }; - - struct SchemaArgumentImpl; - struct SchemaMethodImpl; - struct SchemaPropertyImpl; - struct SchemaStatisticImpl; - struct SchemaObjectClassImpl; - struct SchemaEventClassImpl; - struct SchemaClassKeyImpl; - - /** - */ - class SchemaArgument { - public: - SchemaArgument(const char* name, Typecode typecode); - SchemaArgument(SchemaArgumentImpl* impl); - ~SchemaArgument(); - void setDirection(Direction dir); - void setUnit(const char* val); - void setDesc(const char* desc); - const char* getName() const; - Typecode getType() const; - Direction getDirection() const; - const char* getUnit() const; - const char* getDesc() const; - - SchemaArgumentImpl* impl; - }; - - /** - */ - class SchemaMethod { - public: - SchemaMethod(const char* name); - SchemaMethod(SchemaMethodImpl* impl); - ~SchemaMethod(); - void addArgument(const SchemaArgument& argument); - void setDesc(const char* desc); - const char* getName() const; - const char* getDesc() const; - int getArgumentCount() const; - const SchemaArgument* getArgument(int idx) const; - - SchemaMethodImpl* impl; - }; - - /** - */ - class SchemaProperty { - public: - SchemaProperty(const char* name, Typecode typecode); - SchemaProperty(SchemaPropertyImpl* impl); - ~SchemaProperty(); - void setAccess(Access access); - void setIndex(bool val); - void setOptional(bool val); - void setUnit(const char* val); - void setDesc(const char* desc); - const char* getName() const; - Typecode getType() const; - Access getAccess() const; - bool isIndex() const; - bool isOptional() const; - const char* getUnit() const; - const char* getDesc() const; - - SchemaPropertyImpl* impl; - }; - - /** - */ - class SchemaStatistic { - public: - SchemaStatistic(const char* name, Typecode typecode); - SchemaStatistic(SchemaStatisticImpl* impl); - ~SchemaStatistic(); - void setUnit(const char* val); - void setDesc(const char* desc); - const char* getName() const; - Typecode getType() const; - const char* getUnit() const; - const char* getDesc() const; - - SchemaStatisticImpl* impl; - }; - - /** - */ - class SchemaClassKey { - public: - SchemaClassKey(SchemaClassKeyImpl* impl); - ~SchemaClassKey(); - - const char* getPackageName() const; - const char* getClassName() const; - const uint8_t* getHash() const; - - SchemaClassKeyImpl* impl; - }; - - /** - */ - class SchemaObjectClass { - public: - SchemaObjectClass(const char* package, const char* name); - SchemaObjectClass(SchemaObjectClassImpl* impl); - ~SchemaObjectClass(); - void addProperty(const SchemaProperty& property); - void addStatistic(const SchemaStatistic& statistic); - void addMethod(const SchemaMethod& method); - - const SchemaClassKey* getClassKey() const; - int getPropertyCount() const; - int getStatisticCount() const; - int getMethodCount() const; - const SchemaProperty* getProperty(int idx) const; - const SchemaStatistic* getStatistic(int idx) const; - const SchemaMethod* getMethod(int idx) const; - - SchemaObjectClassImpl* impl; - }; - - /** - */ - class SchemaEventClass { - public: - SchemaEventClass(const char* package, const char* name); - SchemaEventClass(SchemaEventClassImpl* impl); - ~SchemaEventClass(); - void addArgument(const SchemaArgument& argument); - void setDesc(const char* desc); - - const SchemaClassKey* getClassKey() const; - int getArgumentCount() const; - const SchemaArgument* getArgument(int idx) const; - - SchemaEventClassImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Typecode.h b/qpid/cpp/src/qmf/Typecode.h deleted file mode 100644 index 94614d2977..0000000000 --- a/qpid/cpp/src/qmf/Typecode.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef _QmfTypecode_ -#define _QmfTypecode_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace qmf { - - enum Typecode { - TYPE_UINT8 = 1, - TYPE_UINT16 = 2, - TYPE_UINT32 = 3, - TYPE_UINT64 = 4, - TYPE_SSTR = 6, - TYPE_LSTR = 7, - TYPE_ABSTIME = 8, - TYPE_DELTATIME = 9, - TYPE_REF = 10, - TYPE_BOOL = 11, - TYPE_FLOAT = 12, - TYPE_DOUBLE = 13, - TYPE_UUID = 14, - TYPE_MAP = 15, - TYPE_INT8 = 16, - TYPE_INT16 = 17, - TYPE_INT32 = 18, - TYPE_INT64 = 19, - TYPE_OBJECT = 20, - TYPE_LIST = 21, - TYPE_ARRAY = 22 - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/Value.h b/qpid/cpp/src/qmf/Value.h deleted file mode 100644 index bb946d31d3..0000000000 --- a/qpid/cpp/src/qmf/Value.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef _QmfValue_ -#define _QmfValue_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/ObjectId.h> -#include <qmf/Typecode.h> - -namespace qmf { - - class Object; - struct ValueImpl; - - class Value { - public: - // Value(); - Value(Typecode t, Typecode arrayType = TYPE_UINT8); - Value(ValueImpl* impl); - ~Value(); - - Typecode getType() const; - bool isNull() const; - void setNull(); - - bool isObjectId() const; - const ObjectId& asObjectId() const; - void setObjectId(const ObjectId& oid); - - bool isUint() const; - uint32_t asUint() const; - void setUint(uint32_t val); - - bool isInt() const; - int32_t asInt() const; - void setInt(int32_t val); - - bool isUint64() const; - uint64_t asUint64() const; - void setUint64(uint64_t val); - - bool isInt64() const; - int64_t asInt64() const; - void setInt64(int64_t val); - - bool isString() const; - const char* asString() const; - void setString(const char* val); - - bool isBool() const; - bool asBool() const; - void setBool(bool val); - - bool isFloat() const; - float asFloat() const; - void setFloat(float val); - - bool isDouble() const; - double asDouble() const; - void setDouble(double val); - - bool isUuid() const; - const uint8_t* asUuid() const; - void setUuid(const uint8_t* val); - - bool isObject() const; - Object* asObject() const; - void setObject(Object* val); - - bool isMap() const; - bool keyInMap(const char* key) const; - Value* byKey(const char* key); - const Value* byKey(const char* key) const; - void deleteKey(const char* key); - void insert(const char* key, Value* val); - uint32_t keyCount() const; - const char* key(uint32_t idx) const; - - bool isList() const; - uint32_t listItemCount() const; - Value* listItem(uint32_t idx); - void appendToList(Value* val); - void deleteListItem(uint32_t idx); - - bool isArray() const; - Typecode arrayType() const; - uint32_t arrayItemCount() const; - Value* arrayItem(uint32_t idx); - void appendToArray(Value* val); - void deleteArrayItem(uint32_t idx); - - ValueImpl* impl; - }; -} - -#endif - diff --git a/qpid/cpp/src/qmf/AgentEngine.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 9ea3be5907..c5d1bff2e0 100644 --- a/qpid/cpp/src/qmf/AgentEngine.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -17,15 +17,15 @@ * under the License. */ -#include "qmf/AgentEngine.h" -#include "qmf/MessageImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/Typecode.h" -#include "qmf/ObjectImpl.h" -#include "qmf/ObjectIdImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/ValueImpl.h" -#include "qmf/Protocol.h" +#include "qmf/engine/Agent.h" +#include "qmf/engine/MessageImpl.h" +#include "qmf/engine/SchemaImpl.h" +#include "qmf/engine/Typecode.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/ObjectIdImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/Protocol.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> #include <qpid/framing/FieldTable.h> @@ -40,13 +40,15 @@ #include <iostream> #include <fstream> #include <boost/shared_ptr.hpp> +#include <boost/noncopyable.hpp> using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid::framing; using namespace qpid::sys; namespace qmf { +namespace engine { struct AgentEventImpl { typedef boost::shared_ptr<AgentEventImpl> Ptr; @@ -61,7 +63,7 @@ namespace qmf { boost::shared_ptr<Value> arguments; string exchange; string bindingKey; - SchemaObjectClass* objectClass; + const SchemaObjectClass* objectClass; AgentEventImpl(AgentEvent::EventKind k) : kind(k), sequence(0), object(0), objectClass(0) {} @@ -74,14 +76,14 @@ namespace qmf { uint32_t sequence; string exchange; string key; - SchemaMethodImpl* schemaMethod; + const SchemaMethod* schemaMethod; AgentQueryContext() : schemaMethod(0) {} }; - class AgentEngineImpl { + class AgentImpl : public boost::noncopyable { public: - AgentEngineImpl(char* label, bool internalStore); - ~AgentEngineImpl(); + AgentImpl(char* label, bool internalStore); + ~AgentImpl(); void setStoreDir(const char* path); void setTransferDir(const char* path); @@ -163,8 +165,8 @@ namespace qmf { } }; - typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap; - typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap; + typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap; + typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap; struct ClassMaps { ObjectClassMap objectClasses; @@ -180,7 +182,7 @@ namespace qmf { boost::shared_ptr<ObjectId> oid); AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, - SchemaObjectClass* objectClass); + const SchemaObjectClass* objectClass); void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); void sendPackageIndicationLH(const string& packageName); @@ -198,10 +200,11 @@ namespace qmf { void handleConsoleAddedIndication(); }; } +} -const char* AgentEngineImpl::QMF_EXCHANGE = "qpid.management"; -const char* AgentEngineImpl::DIR_EXCHANGE = "amq.direct"; -const char* AgentEngineImpl::BROKER_KEY = "broker"; +const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; +const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; +const char* AgentImpl::BROKER_KEY = "broker"; #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} @@ -227,7 +230,7 @@ AgentEvent AgentEventImpl::copy() return item; } -AgentEngineImpl::AgentEngineImpl(char* _label, bool i) : +AgentImpl::AgentImpl(char* _label, bool i) : label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), @@ -236,11 +239,11 @@ AgentEngineImpl::AgentEngineImpl(char* _label, bool i) : queueName += label; } -AgentEngineImpl::~AgentEngineImpl() +AgentImpl::~AgentImpl() { } -void AgentEngineImpl::setStoreDir(const char* path) +void AgentImpl::setStoreDir(const char* path) { Mutex::ScopedLock _lock(lock); if (path) @@ -249,7 +252,7 @@ void AgentEngineImpl::setStoreDir(const char* path) storeDir.clear(); } -void AgentEngineImpl::setTransferDir(const char* path) +void AgentImpl::setTransferDir(const char* path) { Mutex::ScopedLock _lock(lock); if (path) @@ -258,7 +261,7 @@ void AgentEngineImpl::setTransferDir(const char* path) transferDir.clear(); } -void AgentEngineImpl::handleRcvMessage(Message& message) +void AgentImpl::handleRcvMessage(Message& message) { Buffer inBuffer(message.body, message.length); uint8_t opcode; @@ -274,13 +277,13 @@ void AgentEngineImpl::handleRcvMessage(Message& message) else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); else { - QPID_LOG(error, "AgentEngineImpl::handleRcvMessage invalid opcode=" << opcode); + QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); break; } } } -bool AgentEngineImpl::getXmtMessage(Message& item) const +bool AgentImpl::getXmtMessage(Message& item) const { Mutex::ScopedLock _lock(lock); if (xmtQueue.empty()) @@ -289,14 +292,14 @@ bool AgentEngineImpl::getXmtMessage(Message& item) const return true; } -void AgentEngineImpl::popXmt() +void AgentImpl::popXmt() { Mutex::ScopedLock _lock(lock); if (!xmtQueue.empty()) xmtQueue.pop_front(); } -bool AgentEngineImpl::getEvent(AgentEvent& event) const +bool AgentImpl::getEvent(AgentEvent& event) const { Mutex::ScopedLock _lock(lock); if (eventQueue.empty()) @@ -305,14 +308,14 @@ bool AgentEngineImpl::getEvent(AgentEvent& event) const return true; } -void AgentEngineImpl::popEvent() +void AgentImpl::popEvent() { Mutex::ScopedLock _lock(lock); if (!eventQueue.empty()) eventQueue.pop_front(); } -void AgentEngineImpl::newSession() +void AgentImpl::newSession() { Mutex::ScopedLock _lock(lock); eventQueue.clear(); @@ -322,7 +325,7 @@ void AgentEngineImpl::newSession() eventQueue.push_back(eventSetupComplete()); } -void AgentEngineImpl::startProtocol() +void AgentImpl::startProtocol() { Mutex::ScopedLock _lock(lock); char rawbuffer[512]; @@ -338,7 +341,7 @@ void AgentEngineImpl::startProtocol() " reqAgent=" << requestedAgentBank); } -void AgentEngineImpl::heartbeat() +void AgentImpl::heartbeat() { Mutex::ScopedLock _lock(lock); Buffer buffer(outputBuffer, MA_BUFFER_SIZE); @@ -351,7 +354,7 @@ void AgentEngineImpl::heartbeat() QPID_LOG(trace, "SENT HeartbeatIndication"); } -void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, +void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& argMap) { Mutex::ScopedLock _lock(lock); @@ -366,15 +369,15 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t buffer.putLong(status); buffer.putMediumString(text); if (status == 0) { - for (vector<SchemaArgumentImpl*>::const_iterator aIter = context->schemaMethod->arguments.begin(); - aIter != context->schemaMethod->arguments.end(); aIter++) { - const SchemaArgumentImpl* schemaArg = *aIter; - if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) { - if (argMap.keyInMap(schemaArg->name.c_str())) { - const Value* val = argMap.byKey(schemaArg->name.c_str()); + for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); + aIter != context->schemaMethod->impl->arguments.end(); aIter++) { + const SchemaArgument* schemaArg = *aIter; + if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { + if (argMap.keyInMap(schemaArg->getName())) { + const Value* val = argMap.byKey(schemaArg->getName()); val->impl->encode(buffer); } else { - Value val(schemaArg->typecode); + Value val(schemaArg->getType()); val.impl->encode(buffer); } } @@ -384,7 +387,7 @@ void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* t QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); } -void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -406,7 +409,7 @@ void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); } -void AgentEngineImpl::queryComplete(uint32_t sequence) +void AgentImpl::queryComplete(uint32_t sequence) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -418,69 +421,67 @@ void AgentEngineImpl::queryComplete(uint32_t sequence) sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); } -void AgentEngineImpl::registerClass(SchemaObjectClass* cls) +void AgentImpl::registerClass(SchemaObjectClass* cls) { Mutex::ScopedLock _lock(lock); - SchemaObjectClassImpl* impl = cls->impl; - map<string, ClassMaps>::iterator iter = packages.find(impl->package); + map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); if (iter == packages.end()) { - packages[impl->package] = ClassMaps(); - iter = packages.find(impl->getClassKey()->getPackageName()); + packages[cls->getClassKey()->getPackageName()] = ClassMaps(); + iter = packages.find(cls->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); - iter->second.objectClasses[key] = impl; + AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); + iter->second.objectClasses[key] = cls; // TODO: Indicate this schema if connected. } -void AgentEngineImpl::registerClass(SchemaEventClass* cls) +void AgentImpl::registerClass(SchemaEventClass* cls) { Mutex::ScopedLock _lock(lock); - SchemaEventClassImpl* impl = cls->impl; - map<string, ClassMaps>::iterator iter = packages.find(impl->package); + map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); if (iter == packages.end()) { - packages[impl->package] = ClassMaps(); - iter = packages.find(impl->getClassKey()->getPackageName()); + packages[cls->getClassKey()->getPackageName()] = ClassMaps(); + iter = packages.find(cls->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); - iter->second.eventClasses[key] = impl; + AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); + iter->second.eventClasses[key] = cls; // TODO: Indicate this schema if connected. } -const ObjectId* AgentEngineImpl::addObject(Object&, uint64_t) +const ObjectId* AgentImpl::addObject(Object&, uint64_t) { Mutex::ScopedLock _lock(lock); return 0; } -const ObjectId* AgentEngineImpl::allocObjectId(uint64_t persistId) +const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) { Mutex::ScopedLock _lock(lock); uint16_t sequence = persistId ? 0 : bootSequence; uint64_t objectNum = persistId ? persistId : nextObjectId++; - ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum); - return oid->envelope; + ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum); + return oid; } -const ObjectId* AgentEngineImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); } -void AgentEngineImpl::raiseEvent(Event&) +void AgentImpl::raiseEvent(Event&) { Mutex::ScopedLock _lock(lock); } -AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) +AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); event->name = name; @@ -488,7 +489,7 @@ AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) return event; } -AgentEventImpl::Ptr AgentEngineImpl::eventBind(const string& exchange, const string& queue, +AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); @@ -499,13 +500,13 @@ AgentEventImpl::Ptr AgentEngineImpl::eventBind(const string& exchange, const str return event; } -AgentEventImpl::Ptr AgentEngineImpl::eventSetupComplete() +AgentEventImpl::Ptr AgentImpl::eventSetupComplete() { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); return event; } -AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& userId, const string& package, +AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, boost::shared_ptr<ObjectId> oid) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); @@ -518,9 +519,9 @@ AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& user return event; } -AgentEventImpl::Ptr AgentEngineImpl::eventMethod(uint32_t num, const string& userId, const string& method, +AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, - SchemaObjectClass* objectClass) + const SchemaObjectClass* objectClass) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); event->sequence = num; @@ -532,7 +533,7 @@ AgentEventImpl::Ptr AgentEngineImpl::eventMethod(uint32_t num, const string& use return event; } -void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) { uint32_t length = buf.getPosition(); MessageImpl::Ptr message(new MessageImpl); @@ -547,7 +548,7 @@ void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const xmtQueue.push_back(message); } -void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) +void AgentImpl::sendPackageIndicationLH(const string& packageName) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); @@ -556,7 +557,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); } -void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) +void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); @@ -568,7 +569,7 @@ void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packag QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); } -void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, +void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, uint32_t sequence, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); @@ -579,7 +580,7 @@ void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); } -void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) +void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); @@ -605,7 +606,7 @@ void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, ui QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); } -void AgentEngineImpl::handleAttachResponse(Buffer& inBuffer) +void AgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock _lock(lock); @@ -652,17 +653,17 @@ void AgentEngineImpl::handleAttachResponse(Buffer& inBuffer) } } -void AgentEngineImpl::handlePackageRequest(Buffer&) +void AgentImpl::handlePackageRequest(Buffer&) { Mutex::ScopedLock _lock(lock); } -void AgentEngineImpl::handleClassQuery(Buffer&) +void AgentImpl::handleClassQuery(Buffer&) { Mutex::ScopedLock _lock(lock); } -void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, +void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyExchange, const string& replyKey) { Mutex::ScopedLock _lock(lock); @@ -688,10 +689,10 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, ClassMaps cMap = pIter->second; ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); if (ocIter != cMap.objectClasses.end()) { - SchemaObjectClassImpl* oImpl = ocIter->second; + SchemaObjectClass* oImpl = ocIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - oImpl->encode(buffer); + oImpl->impl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); return; @@ -699,10 +700,10 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, EventClassMap::iterator ecIter = cMap.eventClasses.find(key); if (ecIter != cMap.eventClasses.end()) { - SchemaEventClassImpl* eImpl = ecIter->second; + SchemaEventClass* eImpl = ecIter->second; Buffer buffer(outputBuffer, MA_BUFFER_SIZE); Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - eImpl->encode(buffer); + eImpl->impl->encode(buffer); sendBufferLH(buffer, rExchange, rKey); QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); return; @@ -711,7 +712,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); } -void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) +void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) { Mutex::ScopedLock _lock(lock); FieldTable ft; @@ -763,13 +764,12 @@ void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); } -void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) +void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) { Mutex::ScopedLock _lock(lock); string pname; string method; - ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); - boost::shared_ptr<ObjectId> oid(oidImpl->envelope); + boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer)); buffer.getShortString(pname); AgentClassKey classKey(buffer); buffer.getShortString(method); @@ -788,29 +788,29 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con return; } - const SchemaObjectClassImpl* schema = cIter->second; - vector<SchemaMethodImpl*>::const_iterator mIter = schema->methods.begin(); - for (; mIter != schema->methods.end(); mIter++) { - if ((*mIter)->name == method) + const SchemaObjectClass* schema = cIter->second; + vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin(); + for (; mIter != schema->impl->methods.end(); mIter++) { + if ((*mIter)->getName() == method) break; } - if (mIter == schema->methods.end()) { + if (mIter == schema->impl->methods.end()) { sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); return; } - SchemaMethodImpl* schemaMethod = *mIter; + const SchemaMethod* schemaMethod = *mIter; boost::shared_ptr<Value> argMap(new Value(TYPE_MAP)); - ValueImpl* value; - for (vector<SchemaArgumentImpl*>::const_iterator aIter = schemaMethod->arguments.begin(); - aIter != schemaMethod->arguments.end(); aIter++) { - const SchemaArgumentImpl* schemaArg = *aIter; - if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT) - value = new ValueImpl(schemaArg->typecode, buffer); + Value* value; + for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin(); + aIter != schemaMethod->impl->arguments.end(); aIter++) { + const SchemaArgument* schemaArg = *aIter; + if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT) + value = ValueImpl::factory(schemaArg->getType(), buffer); else - value = new ValueImpl(schemaArg->typecode); - argMap->insert(schemaArg->name.c_str(), value->envelope); + value = ValueImpl::factory(schemaArg->getType()); + argMap->insert(schemaArg->getName(), value); } AgentQueryContext::Ptr context(new AgentQueryContext); @@ -821,10 +821,10 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con context->schemaMethod = schemaMethod; contextMap[contextNum] = context; - eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope)); + eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema)); } -void AgentEngineImpl::handleConsoleAddedIndication() +void AgentImpl::handleConsoleAddedIndication() { Mutex::ScopedLock _lock(lock); } @@ -833,25 +833,25 @@ void AgentEngineImpl::handleConsoleAddedIndication() // Wrappers //================================================================== -AgentEngine::AgentEngine(char* label, bool internalStore) { impl = new AgentEngineImpl(label, internalStore); } -AgentEngine::~AgentEngine() { delete impl; } -void AgentEngine::setStoreDir(const char* path) { impl->setStoreDir(path); } -void AgentEngine::setTransferDir(const char* path) { impl->setTransferDir(path); } -void AgentEngine::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void AgentEngine::popXmt() { impl->popXmt(); } -bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); } -void AgentEngine::popEvent() { impl->popEvent(); } -void AgentEngine::newSession() { impl->newSession(); } -void AgentEngine::startProtocol() { impl->startProtocol(); } -void AgentEngine::heartbeat() { impl->heartbeat(); } -void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } -void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } -void AgentEngine::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } -void AgentEngine::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } -void AgentEngine::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } -const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } -const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } -const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } -void AgentEngine::raiseEvent(Event& event) { impl->raiseEvent(event); } +Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); } +Agent::~Agent() { delete impl; } +void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } +void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } +void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void Agent::popXmt() { impl->popXmt(); } +bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } +void Agent::popEvent() { impl->popEvent(); } +void Agent::newSession() { impl->newSession(); } +void Agent::startProtocol() { impl->startProtocol(); } +void Agent::heartbeat() { impl->heartbeat(); } +void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } +void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } +void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } +void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } +void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } +const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } +const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } +const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } +void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp new file mode 100644 index 0000000000..1a2b3e6555 --- /dev/null +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/engine/BrokerProxyImpl.h" +#include "qmf/engine/ConsoleImpl.h" +#include "qmf/engine/Protocol.h" +#include "qpid/Address.h" +#include "qpid/sys/SystemInfo.h" +#include <qpid/log/Statement.h> +#include <qpid/StringUtils.h> +#include <string.h> +#include <iostream> +#include <fstream> + +using namespace std; +using namespace qmf::engine; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace { + const char* QMF_EXCHANGE = "qpid.management"; + const char* DIR_EXCHANGE = "amq.direct"; + const char* BROKER_KEY = "broker"; + const char* BROKER_PACKAGE = "org.apache.qpid.broker"; + const char* AGENT_CLASS = "agent"; + const char* BROKER_AGENT_KEY = "agent.1.0"; +} + +const Object* QueryResponseImpl::getObject(uint32_t idx) const +{ + vector<ObjectPtr>::const_iterator iter = results.begin(); + + while (idx > 0) { + if (iter == results.end()) + return 0; + iter++; + idx--; + } + + return iter->get(); +} + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +BrokerEvent BrokerEventImpl::copy() +{ + BrokerEvent item; + + ::memset(&item, 0, sizeof(BrokerEvent)); + item.kind = kind; + + STRING_REF(name); + STRING_REF(exchange); + STRING_REF(bindingKey); + item.context = context; + item.queryResponse = queryResponse.get(); + item.methodResponse = methodResponse.get(); + + return item; +} + +BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console) +{ + stringstream qn; + qpid::TcpAddress addr; + + SystemInfo::getLocalHostname(addr); + qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); + queueName = qn.str(); + + seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); +} + +void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) +{ + Mutex::ScopedLock _lock(lock); + agentList.clear(); + eventQueue.clear(); + xmtQueue.clear(); + eventQueue.push_back(eventDeclareQueue(queueName)); + eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); + eventQueue.push_back(eventSetupComplete()); + + // TODO: Store session handle +} + +void BrokerProxyImpl::sessionClosed() +{ + Mutex::ScopedLock _lock(lock); + agentList.clear(); + eventQueue.clear(); + xmtQueue.clear(); +} + +void BrokerProxyImpl::startProtocol() +{ + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); + { + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + agentList[0] = agent; + + requestsOutstanding = 1; + topicBound = false; + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); + } + + console.impl->eventAgentAdded(agent); +} + +void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +{ + uint32_t length = buf.getPosition(); + MessageImpl::Ptr message(new MessageImpl); + + buf.reset(); + buf.getRawData(message->body, length); + message->destination = destination; + message->routingKey = routingKey; + message->replyExchange = DIR_EXCHANGE; + message->replyKey = queueName; + + xmtQueue.push_back(message); +} + +void BrokerProxyImpl::handleRcvMessage(Message& message) +{ + Buffer inBuffer(message.body, message.length); + uint8_t opcode; + uint32_t sequence; + + while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) + seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); +} + +bool BrokerProxyImpl::getXmtMessage(Message& item) const +{ + Mutex::ScopedLock _lock(lock); + if (xmtQueue.empty()) + return false; + item = xmtQueue.front()->copy(); + return true; +} + +void BrokerProxyImpl::popXmt() +{ + Mutex::ScopedLock _lock(lock); + if (!xmtQueue.empty()) + xmtQueue.pop_front(); +} + +bool BrokerProxyImpl::getEvent(BrokerEvent& event) const +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void BrokerProxyImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +uint32_t BrokerProxyImpl::agentCount() const +{ + Mutex::ScopedLock _lock(lock); + return agentList.size(); +} + +const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const +{ + Mutex::ScopedLock _lock(lock); + for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) + if (idx-- == 0) + return iter->second.get(); + return 0; +} + +void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) +{ + SequenceContext::Ptr queryContext(new QueryContext(*this, context)); + Mutex::ScopedLock _lock(lock); + if (agent != 0) { + sendGetRequestLH(queryContext, query, agent); + } else { + // TODO (optimization) only send queries to agents that have the requested class+package + for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin(); + iter != agentList.end(); iter++) { + sendGetRequestLH(queryContext, query, iter->second.get()); + } + } +} + +void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) +{ + stringstream key; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(queryContext)); + agent->impl->addSequence(sequence); + + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + query.impl->encode(outBuffer); + key << "agent.1." << agent->impl->agentBank; + sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); +} + +string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer) +{ + int argCount = schema->getArgumentCount(); + + if (argmap == 0 || !argmap->isMap()) + return string("Arguments must be in a map value"); + + for (int aIdx = 0; aIdx < argCount; aIdx++) { + const SchemaArgument* arg(schema->getArgument(aIdx)); + if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) { + if (argmap->keyInMap(arg->getName())) { + const Value* argVal(argmap->byKey(arg->getName())); + if (argVal->getType() != arg->getType()) + return string("Argument is the wrong type: ") + arg->getName(); + argVal->impl->encode(buffer); + } else { + Value defaultValue(arg->getType()); + defaultValue.impl->encode(buffer); + } + } + } + + return string(); +} + +void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, + const string& methodName, const Value* args, void* userContext) +{ + int methodCount = cls->getMethodCount(); + int idx; + for (idx = 0; idx < methodCount; idx++) { + const SchemaMethod* method = cls->getMethod(idx); + if (string(method->getName()) == methodName) { + Mutex::ScopedLock _lock(lock); + SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method)); + stringstream key; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve(methodContext)); + + Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence); + oid->impl->encode(outBuffer); + cls->getClassKey()->impl->encode(outBuffer); + outBuffer.putShortString(methodName); + + string argErrorString = encodeMethodArguments(method, args, outBuffer); + if (argErrorString.empty()) { + key << "agent.1." << oid->impl->getAgentBank(); + sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str()); + } else { + MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString)); + eventQueue.push_back(eventMethodResponse(userContext, argError)); + } + return; + } + } + + MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName)); + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(eventMethodResponse(userContext, error)); +} + +void BrokerProxyImpl::addBinding(const string& exchange, const string& key) +{ + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(eventBind(exchange, queueName, key)); +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); + event->name = queueName; + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND)); + event->name = queue; + event->exchange = exchange; + event->bindingKey = key; + + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE)); + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() +{ + QPID_LOG(trace, "Console Link to Broker Stable"); + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE)); + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); + event->context = context; + event->queryResponse = response; + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE)); + event->context = context; + event->methodResponse = response; + return event; +} + +void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) +{ + brokerId.decode(inBuffer); + QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); + Mutex::ScopedLock _lock(lock); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + incOutstandingLH(); + Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); +} + +void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) +{ + string package; + + inBuffer.getShortString(package); + QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); + console.impl->learnPackage(package); + + Mutex::ScopedLock _lock(lock); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + incOutstandingLH(); + Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); + outBuffer.putShortString(package); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package); +} + +void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) +{ + string text; + uint32_t code = inBuffer.getLong(); + inBuffer.getShortString(text); + QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); +} + +void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) +{ + uint8_t kind = inBuffer.getOctet(); + auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); + + QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str()); + + if (!console.impl->haveClass(classKey.get())) { + Mutex::ScopedLock _lock(lock); + incOutstandingLH(); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); + classKey->impl->encode(outBuffer); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str()); + } +} + +MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema) +{ + MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema)); + + QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" << + response->getException()->asString()); + + return response; +} + +void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey) +{ + vector<string> tokens = qpid::split(routingKey, "."); + uint32_t agentBank; + uint64_t timestamp; + + if (routingKey.empty() || tokens.size() != 4) + agentBank = 0; + else + agentBank = ::atoi(tokens[3].c_str()); + + timestamp = inBuffer.getLongLong(); + map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + console.impl->eventAgentHeartbeat(iter->second, timestamp); + } + QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank); +} + +void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +{ + // TODO +} + +void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) +{ + SchemaObjectClass* oClassPtr; + SchemaEventClass* eClassPtr; + uint8_t kind = inBuffer.getOctet(); + const SchemaClassKey* key; + if (kind == CLASS_OBJECT) { + oClassPtr = SchemaObjectClassImpl::factory(inBuffer); + console.impl->learnClass(oClassPtr); + key = oClassPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); + + // + // If we have just learned about the org.apache.qpid.broker:agent class, send a get + // request for the current list of agents so we can have it on-hand before we declare + // this session "stable". + // + if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) { + Mutex::ScopedLock _lock(lock); + incOutstandingLH(); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); + FieldTable ft; + ft.setString("_class", AGENT_CLASS); + ft.setString("_package", BROKER_PACKAGE); + ft.encode(outBuffer); + sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); + QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); + } + } else if (kind == CLASS_EVENT) { + eClassPtr = SchemaEventClassImpl::factory(inBuffer); + console.impl->learnClass(eClassPtr); + key = eClassPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str()); + } + else { + QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); + } +} + +ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) +{ + auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); + QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str()); + + SchemaObjectClass* schema = console.impl->getSchema(classKey.get()); + if (schema == 0) { + QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str()); + return ObjectPtr(); + } + + ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true)); + if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) { + // + // We've intercepted information about a remote agent... update the agent list accordingly + // + updateAgentList(optr); + } + return optr; +} + +void BrokerProxyImpl::updateAgentList(ObjectPtr obj) +{ + Value* value = obj->getValue("agentBank"); + Mutex::ScopedLock _lock(lock); + if (value != 0 && value->isUint()) { + uint32_t agentBank = value->asUint(); + if (obj->isDeleted()) { + map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + AgentProxyPtr agent(iter->second); + console.impl->eventAgentDeleted(agent); + agentList.erase(agentBank); + QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); + + // + // Release all sequence numbers for requests in-flight to this agent. + // Since the agent is no longer connected, these requests would not + // otherwise complete. + // + agent->impl->releaseInFlight(seqMgr); + } + } else { + Value* str = obj->getValue("label"); + string label; + if (str != 0 && str->isString()) + label = str->asString(); + map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); + if (iter == agentList.end()) { + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label)); + agentList[agentBank] = agent; + console.impl->eventAgentAdded(agent); + QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank); + } + } + } +} + +void BrokerProxyImpl::incOutstandingLH() +{ + requestsOutstanding++; +} + +void BrokerProxyImpl::decOutstanding() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding--; + if (requestsOutstanding == 0 && !topicBound) { + topicBound = true; + for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin(); + iter != console.impl->bindingList.end(); iter++) { + string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); + string key(iter->second); + eventQueue.push_back(eventBind(exchange, queueName, key)); + } + eventQueue.push_back(eventStable()); + } +} + +MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) : + status(from.status), schema(from.schema) +{ + if (from.exception.get()) + exception.reset(new Value(*(from.exception))); + if (from.arguments.get()) + arguments.reset(new Value(*(from.arguments))); +} + +MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s) +{ + string text; + + status = buf.getLong(); + buf.getMediumString(text); + exception.reset(new Value(TYPE_LSTR)); + exception->setString(text.c_str()); + + if (status != 0) + return; + + arguments.reset(new Value(TYPE_MAP)); + int argCount(schema->getArgumentCount()); + for (int idx = 0; idx < argCount; idx++) { + const SchemaArgument* arg = schema->getArgument(idx); + if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) { + Value* value(ValueImpl::factory(arg->getType(), buf)); + arguments->insert(arg->getName(), value); + } + } +} + +MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0) +{ + status = s; + exception.reset(new Value(TYPE_LSTR)); + exception->setString(text.c_str()); +} + +MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema) +{ + MethodResponseImpl* impl(new MethodResponseImpl(buf, schema)); + return new MethodResponse(impl); +} + +MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text) +{ + MethodResponseImpl* impl(new MethodResponseImpl(status, text)); + return new MethodResponse(impl); +} + +bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer) +{ + ObjectPtr object; + bool completeContext = false; + + if (opcode == Protocol::OP_BROKER_RESPONSE) { + broker.handleBrokerResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { + broker.handleSchemaResponse(buffer, sequence); + completeContext = true; + } + else if (opcode == Protocol::OP_PACKAGE_INDICATION) + broker.handlePackageIndication(buffer, sequence); + else if (opcode == Protocol::OP_CLASS_INDICATION) + broker.handleClassIndication(buffer, sequence); + else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) + broker.handleHeartbeatIndication(buffer, sequence, routingKey); + else if (opcode == Protocol::OP_EVENT_INDICATION) + broker.handleEventIndication(buffer, sequence); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, false); + broker.console.impl->eventObjectUpdate(object, true, false); + } + else if (opcode == Protocol::OP_STATISTIC_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, false, true); + broker.console.impl->eventObjectUpdate(object, false, true); + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + broker.console.impl->eventObjectUpdate(object, true, true); + } + else { + QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + +void QueryContext::reserve() +{ + Mutex::ScopedLock _lock(lock); + requestsOutstanding++; +} + +void QueryContext::release() +{ + { + Mutex::ScopedLock _lock(lock); + if (--requestsOutstanding > 0) + return; + } + + Mutex::ScopedLock _block(broker.lock); + broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); +} + +bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) +{ + bool completeContext = false; + ObjectPtr object; + + if (opcode == Protocol::OP_COMMAND_COMPLETE) { + broker.handleCommandComplete(buffer, sequence); + completeContext = true; + + // + // Visit each agent and remove the sequence from that agent's in-flight list. + // This could be made more efficient because only one agent will have this sequence + // in its list. + // + map<uint32_t, AgentProxyPtr> copy; + { + Mutex::ScopedLock _block(broker.lock); + copy = broker.agentList; + } + for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++) + iter->second->impl->delSequence(sequence); + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + if (object.get() != 0) + queryResponse->impl->results.push_back(object); + } + else { + QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); + completeContext = true; + } + + return completeContext; +} + +void MethodContext::release() +{ + Mutex::ScopedLock _block(broker.lock); + broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse)); +} + +bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) +{ + if (opcode == Protocol::OP_METHOD_RESPONSE) + methodResponse = broker.handleMethodResponse(buffer, sequence, schema); + else + QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); + + return true; +} + + +//================================================================== +// Wrappers +//================================================================== + +AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} +AgentProxy::~AgentProxy() { delete impl; } +const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } +uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); } +uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); } + +BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {} +BrokerProxy::~BrokerProxy() { delete impl; } +void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } +void BrokerProxy::sessionClosed() { impl->sessionClosed(); } +void BrokerProxy::startProtocol() { impl->startProtocol(); } +void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void BrokerProxy::popXmt() { impl->popXmt(); } +bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } +void BrokerProxy::popEvent() { impl->popEvent(); } +uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } +const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } +void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } + +MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {} +MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} +MethodResponse::~MethodResponse() {} +uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } +const Value* MethodResponse::getException() const { return impl->getException(); } +const Value* MethodResponse::getArgs() const { return impl->getArgs(); } + +QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} +QueryResponse::~QueryResponse() {} +uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } +const Value* QueryResponse::getException() const { return impl->getException(); } +uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } +const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } + diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h new file mode 100644 index 0000000000..798a5fdc76 --- /dev/null +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -0,0 +1,239 @@ +#ifndef _QmfEngineBrokerProxyImpl_ +#define _QmfEngineBrokerProxyImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/engine/Console.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/SchemaImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/SequenceManager.h" +#include "qmf/engine/MessageImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "boost/shared_ptr.hpp" +#include "boost/noncopyable.hpp" +#include <memory> +#include <string> +#include <deque> +#include <map> +#include <set> +#include <vector> + +namespace qmf { +namespace engine { + + typedef boost::shared_ptr<MethodResponse> MethodResponsePtr; + struct MethodResponseImpl { + uint32_t status; + const SchemaMethod* schema; + std::auto_ptr<Value> exception; + std::auto_ptr<Value> arguments; + + MethodResponseImpl(const MethodResponseImpl& from); + MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema); + MethodResponseImpl(uint32_t status, const std::string& text); + static MethodResponse* factory(qpid::framing::Buffer& buf, const SchemaMethod* schema); + static MethodResponse* factory(uint32_t status, const std::string& text); + ~MethodResponseImpl() {} + uint32_t getStatus() const { return status; } + const Value* getException() const { return exception.get(); } + const Value* getArgs() const { return arguments.get(); } + }; + + typedef boost::shared_ptr<QueryResponse> QueryResponsePtr; + struct QueryResponseImpl { + uint32_t status; + std::auto_ptr<Value> exception; + std::vector<ObjectPtr> results; + + QueryResponseImpl() : status(0) {} + static QueryResponse* factory() { + QueryResponseImpl* impl(new QueryResponseImpl()); + return new QueryResponse(impl); + } + ~QueryResponseImpl() {} + uint32_t getStatus() const { return status; } + const Value* getException() const { return exception.get(); } + uint32_t getObjectCount() const { return results.size(); } + const Object* getObject(uint32_t idx) const; + }; + + struct BrokerEventImpl { + typedef boost::shared_ptr<BrokerEventImpl> Ptr; + BrokerEvent::EventKind kind; + std::string name; + std::string exchange; + std::string bindingKey; + void* context; + QueryResponsePtr queryResponse; + MethodResponsePtr methodResponse; + + BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {} + ~BrokerEventImpl() {} + BrokerEvent copy(); + }; + + typedef boost::shared_ptr<AgentProxy> AgentProxyPtr; + struct AgentProxyImpl { + Console& console; + BrokerProxy& broker; + uint32_t agentBank; + std::string label; + std::set<uint32_t> inFlightSequences; + + AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {} + static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) { + AgentProxyImpl* impl(new AgentProxyImpl(c, b, ab, l)); + return new AgentProxy(impl); + } + ~AgentProxyImpl() {} + const std::string& getLabel() const { return label; } + uint32_t getBrokerBank() const { return 1; } + uint32_t getAgentBank() const { return agentBank; } + void addSequence(uint32_t seq) { inFlightSequences.insert(seq); } + void delSequence(uint32_t seq) { inFlightSequences.erase(seq); } + void releaseInFlight(SequenceManager& seqMgr) { + for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++) + seqMgr.release(*iter); + inFlightSequences.clear(); + } + }; + + class BrokerProxyImpl : public boost::noncopyable { + public: + BrokerProxyImpl(BrokerProxy& pub, Console& _console); + ~BrokerProxyImpl() {} + + void sessionOpened(SessionHandle& sh); + void sessionClosed(); + void startProtocol(); + + void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey); + void handleRcvMessage(Message& message); + bool getXmtMessage(Message& item) const; + void popXmt(); + + bool getEvent(BrokerEvent& event) const; + void popEvent(); + + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + void sendQuery(const Query& query, void* context, const AgentProxy* agent); + void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); + std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer); + void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context); + + void addBinding(const std::string& exchange, const std::string& key); + void staticRelease() { decOutstanding(); } + + private: + friend struct StaticContext; + friend struct QueryContext; + friend struct MethodContext; + BrokerProxy& publicObject; + mutable qpid::sys::Mutex lock; + Console& console; + std::string queueName; + qpid::framing::Uuid brokerId; + SequenceManager seqMgr; + uint32_t requestsOutstanding; + bool topicBound; + std::map<uint32_t, AgentProxyPtr> agentList; + std::deque<MessageImpl::Ptr> xmtQueue; + std::deque<BrokerEventImpl::Ptr> eventQueue; + +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName); + BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key); + BrokerEventImpl::Ptr eventSetupComplete(); + BrokerEventImpl::Ptr eventStable(); + BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponsePtr response); + BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponsePtr response); + + void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq); + void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); + void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq); + void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); + MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema); + void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey); + void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq); + void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq); + ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat); + void updateAgentList(ObjectPtr obj); + void incOutstandingLH(); + void decOutstanding(); + }; + + // + // StaticContext is used to handle: + // + // 1) Responses to console-level requests (for schema info, etc.) + // 2) Unsolicited messages from agents (events, published updates, etc.) + // + struct StaticContext : public SequenceContext { + StaticContext(BrokerProxyImpl& b) : broker(b) {} + virtual ~StaticContext() {} + void reserve() {} + void release() { broker.staticRelease(); } + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); + BrokerProxyImpl& broker; + }; + + // + // QueryContext is used to track and handle responses associated with a single Get Query + // + struct QueryContext : public SequenceContext { + QueryContext(BrokerProxyImpl& b, void* u) : + broker(b), userContext(u), requestsOutstanding(0), queryResponse(QueryResponseImpl::factory()) {} + virtual ~QueryContext() {} + void reserve(); + void release(); + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); + + mutable qpid::sys::Mutex lock; + BrokerProxyImpl& broker; + void* userContext; + uint32_t requestsOutstanding; + QueryResponsePtr queryResponse; + }; + + struct MethodContext : public SequenceContext { + MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {} + virtual ~MethodContext() {} + void reserve() {} + void release(); + bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); + + BrokerProxyImpl& broker; + void* userContext; + const SchemaMethod* schema; + MethodResponsePtr methodResponse; + }; + +} +} + +#endif + diff --git a/qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp index 034ab18395..2cd6af10f8 100644 --- a/qpid/cpp/src/qmf/ConnectionSettingsImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp @@ -17,11 +17,11 @@ * under the License. */ -#include "qmf/ConnectionSettingsImpl.h" -#include "qmf/Typecode.h" +#include "qmf/engine/ConnectionSettingsImpl.h" +#include "qmf/engine/Typecode.h" using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid; const string attrProtocol("protocol"); @@ -43,19 +43,20 @@ const string attrMaxSsf("maxSsf"); const string attrRetryDelayMin("retryDelayMin"); const string attrRetryDelayMax("retryDelayMax"); const string attrRetryDelayFactor("retryDelayFactor"); +const string attrSendUserId("sendUserId"); -ConnectionSettingsImpl::ConnectionSettingsImpl(ConnectionSettings* e) : - envelope(e), retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2) +ConnectionSettingsImpl::ConnectionSettingsImpl() : + retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true) { } -ConnectionSettingsImpl::ConnectionSettingsImpl(ConnectionSettings* e, const string& /*url*/) : - envelope(e), retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2) +ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) : + retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true) { // TODO: Parse the URL } -void ConnectionSettingsImpl::setAttr(const string& key, const Value& value) +bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value) { if (key == attrProtocol) clientSettings.protocol = value.asString(); else if (key == attrHost) clientSettings.host = value.asString(); @@ -77,6 +78,10 @@ void ConnectionSettingsImpl::setAttr(const string& key, const Value& value) else if (key == attrRetryDelayMin) retryDelayMin = value.asUint(); else if (key == attrRetryDelayMax) retryDelayMax = value.asUint(); else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint(); + else if (key == attrSendUserId) sendUserId = value.asBool(); + else + return false; + return true; } Value ConnectionSettingsImpl::getAttr(const string& key) const @@ -251,73 +256,18 @@ void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) c // Wrappers //================================================================== -ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) -{ - impl = new ConnectionSettingsImpl(*from.impl); -} - -ConnectionSettings::ConnectionSettings() -{ - impl = new ConnectionSettingsImpl(this); -} - -ConnectionSettings::ConnectionSettings(const char* url) -{ - impl = new ConnectionSettingsImpl(this, url); -} - -ConnectionSettings::~ConnectionSettings() -{ - delete impl; -} - -void ConnectionSettings::setAttr(const char* key, const Value& value) -{ - impl->setAttr(key, value); -} - -Value ConnectionSettings::getAttr(const char* key) const -{ - return impl->getAttr(key); -} - -const char* ConnectionSettings::getAttrString() const -{ - return impl->getAttrString().c_str(); -} - -void ConnectionSettings::transportTcp(uint16_t port) -{ - impl->transportTcp(port); -} - -void ConnectionSettings::transportSsl(uint16_t port) -{ - impl->transportSsl(port); -} - -void ConnectionSettings::transportRdma(uint16_t port) -{ - impl->transportRdma(port); -} - -void ConnectionSettings::authAnonymous(const char* username) -{ - impl->authAnonymous(username); -} - -void ConnectionSettings::authPlain(const char* username, const char* password) -{ - impl->authPlain(username, password); -} - -void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) -{ - impl->authGssapi(serviceName, minSsf, maxSsf); -} - -void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) -{ - impl->setRetry(delayMin, delayMax, delayFactor); -} +ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); } +ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); } +ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); } +ConnectionSettings::~ConnectionSettings() { delete impl; } +bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); } +Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); } +const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); } +void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); } +void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); } +void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); } +void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); } +void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); } +void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); } +void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); } diff --git a/qpid/cpp/src/qmf/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h index a177233cf3..98bf87868b 100644 --- a/qpid/cpp/src/qmf/ConnectionSettingsImpl.h +++ b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfConnectionSettingsImpl_ -#define _QmfConnectionSettingsImpl_ +#ifndef _QmfEngineConnectionSettingsImpl_ +#define _QmfEngineConnectionSettingsImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,27 +20,28 @@ * under the License. */ -#include "qmf/ConnectionSettings.h" -#include "qmf/Value.h" +#include "qmf/engine/ConnectionSettings.h" +#include "qmf/engine/Value.h" #include "qpid/client/ConnectionSettings.h" #include <string> #include <map> namespace qmf { +namespace engine { class ConnectionSettingsImpl { - ConnectionSettings* envelope; qpid::client::ConnectionSettings clientSettings; mutable std::string attrString; int retryDelayMin; int retryDelayMax; int retryDelayFactor; + bool sendUserId; public: - ConnectionSettingsImpl(ConnectionSettings* e); - ConnectionSettingsImpl(ConnectionSettings* e, const std::string& url); + ConnectionSettingsImpl(); + ConnectionSettingsImpl(const std::string& url); ~ConnectionSettingsImpl() {} - void setAttr(const std::string& key, const Value& value); + bool setAttr(const std::string& key, const Value& value); Value getAttr(const std::string& key) const; const std::string& getAttrString() const; void transportTcp(uint16_t port); @@ -53,8 +54,10 @@ namespace qmf { const qpid::client::ConnectionSettings& getClientSettings() const; void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const; + bool getSendUserId() const { return sendUserId; } }; } +} #endif diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp new file mode 100644 index 0000000000..c2d1f51f2b --- /dev/null +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/engine/ConsoleImpl.h" +#include "qmf/engine/MessageImpl.h" +#include "qmf/engine/SchemaImpl.h" +#include "qmf/engine/Typecode.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/ObjectIdImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/Protocol.h" +#include "qmf/engine/SequenceManager.h" +#include "qmf/engine/BrokerProxyImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/Uuid.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/SystemInfo.h> +#include <string.h> +#include <iostream> +#include <fstream> + +using namespace std; +using namespace qmf::engine; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace { + const char* QMF_EXCHANGE = "qpid.management"; +} + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +ConsoleEvent ConsoleEventImpl::copy() +{ + ConsoleEvent item; + + ::memset(&item, 0, sizeof(ConsoleEvent)); + item.kind = kind; + item.agent = agent.get(); + item.classKey = classKey; + item.object = object.get(); + item.context = context; + item.event = event; + item.timestamp = timestamp; + item.hasProps = hasProps; + item.hasStats = hasStats; + + STRING_REF(name); + + return item; +} + +ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s) +{ + bindingList.push_back(pair<string, string>(string(), "schema.#")); + if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { + bindingList.push_back(pair<string, string>(string(), "console.#")); + } else { + if (settings.rcvObjects && !settings.userBindings) + bindingList.push_back(pair<string, string>(string(), "console.obj.#")); + else + bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); + if (settings.rcvEvents) + bindingList.push_back(pair<string, string>(string(), "console.event.#")); + if (settings.rcvHeartbeats) + bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); + } +} + +ConsoleImpl::~ConsoleImpl() +{ + // This function intentionally left blank. +} + +bool ConsoleImpl::getEvent(ConsoleEvent& event) const +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void ConsoleImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/) +{ + Mutex::ScopedLock _lock(lock); + brokerList.push_back(broker.impl); +} + +void ConsoleImpl::delConnection(BrokerProxy& broker) +{ + Mutex::ScopedLock _lock(lock); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + if (*iter == broker.impl) { + brokerList.erase(iter); + break; + } +} + +uint32_t ConsoleImpl::packageCount() const +{ + Mutex::ScopedLock _lock(lock); + return packages.size(); +} + +const string& ConsoleImpl::getPackageName(uint32_t idx) const +{ + const static string empty; + + Mutex::ScopedLock _lock(lock); + if (idx >= packages.size()) + return empty; + + PackageList::const_iterator iter = packages.begin(); + for (uint32_t i = 0; i < idx; i++) iter++; + return iter->first; +} + +uint32_t ConsoleImpl::classCount(const char* packageName) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.size() + eList.size(); +} + +const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(packageName); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + uint32_t count = 0; + + for (ObjectClassList::const_iterator oIter = oList.begin(); + oIter != oList.end(); oIter++) { + if (count == idx) + return oIter->second->getClassKey(); + count++; + } + + for (EventClassList::const_iterator eIter = eList.begin(); + eIter != eList.end(); eIter++) { + if (count == idx) + return eIter->second->getClassKey(); + count++; + } + + return 0; +} + +ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return CLASS_OBJECT; + + const EventClassList& eList = pIter->second.second; + if (eList.find(key) != eList.end()) + return CLASS_EVENT; + return CLASS_OBJECT; +} + +const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(key); + if (iter == oList.end()) + return 0; + return iter->second; +} + +const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const EventClassList& eList = pIter->second.second; + EventClassList::const_iterator iter = eList.find(key); + if (iter == eList.end()) + return 0; + return iter->second; +} + +void ConsoleImpl::bindPackage(const char* packageName) +{ + stringstream key; + key << "console.obj.*.*." << packageName << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +void ConsoleImpl::bindClass(const SchemaClassKey* classKey) +{ + stringstream key; + key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +void ConsoleImpl::bindClass(const char* packageName, const char* className) +{ + stringstream key; + key << "console.obj.*.*." << packageName << "." << className << ".#"; + Mutex::ScopedLock _lock(lock); + bindingList.push_back(pair<string, string>(string(), key.str())); + for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); + iter != brokerList.end(); iter++) + (*iter)->addBinding(QMF_EXCHANGE, key.str()); +} + +/* +void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync) +{ +} + +void ConsoleImpl::touchSync(SyncQuery& sync) +{ +} + +void ConsoleImpl::endSync(SyncQuery& sync) +{ +} +*/ + +void ConsoleImpl::learnPackage(const string& packageName) +{ + Mutex::ScopedLock _lock(lock); + if (packages.find(packageName) == packages.end()) { + packages.insert(pair<string, pair<ObjectClassList, EventClassList> > + (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); + eventNewPackage(packageName); + } +} + +void ConsoleImpl::learnClass(SchemaObjectClass* cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + ObjectClassList& list = pIter->second.first; + if (list.find(key) == list.end()) { + list[key] = cls; + eventNewClass(key); + } +} + +void ConsoleImpl::learnClass(SchemaEventClass* cls) +{ + Mutex::ScopedLock _lock(lock); + const SchemaClassKey* key = cls->getClassKey(); + PackageList::iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return; + + EventClassList& list = pIter->second.second; + if (list.find(key) == list.end()) { + list[key] = cls; + eventNewClass(key); + } +} + +bool ConsoleImpl::haveClass(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return false; + + const ObjectClassList& oList = pIter->second.first; + const EventClassList& eList = pIter->second.second; + + return oList.find(key) != oList.end() || eList.find(key) != eList.end(); +} + +SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const +{ + Mutex::ScopedLock _lock(lock); + PackageList::const_iterator pIter = packages.find(key->getPackageName()); + if (pIter == packages.end()) + return 0; + + const ObjectClassList& oList = pIter->second.first; + ObjectClassList::const_iterator iter = oList.find(key); + if (iter == oList.end()) + return 0; + + return iter->second; +} + +void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED)); + event->agent = agent; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewPackage(const string& packageName) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE)); + event->name = packageName; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventNewClass(const SchemaClassKey* key) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS)); + event->classKey = key; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE)); + event->object = object; + event->hasProps = prop; + event->hasStats = stat; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp) +{ + ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT)); + event->agent = agent; + event->timestamp = timestamp; + Mutex::ScopedLock _lock(lock); + eventQueue.push_back(event); +} + +//================================================================== +// Wrappers +//================================================================== + +Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {} +Console::~Console() { delete impl; } +bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } +void Console::popEvent() { impl->popEvent(); } +void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } +void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } +uint32_t Console::packageCount() const { return impl->packageCount(); } +const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } +uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); } +const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } +ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } +const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } +const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } +void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); } +void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } +void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } +//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } +//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); } +//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); } + + diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h new file mode 100644 index 0000000000..8f99c5e6b9 --- /dev/null +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h @@ -0,0 +1,145 @@ +#ifndef _QmfEngineConsoleEngineImpl_ +#define _QmfEngineConsoleEngineImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/engine/Console.h" +#include "qmf/engine/MessageImpl.h" +#include "qmf/engine/SchemaImpl.h" +#include "qmf/engine/Typecode.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/ObjectIdImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/Protocol.h" +#include "qmf/engine/SequenceManager.h" +#include "qmf/engine/BrokerProxyImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/Uuid.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/sys/Mutex.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/SystemInfo.h> +#include <string.h> +#include <string> +#include <deque> +#include <map> +#include <vector> +#include <iostream> +#include <fstream> +#include <boost/shared_ptr.hpp> +#include <boost/noncopyable.hpp> + +namespace qmf { +namespace engine { + + struct ConsoleEventImpl { + typedef boost::shared_ptr<ConsoleEventImpl> Ptr; + ConsoleEvent::EventKind kind; + boost::shared_ptr<AgentProxy> agent; + std::string name; + const SchemaClassKey* classKey; + boost::shared_ptr<Object> object; + void* context; + Event* event; + uint64_t timestamp; + bool hasProps; + bool hasStats; + + ConsoleEventImpl(ConsoleEvent::EventKind k) : + kind(k), classKey(0), context(0), event(0), timestamp(0) {} + ~ConsoleEventImpl() {} + ConsoleEvent copy(); + }; + + class ConsoleImpl : public boost::noncopyable { + public: + ConsoleImpl(const ConsoleSettings& settings = ConsoleSettings()); + ~ConsoleImpl(); + + bool getEvent(ConsoleEvent& event) const; + void popEvent(); + + void addConnection(BrokerProxy& broker, void* context); + void delConnection(BrokerProxy& broker); + + uint32_t packageCount() const; + const std::string& getPackageName(uint32_t idx) const; + + uint32_t classCount(const char* packageName) const; + const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; + + ClassKind getClassKind(const SchemaClassKey* key) const; + const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; + const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; + + void bindPackage(const char* packageName); + void bindClass(const SchemaClassKey* key); + void bindClass(const char* packageName, const char* className); + + /* + void startSync(const Query& query, void* context, SyncQuery& sync); + void touchSync(SyncQuery& sync); + void endSync(SyncQuery& sync); + */ + + private: + friend class BrokerProxyImpl; + friend struct StaticContext; + const ConsoleSettings& settings; + mutable qpid::sys::Mutex lock; + std::deque<ConsoleEventImpl::Ptr> eventQueue; + std::vector<BrokerProxyImpl*> brokerList; + std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) + + // Declare a compare class for the class maps that compares the dereferenced + // class key pointers. The default behavior would be to compare the pointer + // addresses themselves. + struct KeyCompare { + bool operator()(const SchemaClassKey* left, const SchemaClassKey* right) const { + return *left < *right; + } + }; + + typedef std::map<const SchemaClassKey*, SchemaObjectClass*, KeyCompare> ObjectClassList; + typedef std::map<const SchemaClassKey*, SchemaEventClass*, KeyCompare> EventClassList; + typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList; + + PackageList packages; + + void learnPackage(const std::string& packageName); + void learnClass(SchemaObjectClass* cls); + void learnClass(SchemaEventClass* cls); + bool haveClass(const SchemaClassKey* key) const; + SchemaObjectClass* getSchema(const SchemaClassKey* key) const; + + void eventAgentAdded(boost::shared_ptr<AgentProxy> agent); + void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent); + void eventNewPackage(const std::string& packageName); + void eventNewClass(const SchemaClassKey* key); + void eventObjectUpdate(ObjectPtr object, bool prop, bool stat); + void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp); + }; +} +} + +#endif + diff --git a/qpid/cpp/src/qmf/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp index f2625c7202..0047d3eb9d 100644 --- a/qpid/cpp/src/qmf/MessageImpl.cpp +++ b/qpid/cpp/src/qmf/engine/MessageImpl.cpp @@ -17,11 +17,11 @@ * under the License. */ -#include "qmf/MessageImpl.h" +#include "qmf/engine/MessageImpl.h" #include <string.h> using namespace std; -using namespace qmf; +using namespace qmf::engine; #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} diff --git a/qpid/cpp/src/qmf/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h index 137f435699..b91291d2e4 100644 --- a/qpid/cpp/src/qmf/MessageImpl.h +++ b/qpid/cpp/src/qmf/engine/MessageImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfMessageImpl_ -#define _QmfMessageImpl_ +#ifndef _QmfEngineMessageImpl_ +#define _QmfEngineMessageImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,11 +20,12 @@ * under the License. */ -#include "qmf/Message.h" +#include "qmf/engine/Message.h" #include <string> #include <boost/shared_ptr.hpp> namespace qmf { +namespace engine { struct MessageImpl { typedef boost::shared_ptr<MessageImpl> Ptr; @@ -38,5 +39,6 @@ namespace qmf { Message copy(); }; } +} #endif diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp index c0618ccc49..b08ae2756c 100644 --- a/qpid/cpp/src/qmf/ObjectIdImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp @@ -17,11 +17,11 @@ * under the License. */ -#include "qmf/ObjectIdImpl.h" +#include "qmf/engine/ObjectIdImpl.h" #include <stdlib.h> using namespace std; -using namespace qmf; +using namespace qmf::engine; using qpid::framing::Buffer; @@ -32,13 +32,12 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t agent) ((uint64_t) (agent & 0x0fffffff)); } -ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : envelope(new ObjectId(this)), agent(0) +ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0) { decode(buffer); } -ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : - envelope(new ObjectId(this)), agent(a) +ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a) { first = ((uint64_t) (flags & 0x0f)) << 60 | @@ -46,6 +45,18 @@ ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint second = object; } +ObjectId* ObjectIdImpl::factory(Buffer& buffer) +{ + ObjectIdImpl* impl(new ObjectIdImpl(buffer)); + return new ObjectId(impl); +} + +ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object) +{ + ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object)); + return new ObjectId(impl); +} + void ObjectIdImpl::decode(Buffer& buffer) { first = buffer.getLongLong(); @@ -100,13 +111,14 @@ void ObjectIdImpl::fromString(const std::string& repr) agent = 0; } -std::string ObjectIdImpl::asString() const +const string& ObjectIdImpl::asString() const { stringstream val; - val << getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" << + val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" << getAgentBank() << "-" << getObjectNum(); - return val.str(); + repr = val.str(); + return repr; } bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const @@ -135,58 +147,22 @@ bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const // Wrappers //================================================================== -ObjectId::ObjectId() : impl(new ObjectIdImpl(this)) {} - +ObjectId::ObjectId() : impl(new ObjectIdImpl()) {} ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {} - ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {} +ObjectId::~ObjectId() { delete impl; } +uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); } +uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); } +uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); } +bool ObjectId::isDurable() const { return impl->isDurable(); } +const char* ObjectId::str() const { return impl->asString().c_str(); } +uint8_t ObjectId::getFlags() const { return impl->getFlags(); } +uint16_t ObjectId::getSequence() const { return impl->getSequence(); } +uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); } +uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); } +bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; } +bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; } +bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; } +bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); } +bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); } -ObjectId::~ObjectId() -{ - delete impl; -} - -uint64_t ObjectId::getObjectNum() const -{ - return impl->getObjectNum(); -} - -uint32_t ObjectId::getObjectNumHi() const -{ - return impl->getObjectNumHi(); -} - -uint32_t ObjectId::getObjectNumLo() const -{ - return impl->getObjectNumLo(); -} - -bool ObjectId::isDurable() const -{ - return impl->isDurable(); -} - -bool ObjectId::operator==(const ObjectId& other) const -{ - return *impl == *other.impl; -} - -bool ObjectId::operator<(const ObjectId& other) const -{ - return *impl < *other.impl; -} - -bool ObjectId::operator>(const ObjectId& other) const -{ - return *impl > *other.impl; -} - -bool ObjectId::operator<=(const ObjectId& other) const -{ - return !(*impl > *other.impl); -} - -bool ObjectId::operator>=(const ObjectId& other) const -{ - return !(*impl < *other.impl); -} diff --git a/qpid/cpp/src/qmf/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h index 38d231237f..d9871ac217 100644 --- a/qpid/cpp/src/qmf/ObjectIdImpl.h +++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfObjectIdImpl_ -#define _QmfObjectIdImpl_ +#ifndef _QmfEngineObjectIdImpl_ +#define _QmfEngineObjectIdImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,10 +20,11 @@ * under the License. */ -#include <qmf/ObjectId.h> +#include <qmf/engine/ObjectId.h> #include <qpid/framing/Buffer.h> namespace qmf { +namespace engine { struct AgentAttachment { uint64_t first; @@ -34,19 +35,22 @@ namespace qmf { }; struct ObjectIdImpl { - ObjectId* envelope; AgentAttachment* agent; uint64_t first; uint64_t second; + mutable std::string repr; - ObjectIdImpl(ObjectId* e) : envelope(e), agent(0), first(0), second(0) {} + ObjectIdImpl() : agent(0), first(0), second(0) {} ObjectIdImpl(qpid::framing::Buffer& buffer); ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); + static ObjectId* factory(qpid::framing::Buffer& buffer); + static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object); + void decode(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void fromString(const std::string& repr); - std::string asString() const; + const std::string& asString() const; uint8_t getFlags() const { return (first & 0xF000000000000000LL) >> 60; } uint16_t getSequence() const { return (first & 0x0FFF000000000000LL) >> 48; } uint32_t getBrokerBank() const { return (first & 0x0000FFFFF0000000LL) >> 28; } @@ -62,6 +66,7 @@ namespace qmf { bool operator>(const ObjectIdImpl& other) const; }; } +} #endif diff --git a/qpid/cpp/src/qmf/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp index 1ea2d54527..cae0e0da68 100644 --- a/qpid/cpp/src/qmf/ObjectImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp @@ -17,18 +17,17 @@ * under the License. */ -#include "qmf/ObjectImpl.h" -#include "qmf/ValueImpl.h" +#include "qmf/engine/ObjectImpl.h" +#include "qmf/engine/ValueImpl.h" +#include "qmf/engine/BrokerProxyImpl.h" #include <qpid/sys/Time.h> using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid::sys; using qpid::framing::Buffer; -ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) : - envelope(e), objectClass(type), createTime(uint64_t(Duration(now()))), - destroyTime(0), lastUpdatedTime(createTime) +ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) { int propCount = objectClass->getPropertyCount(); int statCount = objectClass->getStatisticCount(); @@ -45,8 +44,8 @@ ObjectImpl::ObjectImpl(Object* e, const SchemaObjectClass* type) : } } -ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, bool stat, bool managed) : - envelope(new Object(this)), objectClass(type), createTime(0), destroyTime(0), lastUpdatedTime(0) +ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) : + objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0) { int idx; @@ -54,7 +53,7 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, lastUpdatedTime = buffer.getLongLong(); createTime = buffer.getLongLong(); destroyTime = buffer.getLongLong(); - objectId.reset(new ObjectIdImpl(buffer)); + objectId.reset(ObjectIdImpl::factory(buffer)); } if (prop) { @@ -66,8 +65,8 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, if (excludes.count(prop->getName()) != 0) { properties[prop->getName()] = ValuePtr(new Value(prop->getType())); } else { - ValueImpl* pval = new ValueImpl(prop->getType(), buffer); - properties[prop->getName()] = ValuePtr(pval->envelope); + Value* pval = ValueImpl::factory(prop->getType(), buffer); + properties[prop->getName()] = ValuePtr(pval); } } } @@ -76,12 +75,18 @@ ObjectImpl::ObjectImpl(const SchemaObjectClass* type, Buffer& buffer, bool prop, int statCount = objectClass->getStatisticCount(); for (idx = 0; idx < statCount; idx++) { const SchemaStatistic* stat = objectClass->getStatistic(idx); - ValueImpl* sval = new ValueImpl(stat->getType(), buffer); - statistics[stat->getName()] = ValuePtr(sval->envelope); + Value* sval = ValueImpl::factory(stat->getType(), buffer); + statistics[stat->getName()] = ValuePtr(sval); } } } +Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) +{ + ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed)); + return new Object(impl); +} + ObjectImpl::~ObjectImpl() { } @@ -107,6 +112,22 @@ Value* ObjectImpl::getValue(const string& key) const return 0; } +void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const +{ + if (broker != 0 && objectId.get() != 0) + broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context); +} + +void ObjectImpl::merge(const Object& from) +{ + for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin(); + piter != from.impl->properties.end(); piter++) + properties[piter->first] = piter->second; + for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin(); + siter != from.impl->statistics.end(); siter++) + statistics[siter->first] = siter->second; +} + void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList) { int propCount = objectClass->getPropertyCount(); @@ -143,7 +164,7 @@ void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const buffer.putLongLong(lastUpdatedTime); buffer.putLongLong(createTime); buffer.putLongLong(destroyTime); - objectId->encode(buffer); + objectId->impl->encode(buffer); } void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const @@ -196,7 +217,7 @@ void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const // Wrappers //================================================================== -Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(this, type)) {} +Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {} Object::Object(ObjectImpl* i) : impl(i) {} Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {} Object::~Object() { delete impl; } @@ -204,5 +225,8 @@ void Object::destroy() { impl->destroy(); } const ObjectId* Object::getObjectId() const { return impl->getObjectId(); } void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); } const SchemaObjectClass* Object::getClass() const { return impl->getClass(); } -Value* Object::getValue(char* key) const { return impl->getValue(key); } +Value* Object::getValue(const char* key) const { return impl->getValue(key); } +void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); } +bool Object::isDeleted() const { return impl->isDeleted(); } +void Object::merge(const Object& from) { impl->merge(from); } diff --git a/qpid/cpp/src/qmf/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h index d69979e0da..ddd20bfea2 100644 --- a/qpid/cpp/src/qmf/ObjectImpl.h +++ b/qpid/cpp/src/qmf/engine/ObjectImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfObjectImpl_ -#define _QmfObjectImpl_ +#ifndef _QmfEngineObjectImpl_ +#define _QmfEngineObjectImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,8 +20,8 @@ * under the License. */ -#include <qmf/Object.h> -#include <qmf/ObjectIdImpl.h> +#include <qmf/engine/Object.h> +#include <qmf/engine/ObjectIdImpl.h> #include <map> #include <set> #include <string> @@ -30,28 +30,38 @@ #include <qpid/sys/Mutex.h> namespace qmf { +namespace engine { + + class BrokerProxyImpl; + + typedef boost::shared_ptr<Object> ObjectPtr; struct ObjectImpl { - typedef boost::shared_ptr<ObjectImpl> Ptr; typedef boost::shared_ptr<Value> ValuePtr; - Object* envelope; const SchemaObjectClass* objectClass; - boost::shared_ptr<ObjectIdImpl> objectId; + BrokerProxyImpl* broker; + boost::shared_ptr<ObjectId> objectId; uint64_t createTime; uint64_t destroyTime; uint64_t lastUpdatedTime; mutable std::map<std::string, ValuePtr> properties; mutable std::map<std::string, ValuePtr> statistics; - ObjectImpl(Object* e, const SchemaObjectClass* type); - ObjectImpl(const SchemaObjectClass* type, qpid::framing::Buffer& buffer, bool prop, bool stat, bool managed); + ObjectImpl(const SchemaObjectClass* type); + ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer, + bool prop, bool stat, bool managed); + static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer, + bool prop, bool stat, bool managed); ~ObjectImpl(); void destroy(); - const ObjectId* getObjectId() const { return objectId.get() ? objectId->envelope : 0; } - void setObjectId(ObjectId* oid) { objectId.reset(oid->impl); } + const ObjectId* getObjectId() const { return objectId.get(); } + void setObjectId(ObjectId* oid) { objectId.reset(oid); } const SchemaObjectClass* getClass() const { return objectClass; } Value* getValue(const std::string& key) const; + void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const; + bool isDeleted() const { return destroyTime != 0; } + void merge(const Object& from); void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList); void encodeSchemaKey(qpid::framing::Buffer& buffer) const; @@ -60,6 +70,7 @@ namespace qmf { void encodeStatistics(qpid::framing::Buffer& buffer) const; }; } +} #endif diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp index 0a3beeb276..6061b70a8d 100644 --- a/qpid/cpp/src/qmf/Protocol.cpp +++ b/qpid/cpp/src/qmf/engine/Protocol.cpp @@ -17,11 +17,11 @@ * under the License. */ -#include "qmf/Protocol.h" +#include "qmf/engine/Protocol.h" #include "qpid/framing/Buffer.h" using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid::framing; diff --git a/qpid/cpp/src/qmf/Protocol.h b/qpid/cpp/src/qmf/engine/Protocol.h index d5da08c1db..1cdfa60c84 100644 --- a/qpid/cpp/src/qmf/Protocol.h +++ b/qpid/cpp/src/qmf/engine/Protocol.h @@ -1,5 +1,5 @@ -#ifndef _QmfProtocol_ -#define _QmfProtocol_ +#ifndef _QmfEngineProtocol_ +#define _QmfEngineProtocol_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -29,6 +29,7 @@ namespace qpid { } namespace qmf { +namespace engine { class Protocol { public: @@ -62,6 +63,7 @@ namespace qmf { }; } +} #endif diff --git a/qpid/cpp/src/qmf/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp index f75a9aa5d5..6f2beeee87 100644 --- a/qpid/cpp/src/qmf/QueryImpl.cpp +++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp @@ -17,13 +17,13 @@ * under the License. */ -#include "qmf/QueryImpl.h" -#include "qmf/ObjectIdImpl.h" +#include "qmf/engine/QueryImpl.h" +#include "qmf/engine/ObjectIdImpl.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/FieldTable.h" using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid::framing; bool QueryElementImpl::evaluate(const Object* /*object*/) const @@ -45,6 +45,12 @@ QueryImpl::QueryImpl(Buffer& buffer) // TODO } +Query* QueryImpl::factory(Buffer& buffer) +{ + QueryImpl* impl(new QueryImpl(buffer)); + return new Query(impl); +} + void QueryImpl::encode(Buffer& buffer) const { FieldTable ft; @@ -69,14 +75,17 @@ QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper o QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {} QueryElement::~QueryElement() { delete impl; } bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); } + QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {} QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {} QueryExpression::~QueryExpression() { delete impl; } bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); } + Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {} Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {} Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {} Query::Query(QueryImpl* i) : impl(i) {} +Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {} Query::~Query() { delete impl; } void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); } void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); } diff --git a/qpid/cpp/src/qmf/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h index 4a56a457c0..2c64c6739c 100644 --- a/qpid/cpp/src/qmf/QueryImpl.h +++ b/qpid/cpp/src/qmf/engine/QueryImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfQueryImpl_ -#define _QmfQueryImpl_ +#ifndef _QmfEngineQueryImpl_ +#define _QmfEngineQueryImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,8 +20,8 @@ * under the License. */ -#include "qmf/Query.h" -#include "qmf/Schema.h" +#include "qmf/engine/Query.h" +#include "qmf/engine/Schema.h" #include <string> #include <boost/shared_ptr.hpp> @@ -32,41 +32,39 @@ namespace qpid { } namespace qmf { +namespace engine { struct QueryElementImpl { - QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : - envelope(new QueryElement(this)), attrName(a), value(v), oper(o) {} + QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {} ~QueryElementImpl() {} bool evaluate(const Object* object) const; - QueryElement* envelope; std::string attrName; const Value* value; ValueOper oper; }; struct QueryExpressionImpl { - QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : - envelope(new QueryExpression(this)), oper(o), left(operand1), right(operand2) {} + QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {} ~QueryExpressionImpl() {} bool evaluate(const Object* object) const; - QueryExpression* envelope; ExprOper oper; const QueryOperand* left; const QueryOperand* right; }; struct QueryImpl { - QueryImpl(Query* e) : envelope(e), select(0) {} - QueryImpl(const std::string& c, const std::string& p) : - envelope(new Query(this)), packageName(p), className(c) {} - QueryImpl(const SchemaClassKey* key) : - envelope(new Query(this)), packageName(key->getPackageName()), className(key->getClassName()) {} - QueryImpl(const ObjectId* oid) : - envelope(new Query(this)), oid(new ObjectId(*oid)) {} + // Constructors mapped to public + QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {} + QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {} + QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {} + + // Factory constructors QueryImpl(qpid::framing::Buffer& buffer); + ~QueryImpl() {}; + static Query* factory(qpid::framing::Buffer& buffer); void setSelect(const QueryOperand* criterion) { select = criterion; } void setLimit(uint32_t maxResults) { resultLimit = maxResults; } @@ -88,7 +86,6 @@ namespace qmf { void encode(qpid::framing::Buffer& buffer) const; - Query* envelope; std::string packageName; std::string className; boost::shared_ptr<ObjectId> oid; @@ -98,5 +95,6 @@ namespace qmf { bool orderDecreasing; }; } +} #endif diff --git a/qpid/cpp/src/qmf/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp index 7ec03cf4da..9502130288 100644 --- a/qpid/cpp/src/qmf/ResilientConnection.cpp +++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp @@ -17,9 +17,9 @@ * under the License. */ -#include "qmf/ResilientConnection.h" -#include "qmf/MessageImpl.h" -#include "qmf/ConnectionSettingsImpl.h" +#include "qmf/engine/ResilientConnection.h" +#include "qmf/engine/MessageImpl.h" +#include "qmf/engine/ConnectionSettingsImpl.h" #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> @@ -38,13 +38,15 @@ #include <vector> #include <set> #include <boost/intrusive_ptr.hpp> +#include <boost/noncopyable.hpp> using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid; using qpid::sys::Mutex; namespace qmf { +namespace engine { struct ResilientConnectionEventImpl { ResilientConnectionEvent::EventKind kind; void* sessionContext; @@ -64,20 +66,19 @@ namespace qmf { client::Connection& connection; client::Session session; client::SubscriptionManager* subscriptions; + string userId; void* userContext; vector<string> dests; qpid::sys::Thread thread; - RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) : - connImpl(ci), name(n), connection(c), session(connection.newSession(name)), - subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) {} + RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc); ~RCSession(); void received(client::Message& msg); void run(); void stop(); }; - class ResilientConnectionImpl : public qpid::sys::Runnable { + class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable { public: ResilientConnectionImpl(const ConnectionSettings& settings); ~ResilientConnectionImpl(); @@ -87,7 +88,7 @@ namespace qmf { void popEvent(); bool createSession(const char* name, void* sessionContext, SessionHandle& handle); void destroySession(SessionHandle handle); - void sendMessage(SessionHandle handle, qmf::Message& message); + void sendMessage(SessionHandle handle, qmf::engine::Message& message); void declareQueue(SessionHandle handle, char* queue); void deleteQueue(SessionHandle handle, char* queue); void bind(SessionHandle handle, char* exchange, char* queue, char* key); @@ -120,6 +121,7 @@ namespace qmf { set<RCSession::Ptr> sessions; }; } +} ResilientConnectionEvent ResilientConnectionEventImpl::copy() { @@ -134,6 +136,14 @@ ResilientConnectionEvent ResilientConnectionEventImpl::copy() return item; } +RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) : + connImpl(ci), name(n), connection(c), session(connection.newSession(name)), + subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) +{ + const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings(); + userId = operSettings.username; +} + RCSession::~RCSession() { subscriptions->stop(); @@ -158,18 +168,23 @@ void RCSession::stop() void RCSession::received(client::Message& msg) { - qmf::MessageImpl qmsg; + MessageImpl qmsg; qmsg.body = msg.getData(); - qpid::framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const qpid::framing::ReplyTo& rt = p.getReplyTo(); + qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties(); + if (dp.hasRoutingKey()) { + qmsg.routingKey = dp.getRoutingKey(); + } + + qpid::framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const qpid::framing::ReplyTo& rt = mp.getReplyTo(); qmsg.replyExchange = rt.getExchange(); qmsg.replyKey = rt.getRoutingKey(); } - if (p.hasUserId()) { - qmsg.userId = p.getUserId(); + if (mp.hasUserId()) { + qmsg.userId = mp.getUserId(); } connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); @@ -244,7 +259,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle) } } -void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message) +void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message) { Mutex::ScopedLock _lock(lock); RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); @@ -253,6 +268,8 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me string data(message.body, message.length); msg.getDeliveryProperties().setRoutingKey(message.routingKey); msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey)); + if (settings.impl->getSendUserId()) + msg.getMessageProperties().setUserId(sess->userId); msg.setData(data); try { @@ -383,7 +400,7 @@ void ResilientConnectionImpl::sessionClosed(RCSession*) void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind, void* sessionContext, - const qmf::MessageImpl& message, + const MessageImpl& message, const string& errorText) { Mutex::ScopedLock _lock(lock); @@ -440,7 +457,7 @@ void ResilientConnection::destroySession(SessionHandle handle) impl->destroySession(handle); } -void ResilientConnection::sendMessage(SessionHandle handle, qmf::Message& message) +void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message) { impl->sendMessage(handle, message); } diff --git a/qpid/cpp/src/qmf/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index 3eb14c3952..e366a66826 100644 --- a/qpid/cpp/src/qmf/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -17,7 +17,7 @@ * under the License. */ -#include "qmf/SchemaImpl.h" +#include "qmf/engine/SchemaImpl.h" #include <qpid/framing/Buffer.h> #include <qpid/framing/FieldTable.h> #include <qpid/framing/Uuid.h> @@ -26,7 +26,7 @@ #include <vector> using namespace std; -using namespace qmf; +using namespace qmf::engine; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::framing::Uuid; @@ -81,7 +81,7 @@ bool SchemaHash::operator>(const SchemaHash& other) const return ::memcmp(&hash, &other.hash, 16) > 0; } -SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgument(this)) +SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) { FieldTable map; map.decode(buffer); @@ -99,6 +99,12 @@ SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) : envelope(new SchemaArgu dir = DIR_IN_OUT; } +SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer) +{ + SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer)); + return new SchemaArgument(impl); +} + void SchemaArgumentImpl::encode(Buffer& buffer) const { FieldTable map; @@ -128,7 +134,7 @@ void SchemaArgumentImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(this)) +SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) { FieldTable map; int argCount; @@ -139,11 +145,17 @@ SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) : envelope(new SchemaMethod(t description = map.getAsString("desc"); for (int idx = 0; idx < argCount; idx++) { - SchemaArgumentImpl* arg = new SchemaArgumentImpl(buffer); - addArgument(*arg->envelope); + SchemaArgument* arg = SchemaArgumentImpl::factory(buffer); + addArgument(arg); } } +SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer) +{ + SchemaMethodImpl* impl(new SchemaMethodImpl(buffer)); + return new SchemaMethod(impl); +} + void SchemaMethodImpl::encode(Buffer& buffer) const { FieldTable map; @@ -154,23 +166,23 @@ void SchemaMethodImpl::encode(Buffer& buffer) const map.setString("desc", description); map.encode(buffer); - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) - (*iter)->encode(buffer); + (*iter)->impl->encode(buffer); } -void SchemaMethodImpl::addArgument(const SchemaArgument& argument) +void SchemaMethodImpl::addArgument(const SchemaArgument* argument) { - arguments.push_back(argument.impl); + arguments.push_back(argument); } const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const { int count = 0; - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++, count++) if (idx == count) - return (*iter)->envelope; + return (*iter); return 0; } @@ -178,12 +190,12 @@ void SchemaMethodImpl::updateHash(SchemaHash& hash) const { hash.update(name); hash.update(description); - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) - (*iter)->updateHash(hash); + (*iter)->impl->updateHash(hash); } -SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProperty(this)) +SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) { FieldTable map; map.decode(buffer); @@ -197,6 +209,12 @@ SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) : envelope(new SchemaProp description = map.getAsString("desc"); } +SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer) +{ + SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer)); + return new SchemaProperty(impl); +} + void SchemaPropertyImpl::encode(Buffer& buffer) const { FieldTable map; @@ -225,7 +243,7 @@ void SchemaPropertyImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaStatistic(this)) +SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) { FieldTable map; map.decode(buffer); @@ -236,6 +254,12 @@ SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) : envelope(new SchemaSt description = map.getAsString("desc"); } +SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer) +{ + SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer)); + return new SchemaStatistic(impl); +} + void SchemaStatisticImpl::encode(Buffer& buffer) const { FieldTable map; @@ -258,16 +282,26 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : - envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {} +SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {} -SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : - envelope(new SchemaClassKey(this)), package(packageContainer), name(nameContainer), hash(hashContainer) +SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer) { buffer.getShortString(packageContainer); buffer.getShortString(nameContainer); hashContainer.decode(buffer); -} +} + +SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash) +{ + SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(package, name, hash)); + return new SchemaClassKey(impl); +} + +SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer) +{ + SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer)); + return new SchemaClassKey(impl); +} void SchemaClassKeyImpl::encode(Buffer& buffer) const { @@ -292,16 +326,16 @@ bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const return hash < other.hash; } -string SchemaClassKeyImpl::str() const +const string& SchemaClassKeyImpl::str() const { Uuid printableHash(hash.get()); stringstream str; str << package << ":" << name << "(" << printableHash << ")"; - return str.str(); + repr = str.str(); + return repr; } -SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : - envelope(new SchemaObjectClass(this)), hasHash(true), classKey(package, name, hash) +SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) { buffer.getShortString(package); buffer.getShortString(name); @@ -313,21 +347,27 @@ SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : uint16_t methodCount = buffer.getShort(); for (uint16_t idx = 0; idx < propCount; idx++) { - SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer); - addProperty(*property->envelope); + const SchemaProperty* property = SchemaPropertyImpl::factory(buffer); + addProperty(property); } for (uint16_t idx = 0; idx < statCount; idx++) { - SchemaStatisticImpl* statistic = new SchemaStatisticImpl(buffer); - addStatistic(*statistic->envelope); + const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer); + addStatistic(statistic); } for (uint16_t idx = 0; idx < methodCount; idx++) { - SchemaMethodImpl* method = new SchemaMethodImpl(buffer); - addMethod(*method->envelope); + SchemaMethod* method = SchemaMethodImpl::factory(buffer); + addMethod(method); } } +SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer) +{ + SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer)); + return new SchemaObjectClass(impl); +} + void SchemaObjectClassImpl::encode(Buffer& buffer) const { buffer.putOctet((uint8_t) CLASS_OBJECT); @@ -339,15 +379,15 @@ void SchemaObjectClassImpl::encode(Buffer& buffer) const buffer.putShort((uint16_t) statistics.size()); buffer.putShort((uint16_t) methods.size()); - for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + for (vector<const SchemaProperty*>::const_iterator iter = properties.begin(); iter != properties.end(); iter++) - (*iter)->encode(buffer); - for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + (*iter)->impl->encode(buffer); + for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin(); iter != statistics.end(); iter++) - (*iter)->encode(buffer); - for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + (*iter)->impl->encode(buffer); + for (vector<const SchemaMethod*>::const_iterator iter = methods.begin(); iter != methods.end(); iter++) - (*iter)->encode(buffer); + (*iter)->impl->encode(buffer); } const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const @@ -356,67 +396,66 @@ const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const hasHash = true; hash.update(package); hash.update(name); - for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + for (vector<const SchemaProperty*>::const_iterator iter = properties.begin(); iter != properties.end(); iter++) - (*iter)->updateHash(hash); - for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + (*iter)->impl->updateHash(hash); + for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin(); iter != statistics.end(); iter++) - (*iter)->updateHash(hash); - for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + (*iter)->impl->updateHash(hash); + for (vector<const SchemaMethod*>::const_iterator iter = methods.begin(); iter != methods.end(); iter++) - (*iter)->updateHash(hash); + (*iter)->impl->updateHash(hash); } - return classKey.envelope; + return classKey.get(); } -void SchemaObjectClassImpl::addProperty(const SchemaProperty& property) +void SchemaObjectClassImpl::addProperty(const SchemaProperty* property) { - properties.push_back(property.impl); + properties.push_back(property); } -void SchemaObjectClassImpl::addStatistic(const SchemaStatistic& statistic) +void SchemaObjectClassImpl::addStatistic(const SchemaStatistic* statistic) { - statistics.push_back(statistic.impl); + statistics.push_back(statistic); } -void SchemaObjectClassImpl::addMethod(const SchemaMethod& method) +void SchemaObjectClassImpl::addMethod(const SchemaMethod* method) { - methods.push_back(method.impl); + methods.push_back(method); } const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const { int count = 0; - for (vector<SchemaPropertyImpl*>::const_iterator iter = properties.begin(); + for (vector<const SchemaProperty*>::const_iterator iter = properties.begin(); iter != properties.end(); iter++, count++) if (idx == count) - return (*iter)->envelope; + return *iter; return 0; } const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const { int count = 0; - for (vector<SchemaStatisticImpl*>::const_iterator iter = statistics.begin(); + for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin(); iter != statistics.end(); iter++, count++) if (idx == count) - return (*iter)->envelope; + return *iter; return 0; } const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const { int count = 0; - for (vector<SchemaMethodImpl*>::const_iterator iter = methods.begin(); + for (vector<const SchemaMethod*>::const_iterator iter = methods.begin(); iter != methods.end(); iter++, count++) if (idx == count) - return (*iter)->envelope; + return *iter; return 0; } -SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : - envelope(new SchemaEventClass(this)), hasHash(true), classKey(package, name, hash) +SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) { buffer.getShortString(package); buffer.getShortString(name); @@ -426,11 +465,17 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : uint16_t argCount = buffer.getShort(); for (uint16_t idx = 0; idx < argCount; idx++) { - SchemaArgumentImpl* argument = new SchemaArgumentImpl(buffer); - addArgument(*argument->envelope); + SchemaArgument* argument = SchemaArgumentImpl::factory(buffer); + addArgument(argument); } } +SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer) +{ + SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer)); + return new SchemaEventClass(impl); +} + void SchemaEventClassImpl::encode(Buffer& buffer) const { buffer.putOctet((uint8_t) CLASS_EVENT); @@ -439,9 +484,9 @@ void SchemaEventClassImpl::encode(Buffer& buffer) const hash.encode(buffer); buffer.putShort((uint16_t) arguments.size()); - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) - (*iter)->encode(buffer); + (*iter)->impl->encode(buffer); } const SchemaClassKey* SchemaEventClassImpl::getClassKey() const @@ -450,25 +495,25 @@ const SchemaClassKey* SchemaEventClassImpl::getClassKey() const hasHash = true; hash.update(package); hash.update(name); - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) - (*iter)->updateHash(hash); + (*iter)->impl->updateHash(hash); } - return classKey.envelope; + return classKey.get(); } -void SchemaEventClassImpl::addArgument(const SchemaArgument& argument) +void SchemaEventClassImpl::addArgument(const SchemaArgument* argument) { - arguments.push_back(argument.impl); + arguments.push_back(argument); } const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const { int count = 0; - for (vector<SchemaArgumentImpl*>::const_iterator iter = arguments.begin(); + for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++, count++) if (idx == count) - return (*iter)->envelope; + return (*iter); return 0; } @@ -477,8 +522,9 @@ const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const // Wrappers //================================================================== -SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(this, name, typecode); } +SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(name, typecode); } SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {} +SchemaArgument::SchemaArgument(const SchemaArgument& from) : impl(new SchemaArgumentImpl(*(from.impl))) {} SchemaArgument::~SchemaArgument() { delete impl; } void SchemaArgument::setDirection(Direction dir) { impl->setDirection(dir); } void SchemaArgument::setUnit(const char* val) { impl->setUnit(val); } @@ -488,17 +534,21 @@ Typecode SchemaArgument::getType() const { return impl->getType(); } Direction SchemaArgument::getDirection() const { return impl->getDirection(); } const char* SchemaArgument::getUnit() const { return impl->getUnit().c_str(); } const char* SchemaArgument::getDesc() const { return impl->getDesc().c_str(); } -SchemaMethod::SchemaMethod(const char* name) { impl = new SchemaMethodImpl(this, name); } + +SchemaMethod::SchemaMethod(const char* name) : impl(new SchemaMethodImpl(name)) {} SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {} +SchemaMethod::SchemaMethod(const SchemaMethod& from) : impl(new SchemaMethodImpl(*(from.impl))) {} SchemaMethod::~SchemaMethod() { delete impl; } -void SchemaMethod::addArgument(const SchemaArgument& argument) { impl->addArgument(argument); } +void SchemaMethod::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); } void SchemaMethod::setDesc(const char* desc) { impl->setDesc(desc); } const char* SchemaMethod::getName() const { return impl->getName().c_str(); } const char* SchemaMethod::getDesc() const { return impl->getDesc().c_str(); } int SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); } const SchemaArgument* SchemaMethod::getArgument(int idx) const { return impl->getArgument(idx); } -SchemaProperty::SchemaProperty(const char* name, Typecode typecode) { impl = new SchemaPropertyImpl(this, name, typecode); } + +SchemaProperty::SchemaProperty(const char* name, Typecode typecode) : impl(new SchemaPropertyImpl(name, typecode)) {} SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {} +SchemaProperty::SchemaProperty(const SchemaProperty& from) : impl(new SchemaPropertyImpl(*(from.impl))) {} SchemaProperty::~SchemaProperty() { delete impl; } void SchemaProperty::setAccess(Access access) { impl->setAccess(access); } void SchemaProperty::setIndex(bool val) { impl->setIndex(val); } @@ -512,8 +562,10 @@ bool SchemaProperty::isIndex() const { return impl->isIndex(); } bool SchemaProperty::isOptional() const { return impl->isOptional(); } const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); } const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); } -SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) { impl = new SchemaStatisticImpl(this, name, typecode); } + +SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {} SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {} +SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {} SchemaStatistic::~SchemaStatistic() { delete impl; } void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); } void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); } @@ -521,17 +573,24 @@ const char* SchemaStatistic::getName() const { return impl->getName().c_str(); } Typecode SchemaStatistic::getType() const { return impl->getType(); } const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); } const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); } + SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {} +SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {} SchemaClassKey::~SchemaClassKey() { delete impl; } const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); } const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); } const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); } -SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) { impl = new SchemaObjectClassImpl(this, package, name); } +const char* SchemaClassKey::asString() const { return impl->str().c_str(); } +bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); } +bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); } + +SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) : impl(new SchemaObjectClassImpl(package, name)) {} SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {} +SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {} SchemaObjectClass::~SchemaObjectClass() { delete impl; } -void SchemaObjectClass::addProperty(const SchemaProperty& property) { impl->addProperty(property); } -void SchemaObjectClass::addStatistic(const SchemaStatistic& statistic) { impl->addStatistic(statistic); } -void SchemaObjectClass::addMethod(const SchemaMethod& method) { impl->addMethod(method); } +void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); } +void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); } +void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); } const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); } int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); } int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); } @@ -539,10 +598,12 @@ int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); } const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); } const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); } const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); } -SchemaEventClass::SchemaEventClass(const char* package, const char* name) { impl = new SchemaEventClassImpl(this, package, name); } + +SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {} SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {} +SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {} SchemaEventClass::~SchemaEventClass() { delete impl; } -void SchemaEventClass::addArgument(const SchemaArgument& argument) { impl->addArgument(argument); } +void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); } void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); } const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); } int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); } diff --git a/qpid/cpp/src/qmf/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index 035d99aecd..af3a1d98e4 100644 --- a/qpid/cpp/src/qmf/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfSchemaImpl_ -#define _QmfSchemaImpl_ +#ifndef _QmfEngineSchemaImpl_ +#define _QmfEngineSchemaImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,13 +20,13 @@ * under the License. */ -#include "qmf/Schema.h" -#include <boost/shared_ptr.hpp> +#include "qmf/engine/Schema.h" #include <string> #include <vector> #include <qpid/framing/Buffer.h> namespace qmf { +namespace engine { // TODO: Destructors for schema classes // TODO: Add "frozen" attribute for schema classes so they can't be modified after @@ -52,16 +52,15 @@ namespace qmf { }; struct SchemaArgumentImpl { - SchemaArgument* envelope; std::string name; Typecode typecode; Direction dir; std::string unit; std::string description; - SchemaArgumentImpl(SchemaArgument* e, const char* n, Typecode t) : - envelope(e), name(n), typecode(t), dir(DIR_IN) {} + SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {} SchemaArgumentImpl(qpid::framing::Buffer& buffer); + static SchemaArgument* factory(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void setDirection(Direction d) { dir = d; } void setUnit(const char* val) { unit = val; } @@ -75,15 +74,15 @@ namespace qmf { }; struct SchemaMethodImpl { - SchemaMethod* envelope; std::string name; std::string description; - std::vector<SchemaArgumentImpl*> arguments; + std::vector<const SchemaArgument*> arguments; - SchemaMethodImpl(SchemaMethod* e, const char* n) : envelope(e), name(n) {} + SchemaMethodImpl(const char* n) : name(n) {} SchemaMethodImpl(qpid::framing::Buffer& buffer); + static SchemaMethod* factory(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; - void addArgument(const SchemaArgument& argument); + void addArgument(const SchemaArgument* argument); void setDesc(const char* desc) { description = desc; } const std::string& getName() const { return name; } const std::string& getDesc() const { return description; } @@ -93,7 +92,6 @@ namespace qmf { }; struct SchemaPropertyImpl { - SchemaProperty* envelope; std::string name; Typecode typecode; Access access; @@ -102,10 +100,9 @@ namespace qmf { std::string unit; std::string description; - SchemaPropertyImpl(SchemaProperty* e, const char* n, Typecode t) : - envelope(e), name(n), typecode(t), access(ACCESS_READ_ONLY), - index(false), optional(false) {} + SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {} SchemaPropertyImpl(qpid::framing::Buffer& buffer); + static SchemaProperty* factory(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void setAccess(Access a) { access = a; } void setIndex(bool val) { index = val; } @@ -123,15 +120,14 @@ namespace qmf { }; struct SchemaStatisticImpl { - SchemaStatistic* envelope; std::string name; Typecode typecode; std::string unit; std::string description; - SchemaStatisticImpl(SchemaStatistic* e, const char* n, Typecode t) : - envelope(e), name(n), typecode(t) {} + SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {} SchemaStatisticImpl(qpid::framing::Buffer& buffer); + static SchemaStatistic* factory(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void setUnit(const char* val) { unit = val; } void setDesc(const char* desc) { description = desc; } @@ -143,10 +139,10 @@ namespace qmf { }; struct SchemaClassKeyImpl { - const SchemaClassKey* envelope; const std::string& package; const std::string& name; const SchemaHash& hash; + mutable std::string repr; // The *Container elements are only used if there isn't an external place to // store these values. @@ -156,6 +152,8 @@ namespace qmf { SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash); SchemaClassKeyImpl(qpid::framing::Buffer& buffer); + static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash); + static SchemaClassKey* factory(qpid::framing::Buffer& buffer); const std::string& getPackageName() const { return package; } const std::string& getClassName() const { return name; } @@ -164,28 +162,28 @@ namespace qmf { void encode(qpid::framing::Buffer& buffer) const; bool operator==(const SchemaClassKeyImpl& other) const; bool operator<(const SchemaClassKeyImpl& other) const; - std::string str() const; + const std::string& str() const; }; struct SchemaObjectClassImpl { - typedef boost::shared_ptr<SchemaObjectClassImpl> Ptr; - SchemaObjectClass* envelope; std::string package; std::string name; mutable SchemaHash hash; mutable bool hasHash; - SchemaClassKeyImpl classKey; - std::vector<SchemaPropertyImpl*> properties; - std::vector<SchemaStatisticImpl*> statistics; - std::vector<SchemaMethodImpl*> methods; + std::auto_ptr<SchemaClassKey> classKey; + std::vector<const SchemaProperty*> properties; + std::vector<const SchemaStatistic*> statistics; + std::vector<const SchemaMethod*> methods; - SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) : - envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {} + SchemaObjectClassImpl(const char* p, const char* n) : + package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {} SchemaObjectClassImpl(qpid::framing::Buffer& buffer); + static SchemaObjectClass* factory(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; - void addProperty(const SchemaProperty& property); - void addStatistic(const SchemaStatistic& statistic); - void addMethod(const SchemaMethod& method); + void addProperty(const SchemaProperty* property); + void addStatistic(const SchemaStatistic* statistic); + void addMethod(const SchemaMethod* method); const SchemaClassKey* getClassKey() const; int getPropertyCount() const { return properties.size(); } @@ -197,21 +195,21 @@ namespace qmf { }; struct SchemaEventClassImpl { - typedef boost::shared_ptr<SchemaEventClassImpl> Ptr; - SchemaEventClass* envelope; std::string package; std::string name; mutable SchemaHash hash; mutable bool hasHash; - SchemaClassKeyImpl classKey; + std::auto_ptr<SchemaClassKey> classKey; std::string description; - std::vector<SchemaArgumentImpl*> arguments; + std::vector<const SchemaArgument*> arguments; - SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) : - envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {} + SchemaEventClassImpl(const char* p, const char* n) : + package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {} SchemaEventClassImpl(qpid::framing::Buffer& buffer); + static SchemaEventClass* factory(qpid::framing::Buffer& buffer); + void encode(qpid::framing::Buffer& buffer) const; - void addArgument(const SchemaArgument& argument); + void addArgument(const SchemaArgument* argument); void setDesc(const char* desc) { description = desc; } const SchemaClassKey* getClassKey() const; @@ -219,6 +217,7 @@ namespace qmf { const SchemaArgument* getArgument(int idx) const; }; } +} #endif diff --git a/qpid/cpp/src/qmf/SequenceManager.cpp b/qpid/cpp/src/qmf/engine/SequenceManager.cpp index 3171e66fac..4a4644a8b9 100644 --- a/qpid/cpp/src/qmf/SequenceManager.cpp +++ b/qpid/cpp/src/qmf/engine/SequenceManager.cpp @@ -17,10 +17,10 @@ * under the License. */ -#include "qmf/SequenceManager.h" +#include "qmf/engine/SequenceManager.h" using namespace std; -using namespace qmf; +using namespace qmf::engine; using namespace qpid::sys; SequenceManager::SequenceManager() : nextSequence(1) {} @@ -68,14 +68,14 @@ void SequenceManager::releaseAll() contextMap.clear(); } -void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) +void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer) { Mutex::ScopedLock _lock(lock); bool done; if (sequence == 0) { if (unsolicitedContext.get() != 0) { - done = unsolicitedContext->handleMessage(opcode, sequence, buffer); + done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer); if (done) unsolicitedContext->release(); } @@ -85,7 +85,7 @@ void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, qpid::framing: map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence); if (iter != contextMap.end()) { if (iter->second != 0) { - done = iter->second->handleMessage(opcode, sequence, buffer); + done = iter->second->handleMessage(opcode, sequence, routingKey, buffer); if (done) { iter->second->release(); contextMap.erase(iter); diff --git a/qpid/cpp/src/qmf/SequenceManager.h b/qpid/cpp/src/qmf/engine/SequenceManager.h index bbfd0728a7..9e47e38610 100644 --- a/qpid/cpp/src/qmf/SequenceManager.h +++ b/qpid/cpp/src/qmf/engine/SequenceManager.h @@ -1,5 +1,5 @@ -#ifndef _QmfSequenceManager_ -#define _QmfSequenceManager_ +#ifndef _QmfEngineSequenceManager_ +#define _QmfEngineSequenceManager_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -31,6 +31,7 @@ namespace qpid { } namespace qmf { +namespace engine { class SequenceContext { public: @@ -39,7 +40,7 @@ namespace qmf { virtual ~SequenceContext() {} virtual void reserve() = 0; - virtual bool handleMessage(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer) = 0; + virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0; virtual void release() = 0; }; @@ -51,7 +52,7 @@ namespace qmf { uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr()); void release(uint32_t sequence); void releaseAll(); - void dispatch(uint8_t opcode, uint32_t sequence, qpid::framing::Buffer& buffer); + void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer); private: mutable qpid::sys::Mutex lock; @@ -61,6 +62,7 @@ namespace qmf { }; } +} #endif diff --git a/qpid/cpp/src/qmf/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp index f42c85eb33..f80bdab866 100644 --- a/qpid/cpp/src/qmf/ValueImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ValueImpl.cpp @@ -17,14 +17,14 @@ * under the License. */ -#include "qmf/ValueImpl.h" +#include "qmf/engine/ValueImpl.h" #include <qpid/framing/FieldTable.h> using namespace std; -using namespace qmf; +using namespace qmf::engine; using qpid::framing::Buffer; -ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typecode(t) +ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t) { uint64_t first; uint64_t second; @@ -42,8 +42,8 @@ ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typec case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break; case TYPE_FLOAT : value.floatVal = buf.getFloat(); break; case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break; - case TYPE_INT8 : value.s32 = (int32_t) buf.getOctet(); break; - case TYPE_INT16 : value.s32 = (int32_t) buf.getShort(); break; + case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break; + case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break; case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break; case TYPE_INT64 : value.s64 = buf.getLongLong(); break; case TYPE_UUID : buf.getBin128(value.uuidVal); break; @@ -67,11 +67,27 @@ ValueImpl::ValueImpl(Typecode t, Buffer& buf) : envelope(new Value(this)), typec } } -ValueImpl::ValueImpl(Typecode t) : envelope(new Value(this)), typecode(t) +ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at) +{ +} + +ValueImpl::ValueImpl(Typecode t) : typecode(t) { ::memset(&value, 0, sizeof(value)); } +Value* ValueImpl::factory(Typecode t, Buffer& b) +{ + ValueImpl* impl(new ValueImpl(t, b)); + return new Value(impl); +} + +Value* ValueImpl::factory(Typecode t) +{ + ValueImpl* impl(new ValueImpl(t)); + return new Value(impl); +} + ValueImpl::~ValueImpl() { } @@ -113,9 +129,9 @@ bool ValueImpl::keyInMap(const char* key) const Value* ValueImpl::byKey(const char* key) { if (keyInMap(key)) { - map<std::string, VPtr>::iterator iter = mapVal.find(key); + map<std::string, Value>::iterator iter = mapVal.find(key); if (iter != mapVal.end()) - return iter->second.get(); + return &iter->second; } return 0; } @@ -123,9 +139,9 @@ Value* ValueImpl::byKey(const char* key) const Value* ValueImpl::byKey(const char* key) const { if (keyInMap(key)) { - map<std::string, VPtr>::const_iterator iter = mapVal.find(key); + map<std::string, Value>::const_iterator iter = mapVal.find(key); if (iter != mapVal.end()) - return iter->second.get(); + return &iter->second; } return 0; } @@ -137,12 +153,13 @@ void ValueImpl::deleteKey(const char* key) void ValueImpl::insert(const char* key, Value* val) { - mapVal[key] = VPtr(val); + pair<string, Value> entry(key, *val); + mapVal.insert(entry); } const char* ValueImpl::key(uint32_t idx) const { - map<std::string, VPtr>::const_iterator iter = mapVal.begin(); + map<std::string, Value>::const_iterator iter = mapVal.begin(); for (uint32_t i = 0; i < idx; i++) { if (iter == mapVal.end()) break; @@ -186,293 +203,64 @@ void ValueImpl::deleteArrayItem(uint32_t) // Wrappers //================================================================== -Value::Value(Typecode t, Typecode at) -{ - impl = new ValueImpl(this, t, at); -} - -Value::Value(ValueImpl* i) -{ - impl = i; -} - -Value::~Value() -{ - delete impl; -} - -Typecode Value::getType() const -{ - return impl->getType(); -} - -bool Value::isNull() const -{ - return impl->isNull(); -} - -void Value::setNull() -{ - impl->setNull(); -} - -bool Value::isObjectId() const -{ - return impl->isObjectId(); -} - -const ObjectId& Value::asObjectId() const -{ - return impl->asObjectId(); -} - -void Value::setObjectId(const ObjectId& oid) -{ - impl->setObjectId(oid); -} - -bool Value::isUint() const -{ - return impl->isUint(); -} - -uint32_t Value::asUint() const -{ - return impl->asUint(); -} - -void Value::setUint(uint32_t val) -{ - impl->setUint(val); -} - -bool Value::isInt() const -{ - return impl->isInt(); -} - -int32_t Value::asInt() const -{ - return impl->asInt(); -} - -void Value::setInt(int32_t val) -{ - impl->setInt(val); -} - -bool Value::isUint64() const -{ - return impl->isUint64(); -} - -uint64_t Value::asUint64() const -{ - return impl->asUint64(); -} - -void Value::setUint64(uint64_t val) -{ - impl->setUint64(val); -} - -bool Value::isInt64() const -{ - return impl->isInt64(); -} - -int64_t Value::asInt64() const -{ - return impl->asInt64(); -} - -void Value::setInt64(int64_t val) -{ - impl->setInt64(val); -} - -bool Value::isString() const -{ - return impl->isString(); -} - -const char* Value::asString() const -{ - return impl->asString(); -} - -void Value::setString(const char* val) -{ - impl->setString(val); -} - -bool Value::isBool() const -{ - return impl->isBool(); -} - -bool Value::asBool() const -{ - return impl->asBool(); -} - -void Value::setBool(bool val) -{ - impl->setBool(val); -} - -bool Value::isFloat() const -{ - return impl->isFloat(); -} - -float Value::asFloat() const -{ - return impl->asFloat(); -} - -void Value::setFloat(float val) -{ - impl->setFloat(val); -} - -bool Value::isDouble() const -{ - return impl->isDouble(); -} - -double Value::asDouble() const -{ - return impl->asDouble(); -} - -void Value::setDouble(double val) -{ - impl->setDouble(val); -} - -bool Value::isUuid() const -{ - return impl->isUuid(); -} - -const uint8_t* Value::asUuid() const -{ - return impl->asUuid(); -} - -void Value::setUuid(const uint8_t* val) -{ - impl->setUuid(val); -} - -bool Value::isObject() const -{ - return impl->isObject(); -} - -Object* Value::asObject() const -{ - return impl->asObject(); -} - -void Value::setObject(Object* val) -{ - impl->setObject(val); -} - -bool Value::isMap() const -{ - return impl->isMap(); -} - -bool Value::keyInMap(const char* key) const -{ - return impl->keyInMap(key); -} - -Value* Value::byKey(const char* key) -{ - return impl->byKey(key); -} - -const Value* Value::byKey(const char* key) const -{ - return impl->byKey(key); -} - -void Value::deleteKey(const char* key) -{ - impl->deleteKey(key); -} - -void Value::insert(const char* key, Value* val) -{ - impl->insert(key, val); -} - -uint32_t Value::keyCount() const -{ - return impl->keyCount(); -} - -const char* Value::key(uint32_t idx) const -{ - return impl->key(idx); -} - -bool Value::isList() const -{ - return impl->isList(); -} - -uint32_t Value::listItemCount() const -{ - return impl->listItemCount(); -} - -Value* Value::listItem(uint32_t idx) -{ - return impl->listItem(idx); -} - -void Value::appendToList(Value* val) -{ - impl->appendToList(val); -} - -void Value::deleteListItem(uint32_t idx) -{ - impl->deleteListItem(idx); -} - -bool Value::isArray() const -{ - return impl->isArray(); -} - -Typecode Value::arrayType() const -{ - return impl->arrayType(); -} - -uint32_t Value::arrayItemCount() const -{ - return impl->arrayItemCount(); -} - -Value* Value::arrayItem(uint32_t idx) -{ - return impl->arrayItem(idx); -} - -void Value::appendToArray(Value* val) -{ - impl->appendToArray(val); -} - -void Value::deleteArrayItem(uint32_t idx) -{ - impl->deleteArrayItem(idx); -} +Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {} +Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {} +Value::Value(ValueImpl* i) : impl(i) {} +Value::~Value() { delete impl;} + +Typecode Value::getType() const { return impl->getType(); } +bool Value::isNull() const { return impl->isNull(); } +void Value::setNull() { impl->setNull(); } +bool Value::isObjectId() const { return impl->isObjectId(); } +const ObjectId& Value::asObjectId() const { return impl->asObjectId(); } +void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); } +bool Value::isUint() const { return impl->isUint(); } +uint32_t Value::asUint() const { return impl->asUint(); } +void Value::setUint(uint32_t val) { impl->setUint(val); } +bool Value::isInt() const { return impl->isInt(); } +int32_t Value::asInt() const { return impl->asInt(); } +void Value::setInt(int32_t val) { impl->setInt(val); } +bool Value::isUint64() const { return impl->isUint64(); } +uint64_t Value::asUint64() const { return impl->asUint64(); } +void Value::setUint64(uint64_t val) { impl->setUint64(val); } +bool Value::isInt64() const { return impl->isInt64(); } +int64_t Value::asInt64() const { return impl->asInt64(); } +void Value::setInt64(int64_t val) { impl->setInt64(val); } +bool Value::isString() const { return impl->isString(); } +const char* Value::asString() const { return impl->asString(); } +void Value::setString(const char* val) { impl->setString(val); } +bool Value::isBool() const { return impl->isBool(); } +bool Value::asBool() const { return impl->asBool(); } +void Value::setBool(bool val) { impl->setBool(val); } +bool Value::isFloat() const { return impl->isFloat(); } +float Value::asFloat() const { return impl->asFloat(); } +void Value::setFloat(float val) { impl->setFloat(val); } +bool Value::isDouble() const { return impl->isDouble(); } +double Value::asDouble() const { return impl->asDouble(); } +void Value::setDouble(double val) { impl->setDouble(val); } +bool Value::isUuid() const { return impl->isUuid(); } +const uint8_t* Value::asUuid() const { return impl->asUuid(); } +void Value::setUuid(const uint8_t* val) { impl->setUuid(val); } +bool Value::isObject() const { return impl->isObject(); } +const Object* Value::asObject() const { return impl->asObject(); } +void Value::setObject(Object* val) { impl->setObject(val); } +bool Value::isMap() const { return impl->isMap(); } +bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); } +Value* Value::byKey(const char* key) { return impl->byKey(key); } +const Value* Value::byKey(const char* key) const { return impl->byKey(key); } +void Value::deleteKey(const char* key) { impl->deleteKey(key); } +void Value::insert(const char* key, Value* val) { impl->insert(key, val); } +uint32_t Value::keyCount() const { return impl->keyCount(); } +const char* Value::key(uint32_t idx) const { return impl->key(idx); } +bool Value::isList() const { return impl->isList(); } +uint32_t Value::listItemCount() const { return impl->listItemCount(); } +Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); } +void Value::appendToList(Value* val) { impl->appendToList(val); } +void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); } +bool Value::isArray() const { return impl->isArray(); } +Typecode Value::arrayType() const { return impl->arrayType(); } +uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); } +Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); } +void Value::appendToArray(Value* val) { impl->appendToArray(val); } +void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); } diff --git a/qpid/cpp/src/qmf/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h index cf33035bf7..b6adae5d93 100644 --- a/qpid/cpp/src/qmf/ValueImpl.h +++ b/qpid/cpp/src/qmf/engine/ValueImpl.h @@ -1,5 +1,5 @@ -#ifndef _QmfValueImpl_ -#define _QmfValueImpl_ +#ifndef _QmfEngineValueImpl_ +#define _QmfEngineValueImpl_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,9 +20,9 @@ * under the License. */ -#include <qmf/Value.h> -#include <qmf/ObjectIdImpl.h> -#include <qmf/Object.h> +#include <qmf/engine/Value.h> +#include <qmf/engine/ObjectIdImpl.h> +#include <qmf/engine/Object.h> #include <qpid/framing/Buffer.h> #include <string> #include <string.h> @@ -31,22 +31,20 @@ #include <boost/shared_ptr.hpp> namespace qmf { +namespace engine { // TODO: set valid flag on all value settors // TODO: add a modified flag and accessors struct ValueImpl { - typedef boost::shared_ptr<Value> VPtr; - typedef boost::shared_ptr<Object> OPtr; - Value* envelope; const Typecode typecode; bool valid; ObjectId refVal; std::string stringVal; - OPtr objectVal; - std::map<std::string, VPtr> mapVal; - std::vector<VPtr> vectorVal; + std::auto_ptr<Object> objectVal; + std::map<std::string, Value> mapVal; + std::vector<Value> vectorVal; Typecode arrayTypecode; union { @@ -60,10 +58,17 @@ namespace qmf { uint8_t uuidVal[16]; } value; - ValueImpl(Value* e, Typecode t, Typecode at) : - envelope(e), typecode(t), valid(false), arrayTypecode(at) {} + ValueImpl(const ValueImpl& from) : + typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal), + objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0), + mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode), + value(from.value) {} + + ValueImpl(Typecode t, Typecode at); ValueImpl(Typecode t, qpid::framing::Buffer& b); ValueImpl(Typecode t); + static Value* factory(Typecode t, qpid::framing::Buffer& b); + static Value* factory(Typecode t); ~ValueImpl(); void encode(qpid::framing::Buffer& b) const; @@ -139,6 +144,7 @@ namespace qmf { void deleteArrayItem(uint32_t idx); }; } +} #endif |