diff options
author | Ted Ross <tross@apache.org> | 2010-02-19 16:06:22 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-19 16:06:22 +0000 |
commit | 2530d09b874d23edf5ec9743d1e4ad1589bd4443 (patch) | |
tree | 2c790e64d8004e945ec60c039db3ef630d059b23 | |
parent | 255334aa9b170f6ad8054677bdbdc5fd30ea2c1d (diff) | |
download | qpid-python-2530d09b874d23edf5ec9743d1e4ad1589bd4443.tar.gz |
Checking in work-in-progress for the qmf branch.
This branch will not compile until further changes are checked in.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@911854 13f79535-47bb-0310-9956-ffa450edef68
32 files changed, 582 insertions, 3476 deletions
diff --git a/qpid/cpp/include/qmf/Connection.h b/qpid/cpp/include/qmf/Connection.h deleted file mode 100644 index f648b1427f..0000000000 --- a/qpid/cpp/include/qmf/Connection.h +++ /dev/null @@ -1,125 +0,0 @@ -#ifndef _QmfConnection_ -#define _QmfConnection_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/QmfImportExport.h" -#include "qmf/ConnectionSettings.h" - -namespace qmf { - - /** - * Operational states for Connections. - * - * \ingroup qmfapi - */ - enum ConnectionState { - CONNECTION_UP = 1, - CONNECTION_DOWN = 2 - }; - - /** - * Implement a subclass of ConnectionListener and provide it with the - * Connection constructor to receive notification of changes in the - * connection state. - * - * \ingroup qmfapi - */ - class ConnectionListener { - QMF_EXTERN virtual ~ConnectionListener(); - - /** - * Called each time the state of the connection changes. - * - * @param state the new state - */ - virtual void newState(ConnectionState state); - - /** - * Called if the connection requires input from an interactive client. - * - * @param prompt Text of the prompt - describes what information is required. - * @param answer The interactive user input. - * @param answerLen on Input - the maximum number of bytes that can be copied to answer. - * on Output - the number of bytes copied to answer. - */ - virtual void interactivePrompt(const char* prompt, char* answer, uint32_t answerLen); - }; - - class ConnectionImpl; - - /** - * The Connection class represents a connection to a QPID broker that can - * be used by agents and consoles, possibly multiple at the same time. - * - * \ingroup qmfapi - */ - class Connection { - public: - - /** - * Creates a connection object and begins the process of attempting to - * connect to the QPID broker. - * - * @param settings The settings that control how the connection is set - * up. - * - * @param listener An optional pointer to a subclass of - * ConnectionListener to receive notifications of events related to - * this connection. - */ - QMF_EXTERN Connection(const ConnectionSettings& settings, - const ConnectionListener* listener = 0); - - /** - * Destroys a connection, causing the connection to be closed. - */ - QMF_EXTERN ~Connection(); - - /** - * Set the administrative state of the connection (enabled or disabled). - * - * @param enabled True => enable connection, False => disable connection - */ - QMF_EXTERN void setAdminState(bool enabled); - - /** - * Return the current operational state of the connection (up or down). - * - * @return the current connection state. - */ - QMF_EXTERN ConnectionState getOperState() const; - - /** - * Get the error message from the last failure to connect. - * - * @return Null-terminated string containing the error message. - */ - QMF_EXTERN const char* getLastError() const; - - private: - friend class AgentImpl; - friend class ConsoleImpl; - ConnectionImpl* impl; - }; - -} - -#endif diff --git a/qpid/cpp/include/qmf/ConnectionSettings.h b/qpid/cpp/include/qmf/ConnectionSettings.h deleted file mode 100644 index 11af73d797..0000000000 --- a/qpid/cpp/include/qmf/ConnectionSettings.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef _QmfConnectionSettings_ -#define _QmfConnectionSettings_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace qmf { - namespace engine { - class ConnectionSettings; - } - - typedef class engine::ConnectionSettings ConnectionSettings; -} - -#endif - diff --git a/qpid/cpp/src/qmf/engine/Protocol.h b/qpid/cpp/include/qmf/Protocol.h index 1cdfa60c84..07870b0eb5 100644 --- a/qpid/cpp/src/qmf/engine/Protocol.h +++ b/qpid/cpp/include/qmf/Protocol.h @@ -1,5 +1,5 @@ -#ifndef _QmfEngineProtocol_ -#define _QmfEngineProtocol_ +#ifndef _QmfProtocol_ +#define _QmfProtocol_ /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,21 +21,40 @@ */ #include <qpid/sys/IntegerTypes.h> +#include <string> namespace qpid { - namespace framing { - class Buffer; + namespace messaging { + class Message; } } namespace qmf { -namespace engine { class Protocol { public: - static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + //static bool checkHeader(const qpid::messaging::Message& msg, std::string& opcode, uint32_t *seq); + //static void encodeHeader(qpid::messaging::Message& msg, const std::string& opcode, uint32_t seq = 0); + const static std::string SCHEMA_ELT_NAME; + const static std::string SCHEMA_ELT_TYPE; + const static std::string SCHEMA_ELT_DIR; + const static std::string SCHEMA_ELT_UNIT; + const static std::string SCHEMA_ELT_DESC; + const static std::string SCHEMA_ELT_ACCESS; + const static std::string SCHEMA_ELT_OPTIONAL; + const static std::string SCHEMA_ARGS; + const static std::string SCHEMA_PACKAGE; + const static std::string SCHEMA_CLASS_KIND; + const static std::string SCHEMA_CLASS_KIND_DATA; + const static std::string SCHEMA_CLASS_KIND_EVENT; + const static std::string SCHEMA_CLASS; + const static std::string SCHEMA_HASH; + const static std::string AGENT_NAME; + const static std::string OBJECT_NAME; + const static std::string SCHEMA_ID; + + /* const static uint8_t OP_ATTACH_REQUEST = 'A'; const static uint8_t OP_ATTACH_RESPONSE = 'a'; @@ -60,10 +79,10 @@ namespace engine { const static uint8_t OP_PROPERTY_INDICATION = 'c'; const static uint8_t OP_STATISTIC_INDICATION = 'i'; const static uint8_t OP_EVENT_INDICATION = 'e'; + */ }; } -} #endif diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h index 71abf82254..c645b3cc01 100644 --- a/qpid/cpp/include/qmf/engine/Agent.h +++ b/qpid/cpp/include/qmf/engine/Agent.h @@ -25,8 +25,8 @@ #include <qmf/engine/Object.h> #include <qmf/engine/Event.h> #include <qmf/engine/Query.h> -#include <qmf/engine/Value.h> -#include <qmf/engine/Message.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Variant.h> namespace qmf { namespace engine { @@ -42,12 +42,7 @@ namespace engine { GET_QUERY = 1, START_SYNC = 2, END_SYNC = 3, - METHOD_CALL = 4, - DECLARE_QUEUE = 5, - DELETE_QUEUE = 6, - BIND = 7, - UNBIND = 8, - SETUP_COMPLETE = 9 + METHOD_CALL = 4 }; EventKind kind; @@ -55,13 +50,11 @@ namespace engine { char* authUserId; // Authenticated user ID (for all kinds) char* authToken; // Authentication token if issued (for all kinds) char* name; // Name of the method/sync query - // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND) + // (METHOD_CALL, START_SYNC, END_SYNC) Object* object; // Object involved in method call (METHOD_CALL) - ObjectId* objectId; // ObjectId for method call (METHOD_CALL) + char* objectKey; // Object key for method call (METHOD_CALL) Query* query; // Query parameters (GET_QUERY, START_SYNC) - Value* arguments; // Method parameters (METHOD_CALL) - char* exchange; // Exchange for bind (BIND, UNBIND) - char* bindingKey; // Key for bind (BIND, UNBIND) + qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL) const SchemaObjectClass* objectClass; // (METHOD_CALL) }; @@ -72,10 +65,17 @@ namespace engine { */ class Agent { public: - Agent(char* label, bool internalStore=true); + Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true); ~Agent(); /** + * Set an agent attribute that can be used to describe this agent to consoles. + *@param key Null-terminated string that is the name of the attribute. + *@param value Variant value (or any API type) of the attribute. + */ + void setAttr(const char* key, const qpid::messaging::Variant& value); + + /** * Configure the directory path for storing persistent data. *@param path Null-terminated string containing a directory path where files can be * created, written, and read. If NULL, no persistent storage will be @@ -92,24 +92,6 @@ namespace engine { void setTransferDir(const char* path); /** - * Pass messages received from the AMQP session to the Agent engine. - *@param message AMQP messages received on the agent session. - */ - void handleRcvMessage(Message& message); - - /** - * Get the next message to be sent to the AMQP network. - *@param item The Message structure describing the message to be produced. - *@return true if the Message is valid, false if there are no messages to send. - */ - bool getXmtMessage(Message& item) const; - - /** - * Remove and discard one message from the head of the transmit queue. - */ - void popXmt(); - - /** * Get the next application event from the agent engine. *@param event The event iff the return value is true *@return true if event is valid, false if there are no events to process @@ -122,20 +104,9 @@ namespace engine { void popEvent(); /** - * A new AMQP session has been established for Agent communication. - */ - void newSession(); - - /** - * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event - * is received from the Agent engine. - */ - void startProtocol(); - - /** - * This method is called periodically so the agent can supply a heartbeat. + * Provide the AMQP connection to be used for this agent. */ - void heartbeat(); + void setConnection(qpid::messaging::Connection& conn); /** * Respond to a method request. @@ -144,7 +115,7 @@ namespace engine { *@param text Status text ("OK" or an error message) *@param arguments The list of output arguments from the method call. */ - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); + void methodResponse(uint32_t sequence, uint32_t status, char* text, const qpid::messaging::Variant::Map& arguments); /** * Send a content indication to the QMF bus. This is only needed for objects that are @@ -152,10 +123,8 @@ namespace engine { * (inserted using addObject). *@param sequence The sequence number of the GET request or the SYNC_START request. *@param object The object (annotated with "changed" flags) for publication. - *@param prop If true, changed object properties are transmitted. - *@param stat If true, changed object statistics are transmitted. */ - void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true); + void queryResponse(uint32_t sequence, Object& object); /** * Indicate the completion of a query. This is not used for SYNC_START requests. @@ -178,20 +147,12 @@ namespace engine { /** * Give an object to the Agent for storage and management. Once added, the agent takes * responsibility for the life cycle of the object. - *@param obj The object to be managed by the Agent. - *@param persistId A unique non-zero value if the object-id is to be persistent. - *@return The objectId of the managed object. - */ - const ObjectId* addObject(Object& obj, uint64_t persistId); - // const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi); - - /** - * Allocate an object-id for an object that will be managed by the application. - *@param persistId A unique non-zero value if the object-id is to be persistent. - *@return The objectId structure for the allocated ID. + *@param obj The object to be managed by the Agent. + *@param key A unique name (a primary key) to be used to address this object. If + * left null, the agent will create a unique name for the object. + *@return The key for the managed object. */ - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + const char* addObject(Object& obj, const char* key=0); /** * Raise an event into the QMF network.. diff --git a/qpid/cpp/include/qmf/engine/ConnectionSettings.h b/qpid/cpp/include/qmf/engine/ConnectionSettings.h deleted file mode 100644 index 36312400b1..0000000000 --- a/qpid/cpp/include/qmf/engine/ConnectionSettings.h +++ /dev/null @@ -1,150 +0,0 @@ -#ifndef _QmfEngineConnectionSettings_ -#define _QmfEngineConnectionSettings_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/QmfEngineImportExport.h" -#include "qpid/sys/IntegerTypes.h" - -namespace qmf { -namespace engine { - - class ConnectionSettingsImpl; - class Value; - - /** - * Settings for AMQP connections to the broker. - * - * \ingroup qmfapi - */ - class ConnectionSettings { - public: - - /** - * Create a set of default connection settings. - * - * If no further attributes are set, the settings will cause a connection to be made to - * the default broker (on localhost or at a host/port supplied by service discovery) and - * authentication will be the best-available (GSSAPI/Kerberos, Anonymous, Plain with prompts - * for username and password). - */ - QMFE_EXTERN ConnectionSettings(); - - /** - * Create a set of connection settings by URL. - * - * @param url Universal resource locator describing the broker address and additional attributes. - * - * The URL is of the form: - * amqp[s]://host[:port][?key=value[&key=value]*] - * - * For example: - * amqp://localhost - * amqp://broker?transport=rdma&authmech=GSSAPI&authservice=qpidd - * amqps://broker?authmech=PLAIN&authuser=guest&authpass=guest - */ - QMFE_EXTERN ConnectionSettings(const char* url); - - /** - * Copy Constructor. - */ - ConnectionSettings(const ConnectionSettings& from); - - /** - * Destroy the connection settings object. - */ - QMFE_EXTERN ~ConnectionSettings(); - - /** - * Set an attribute to control connection setup. - * - * @param key A null-terminated string that is an attribute name. - * - * @param value Reference to a value to be stored as the attribute. The type of the value - * is specific to the key. - * - * @return True if success, False if invalid attribute - */ - QMFE_EXTERN bool setAttr(const char* key, const Value& value); - - /** - * Get the value of an attribute. - * - * @param key A null-terminated attribute name. - * - * @return The value associated with the attribute name. - */ - QMFE_EXTERN Value getAttr(const char* key) const; - - /** - * Get the attribute string (the portion of the URL following the '?') for the settings. - * - * @return A pointer to the attribute string. If the content of this string needs to be - * available beyond the scope of the calling function, it should be copied. The - * returned pointer may become invalid if the set of attributes is changed. - */ - QMFE_EXTERN const char* getAttrString() const; - - /** - * Shortcuts for setting the transport for the connection. - * - * @param port The port value for the connection address. - */ - QMFE_EXTERN void transportTcp(uint16_t port = 5672); - QMFE_EXTERN void transportSsl(uint16_t port = 5671); - QMFE_EXTERN void transportRdma(uint16_t port = 5672); - - /** - * Shortcuts for setting authentication mechanisms. - * - * @param username Null-terminated authentication user name. - * - * @param password Null-terminated authentication password. - * - * @param serviceName Null-terminated GSSAPI service name (Kerberos service principal) - * - * @param minSsf Minimum security factor for connections. 0 = encryption not required. - * - * @param maxSsf Maximum security factor for connections. 0 = encryption not permitted. - */ - QMFE_EXTERN void authAnonymous(const char* username = 0); - QMFE_EXTERN void authPlain(const char* username = 0, const char* password = 0); - QMFE_EXTERN void authGssapi(const char* serviceName, uint32_t minSsf = 0, uint32_t maxSsf = 256); - - /** - * Shortcut for setting connection retry attributes. - * - * @param delayMin Minimum delay (in seconds) between connection attempts. - * - * @param delaxMax Maximum delay (in seconds) between connection attempts. - * - * @param delayFactor Factor to multiply the delay by between failed connection attempts. - */ - QMFE_EXTERN void setRetry(int delayMin = 1, int delayMax = 128, int delayFactor = 2); - - private: - friend class ResilientConnectionImpl; - ConnectionSettingsImpl* impl; - }; - -} -} - -#endif diff --git a/qpid/cpp/include/qmf/engine/Console.h b/qpid/cpp/include/qmf/engine/Console.h index 03b3993395..b99d63b9a6 100644 --- a/qpid/cpp/include/qmf/engine/Console.h +++ b/qpid/cpp/include/qmf/engine/Console.h @@ -20,14 +20,12 @@ * under the License. */ -#include <qmf/engine/ResilientConnection.h> #include <qmf/engine/Schema.h> #include <qmf/engine/ObjectId.h> #include <qmf/engine/Object.h> #include <qmf/engine/Event.h> #include <qmf/engine/Query.h> -#include <qmf/engine/Value.h> -#include <qmf/engine/Message.h> +#include <qpid/messaging/Variant.h> namespace qmf { namespace engine { @@ -49,8 +47,8 @@ namespace engine { MethodResponse(const MethodResponse& from); ~MethodResponse(); uint32_t getStatus() const; - const Value* getException() const; - const Value* getArgs() const; + const qpid::messaging::Variant* getException() const; + const qpid::messaging::Variant::Map* getArgs() const; private: friend struct MethodResponseImpl; @@ -66,7 +64,7 @@ namespace engine { public: ~QueryResponse(); uint32_t getStatus() const; - const Value* getException() const; + const qpid::messaging::Variant* getException() const; uint32_t getObjectCount() const; const Object* getObject(uint32_t idx) const; @@ -161,10 +159,6 @@ namespace engine { void sessionClosed(); void startProtocol(); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - bool getEvent(BrokerEvent& event) const; void popEvent(); diff --git a/qpid/cpp/include/qmf/engine/Message.h b/qpid/cpp/include/qmf/engine/Message.h deleted file mode 100644 index 1e95cc6afe..0000000000 --- a/qpid/cpp/include/qmf/engine/Message.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef _QmfEngineMessage_ -#define _QmfEngineMessage_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qpid/sys/IntegerTypes.h" - -namespace qmf { -namespace engine { - - struct Message { - char* body; - uint32_t length; - char* destination; - char* routingKey; - char* replyExchange; - char* replyKey; - char* userId; - }; - -} -} - -#endif diff --git a/qpid/cpp/include/qmf/engine/Object.h b/qpid/cpp/include/qmf/engine/Object.h index ad67cfdb95..81dbf45521 100644 --- a/qpid/cpp/include/qmf/engine/Object.h +++ b/qpid/cpp/include/qmf/engine/Object.h @@ -22,7 +22,7 @@ #include <qmf/engine/Schema.h> #include <qmf/engine/ObjectId.h> -#include <qmf/engine/Value.h> +#include <qpid/messaging/Variant.h> namespace qmf { namespace engine { @@ -30,23 +30,26 @@ namespace engine { struct ObjectImpl; class Object { public: - Object(const SchemaObjectClass* type); + Object(); + Object(SchemaObjectClass* type); Object(const Object& from); virtual ~Object(); + const qpid::messaging::Variant::Map& getValues() const; + qpid::messaging::Variant::Map& getValues(); + + const SchemaObjectClass* getSchema() const; + void setSchema(SchemaObjectClass* schema); + + const char* getKey() const; + void setKey(const char* key); + + void touch(); void destroy(); - const ObjectId* getObjectId() const; - void setObjectId(ObjectId* oid); - const SchemaObjectClass* getClass() const; - Value* getValue(const char* key) const; - void invokeMethod(const char* methodName, const Value* inArgs, void* context) const; - bool isDeleted() const; - void merge(const Object& from); private: friend struct ObjectImpl; friend class AgentImpl; - Object(ObjectImpl* impl); ObjectImpl* impl; }; } diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h index 3ed08c5d8e..08e4ebf3cc 100644 --- a/qpid/cpp/include/qmf/engine/Query.h +++ b/qpid/cpp/include/qmf/engine/Query.h @@ -20,89 +20,39 @@ * under the License. */ -#include <qmf/engine/ObjectId.h> -#include <qmf/engine/Value.h> +#include <qpid/messaging/Variant.h> namespace qmf { namespace engine { class Object; - struct QueryElementImpl; - struct QueryImpl; - struct QueryExpressionImpl; - class SchemaClassKey; - - enum ValueOper { - O_EQ = 1, - O_NE = 2, - O_LT = 3, - O_LE = 4, - O_GT = 5, - O_GE = 6, - O_RE_MATCH = 7, - O_RE_NOMATCH = 8, - O_PRESENT = 9, - O_NOT_PRESENT = 10 - }; - - struct QueryOperand { - virtual ~QueryOperand() {} - virtual bool evaluate(const Object* object) const = 0; - }; - - struct QueryElement : public QueryOperand { - QueryElement(const char* attrName, const Value* value, ValueOper oper); - QueryElement(QueryElementImpl* impl); - virtual ~QueryElement(); - bool evaluate(const Object* object) const; - - QueryElementImpl* impl; - }; - - enum ExprOper { - E_NOT = 1, - E_AND = 2, - E_OR = 3, - E_XOR = 4 - }; - - struct QueryExpression : public QueryOperand { - QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2); - QueryExpression(QueryExpressionImpl* impl); - virtual ~QueryExpression(); - bool evaluate(const Object* object) const; - - QueryExpressionImpl* impl; - }; + class QueryImpl; class Query { public: - Query(const char* className, const char* packageName); - Query(const SchemaClassKey* key); - Query(const ObjectId* oid); + Query(const char* target); + Query(const char* target, const qpid::messaging::Variant::List& predicate); + Query(const char* target, const char* expression); Query(const Query& from); ~Query(); - void setSelect(const QueryOperand* criterion); - void setLimit(uint32_t maxResults); - void setOrderBy(const char* attrName, bool decreasing); - - const char* getPackage() const; - const char* getClass() const; - const ObjectId* getObjectId() const; + void where(const qpid::messaging::Variant::List& predicate); + void where(const char* expression); + void limit(uint32_t maxResults); + void orderBy(const char* attrName, bool decreasing); - bool haveSelect() const; + bool havePredicate() const; bool haveLimit() const; bool haveOrderBy() const; - const QueryOperand* getSelect() const; + const qpid::messaging::Variant::List& getPredicate() const; uint32_t getLimit() const; const char* getOrderBy() const; bool getDecreasing() const; + bool matches(const Object& object) const; + private: friend struct QueryImpl; - friend class BrokerProxyImpl; - Query(QueryImpl* impl); QueryImpl* impl; }; } diff --git a/qpid/cpp/include/qmf/engine/ResilientConnection.h b/qpid/cpp/include/qmf/engine/ResilientConnection.h deleted file mode 100644 index c03d08cb96..0000000000 --- a/qpid/cpp/include/qmf/engine/ResilientConnection.h +++ /dev/null @@ -1,173 +0,0 @@ -#ifndef _QmfEngineResilientConnection_ -#define _QmfEngineResilientConnection_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/Message.h> -#include <qmf/engine/ConnectionSettings.h> -#include <string> - -namespace qmf { -namespace engine { - - class ResilientConnectionImpl; - - /** - * Represents events that occur, unsolicited, from ResilientConnection. - */ - struct ResilientConnectionEvent { - enum EventKind { - CONNECTED = 1, - DISCONNECTED = 2, - SESSION_CLOSED = 3, - RECV = 4 - }; - - EventKind kind; - void* sessionContext; // SESSION_CLOSED, RECV - char* errorText; // DISCONNECTED, SESSION_CLOSED - Message message; // RECV - }; - - class SessionHandle { - friend class ResilientConnectionImpl; - void* impl; - }; - - /** - * ResilientConnection represents a Qpid connection that is resilient. - * - * Upon creation, ResilientConnection attempts to establish a connection to the - * messaging broker. If it fails, it will continue to retry at an interval that - * increases over time (to a maximum interval). If an extablished connection is - * dropped, a reconnect will be attempted. - */ - class ResilientConnection { - public: - - /** - * Create a new resilient connection. - *@param settings Settings that define how the connection is to be made. - *@param delayMin Minimum delay (in seconds) between retries. - *@param delayMax Maximum delay (in seconds) between retries. - *@param delayFactor Factor to multiply retry delay by after each failure. - */ - ResilientConnection(const ConnectionSettings& settings); - ~ResilientConnection(); - - /** - * Get the connected status of the resilient connection. - *@return true iff the connection is established. - */ - bool isConnected() const; - - /** - * Get the next event (if present) from the connection. - *@param event Returned event if one is available. - *@return true if event is valid, false if there are no more events to handle. - */ - bool getEvent(ResilientConnectionEvent& event); - - /** - * Discard the event on the front of the queue. This should be invoked after processing - * the event from getEvent. - */ - void popEvent(); - - /** - * Create a new AMQP session. - *@param name Unique name for the session. - *@param sessionContext Optional user-context value that will be provided in events - * pertaining to this session. - *@param handle Output handle to be stored and used in subsequent calls pertaining to - * this session. - *@return true iff the session was successfully created. - */ - bool createSession(const char* name, void* sessionContext, SessionHandle& handle); - - /** - * Destroy a created session. - *@param handle SessionHandle returned by createSession. - */ - void destroySession(SessionHandle handle); - - /** - * Send a message into the AMQP broker via a session. - *@param handle The session handle of the session to transmit through. - *@param message The QMF message to transmit. - */ - void sendMessage(SessionHandle handle, Message& message); - - /** - * Declare an exclusive, auto-delete queue for a session. - *@param handle The session handle for the owner of the queue. - *@param queue The name of the queue. - */ - void declareQueue(SessionHandle handle, char* queue); - - /** - * Delete a queue. - *@param handle The session handle for the owner of the queue. - *@param queue The name of the queue. - */ - void deleteQueue(SessionHandle handle, char* queue); - - /** - * Bind a queue to an exchange. - *@param handle The session handle of the session to use for binding. - *@param exchange The name of the exchange for binding. - *@param queue The name of the queue for binding. - *@param key The binding key. - */ - void bind(SessionHandle handle, char* exchange, char* queue, char* key); - - /** - * Remove a binding. - *@param handle The session handle of the session to use for un-binding. - *@param exchange The name of the exchange. - *@param queue The name of the queue. - *@param key The binding key. - */ - void unbind(SessionHandle handle, char* exchange, char* queue, char* key); - - /** - * Establish a file descriptor for event notification. - *@param fd A file descriptor into which the connection shall write a character each - * time an event is enqueued. This fd may be in a pair, the other fd of which - * is used in a select loop to control execution. - */ - void setNotifyFd(int fd); - - /** - * Send a byte into the notify file descriptor. - * - * This can be used to wake up the event processing portion of the engine from either the - * wrapped implementation or the engine itself. - */ - void notify(); - - private: - ResilientConnectionImpl* impl; - }; -} -} - -#endif - diff --git a/qpid/cpp/include/qmf/engine/Schema.h b/qpid/cpp/include/qmf/engine/Schema.h index f53e84324a..18a4cef8de 100644 --- a/qpid/cpp/include/qmf/engine/Schema.h +++ b/qpid/cpp/include/qmf/engine/Schema.h @@ -20,8 +20,8 @@ * under the License. */ -#include <qmf/engine/Typecode.h> #include <qpid/sys/IntegerTypes.h> +#include <qpid/messaging/Variant.h> namespace qmf { namespace engine { @@ -29,12 +29,10 @@ namespace engine { enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 }; enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 }; enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 }; - enum Severity { SEV_EMERG = 0, SEV_ALERT = 1, SEV_CRIT = 2, SEV_ERROR = 3, SEV_WARN = 4, SEV_NOTICE = 5, SEV_INFORM = 6, SEV_DEBUG = 7 }; struct SchemaArgumentImpl; struct SchemaMethodImpl; struct SchemaPropertyImpl; - struct SchemaStatisticImpl; struct SchemaObjectClassImpl; struct SchemaEventClassImpl; struct SchemaClassKeyImpl; @@ -43,14 +41,14 @@ namespace engine { */ class SchemaArgument { public: - SchemaArgument(const char* name, Typecode typecode); + SchemaArgument(const char* name, qpid::messaging::VariantType typecode); SchemaArgument(const SchemaArgument& from); ~SchemaArgument(); void setDirection(Direction dir); void setUnit(const char* val); void setDesc(const char* desc); const char* getName() const; - Typecode getType() const; + qpid::messaging::VariantType getType() const; Direction getDirection() const; const char* getUnit() const; const char* getDesc() const; @@ -89,7 +87,7 @@ namespace engine { */ class SchemaProperty { public: - SchemaProperty(const char* name, Typecode typecode); + SchemaProperty(const char* name, qpid::messaging::VariantType typecode); SchemaProperty(const SchemaProperty& from); ~SchemaProperty(); void setAccess(Access access); @@ -98,7 +96,7 @@ namespace engine { void setUnit(const char* val); void setDesc(const char* desc); const char* getName() const; - Typecode getType() const; + qpid::messaging::VariantType getType() const; Access getAccess() const; bool isIndex() const; bool isOptional() const; @@ -114,27 +112,6 @@ namespace engine { /** */ - class SchemaStatistic { - public: - SchemaStatistic(const char* name, Typecode typecode); - SchemaStatistic(const SchemaStatistic& from); - ~SchemaStatistic(); - void setUnit(const char* val); - void setDesc(const char* desc); - const char* getName() const; - Typecode getType() const; - const char* getUnit() const; - const char* getDesc() const; - - private: - friend struct SchemaStatisticImpl; - friend struct SchemaObjectClassImpl; - SchemaStatistic(SchemaStatisticImpl* impl); - SchemaStatisticImpl* impl; - }; - - /** - */ class SchemaClassKey { public: SchemaClassKey(const SchemaClassKey& from); @@ -164,15 +141,12 @@ namespace engine { SchemaObjectClass(const SchemaObjectClass& from); ~SchemaObjectClass(); void addProperty(const SchemaProperty* property); - void addStatistic(const SchemaStatistic* statistic); void addMethod(const SchemaMethod* method); const SchemaClassKey* getClassKey() const; int getPropertyCount() const; - int getStatisticCount() const; int getMethodCount() const; const SchemaProperty* getProperty(int idx) const; - const SchemaStatistic* getStatistic(int idx) const; const SchemaMethod* getMethod(int idx) const; private: @@ -187,14 +161,13 @@ namespace engine { */ class SchemaEventClass { public: - SchemaEventClass(const char* package, const char* name, Severity severity); + SchemaEventClass(const char* package, const char* name); SchemaEventClass(const SchemaEventClass& from); ~SchemaEventClass(); void addArgument(const SchemaArgument* argument); void setDesc(const char* desc); const SchemaClassKey* getClassKey() const; - Severity getSeverity() const; int getArgumentCount() const; const SchemaArgument* getArgument(int idx) const; diff --git a/qpid/cpp/include/qmf/engine/Typecode.h b/qpid/cpp/include/qmf/engine/Typecode.h deleted file mode 100644 index 613f96a483..0000000000 --- a/qpid/cpp/include/qmf/engine/Typecode.h +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef _QmfEngineTypecode_ -#define _QmfEngineTypecode_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace qmf { -namespace engine { - - enum Typecode { - TYPE_UINT8 = 1, - TYPE_UINT16 = 2, - TYPE_UINT32 = 3, - TYPE_UINT64 = 4, - TYPE_SSTR = 6, - TYPE_LSTR = 7, - TYPE_ABSTIME = 8, - TYPE_DELTATIME = 9, - TYPE_REF = 10, - TYPE_BOOL = 11, - TYPE_FLOAT = 12, - TYPE_DOUBLE = 13, - TYPE_UUID = 14, - TYPE_MAP = 15, - TYPE_INT8 = 16, - TYPE_INT16 = 17, - TYPE_INT32 = 18, - TYPE_INT64 = 19, - TYPE_OBJECT = 20, - TYPE_LIST = 21, - TYPE_ARRAY = 22 - }; -} -} - -#endif - diff --git a/qpid/cpp/include/qmf/engine/Value.h b/qpid/cpp/include/qmf/engine/Value.h deleted file mode 100644 index 5b45061b78..0000000000 --- a/qpid/cpp/include/qmf/engine/Value.h +++ /dev/null @@ -1,122 +0,0 @@ -#ifndef _QmfEngineValue_ -#define _QmfEngineValue_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/ObjectId.h> -#include <qmf/engine/Typecode.h> - -namespace qmf { -namespace engine { - - class Object; - struct ValueImpl; - - class Value { - public: - // Value(); - Value(const Value& from); - Value(Typecode t, Typecode arrayType = TYPE_UINT8); - ~Value(); - - Typecode getType() const; - bool isNull() const; - void setNull(); - - bool isObjectId() const; - const ObjectId& asObjectId() const; - void setObjectId(const ObjectId& oid); - - bool isUint() const; - uint32_t asUint() const; - void setUint(uint32_t val); - - bool isInt() const; - int32_t asInt() const; - void setInt(int32_t val); - - bool isUint64() const; - uint64_t asUint64() const; - void setUint64(uint64_t val); - - bool isInt64() const; - int64_t asInt64() const; - void setInt64(int64_t val); - - bool isString() const; - const char* asString() const; - void setString(const char* val); - - bool isBool() const; - bool asBool() const; - void setBool(bool val); - - bool isFloat() const; - float asFloat() const; - void setFloat(float val); - - bool isDouble() const; - double asDouble() const; - void setDouble(double val); - - bool isUuid() const; - const uint8_t* asUuid() const; - void setUuid(const uint8_t* val); - - bool isObject() const; - const Object* asObject() const; - void setObject(Object* val); - - bool isMap() const; - bool keyInMap(const char* key) const; - Value* byKey(const char* key); - const Value* byKey(const char* key) const; - void deleteKey(const char* key); - void insert(const char* key, Value* val); - uint32_t keyCount() const; - const char* key(uint32_t idx) const; - - bool isList() const; - uint32_t listItemCount() const; - Value* listItem(uint32_t idx); - void appendToList(Value* val); - void deleteListItem(uint32_t idx); - - bool isArray() const; - Typecode arrayType() const; - uint32_t arrayItemCount() const; - Value* arrayItem(uint32_t idx); - void appendToArray(Value* val); - void deleteArrayItem(uint32_t idx); - - private: - friend struct ValueImpl; - friend class BrokerProxyImpl; - friend struct ObjectImpl; - friend struct EventImpl; - friend class AgentImpl; - Value(ValueImpl* impl); - ValueImpl* impl; - }; -} -} - -#endif - diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 1e4c59b19e..34f93c63ed 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -31,9 +31,7 @@ QMF_API = \ ../include/qpid/agent/ManagementAgent.h \ ../include/qpid/agent/QmfAgentImportExport.h \ ../include/qmf/Agent.h \ - ../include/qmf/Connection.h \ ../include/qmf/QmfImportExport.h \ - ../include/qmf/ConnectionSettings.h \ ../include/qmf/AgentObject.h # @@ -41,18 +39,14 @@ QMF_API = \ # QMF_ENGINE_API = \ ../include/qmf/engine/Agent.h \ - ../include/qmf/engine/ConnectionSettings.h \ ../include/qmf/engine/Console.h \ ../include/qmf/engine/Event.h \ - ../include/qmf/engine/Message.h \ ../include/qmf/engine/Object.h \ - ../include/qmf/engine/ObjectId.h \ ../include/qmf/engine/QmfEngineImportExport.h \ ../include/qmf/engine/Query.h \ - ../include/qmf/engine/ResilientConnection.h \ - ../include/qmf/engine/Schema.h \ - ../include/qmf/engine/Typecode.h \ - ../include/qmf/engine/Value.h + ../include/qmf/engine/Schema.h + +# ../include/qmf/engine/ObjectId.h # Public header files nobase_include_HEADERS += \ @@ -67,31 +61,23 @@ libqmf_la_SOURCES = \ libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ qmf/engine/Agent.cpp \ - qmf/engine/BrokerProxyImpl.cpp \ - qmf/engine/BrokerProxyImpl.h \ - qmf/engine/ConnectionSettingsImpl.cpp \ - qmf/engine/ConnectionSettingsImpl.h \ - qmf/engine/ConsoleImpl.cpp \ - qmf/engine/ConsoleImpl.h \ - qmf/engine/EventImpl.cpp \ - qmf/engine/EventImpl.h \ - qmf/engine/MessageImpl.cpp \ - qmf/engine/MessageImpl.h \ - qmf/engine/ObjectIdImpl.cpp \ - qmf/engine/ObjectIdImpl.h \ qmf/engine/ObjectImpl.cpp \ qmf/engine/ObjectImpl.h \ - qmf/engine/Protocol.cpp \ - qmf/engine/Protocol.h \ + qmf/Protocol.cpp \ + qmf/Protocol.h \ qmf/engine/QueryImpl.cpp \ qmf/engine/QueryImpl.h \ - qmf/engine/ResilientConnection.cpp \ - qmf/engine/SequenceManager.cpp \ - qmf/engine/SequenceManager.h \ qmf/engine/SchemaImpl.cpp \ - qmf/engine/SchemaImpl.h \ - qmf/engine/ValueImpl.cpp \ - qmf/engine/ValueImpl.h + qmf/engine/SchemaImpl.h + +# qmf/engine/BrokerProxyImpl.cpp +# qmf/engine/BrokerProxyImpl.h +# qmf/engine/ConsoleImpl.cpp +# qmf/engine/ConsoleImpl.h +# qmf/engine/ObjectIdImpl.cpp +# qmf/engine/ObjectIdImpl.h +# qmf/engine/SequenceManager.cpp +# qmf/engine/SequenceManager.h libqmf_la_LIBADD = libqmfengine.la libqmfengine_la_LIBADD = libqpidclient.la diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp new file mode 100644 index 0000000000..154081c9d4 --- /dev/null +++ b/qpid/cpp/src/qmf/Protocol.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/Protocol.h" + +using namespace std; +using namespace qmf; + +const string Protocol::SCHEMA_ELT_NAME("name"); +const string Protocol::SCHEMA_ELT_TYPE("type"); +const string Protocol::SCHEMA_ELT_DIR("dir"); +const string Protocol::SCHEMA_ELT_UNIT("unit"); +const string Protocol::SCHEMA_ELT_DESC("desc"); +const string Protocol::SCHEMA_ELT_ACCESS("access"); +const string Protocol::SCHEMA_ELT_OPTIONAL("optional"); +const string Protocol::SCHEMA_ARGS("args"); +const string Protocol::SCHEMA_PACKAGE("_package_name"); +const string Protocol::SCHEMA_CLASS_KIND("_type"); +const string Protocol::SCHEMA_CLASS_KIND_DATA("_data"); +const string Protocol::SCHEMA_CLASS_KIND_EVENT("_event"); +const string Protocol::SCHEMA_CLASS("_class_name"); +const string Protocol::SCHEMA_HASH("_hash_str"); +const string Protocol::AGENT_NAME("_agent_name"); +const string Protocol::OBJECT_NAME("_object_name"); +const string Protocol::SCHEMA_ID("_schema_id"); + +#if 0 +bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/) +{ + // TODO + return true; +} + +void Protocol::encodeHeader(Message& /*msg*/, const string& /*opcode*/, uint32_t /*seq*/) +{ + // TODO +} +#endif diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index fe9b84c565..ec84b4381d 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -18,23 +18,19 @@ */ #include "qmf/engine/Agent.h" -#include "qmf/engine/MessageImpl.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Typecode.h" -#include "qmf/engine/EventImpl.h" #include "qmf/engine/ObjectImpl.h" -#include "qmf/engine/ObjectIdImpl.h" #include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ValueImpl.h" -#include "qmf/engine/Protocol.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> +#include "qmf/Protocol.h" #include <qpid/sys/Mutex.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> -#include <string.h> +#include <qpid/sys/Thread.h> +#include <qpid/sys/Runnable.h> +#include <qpid/messaging/Session.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Message.h> #include <string> #include <deque> #include <map> @@ -45,8 +41,8 @@ using namespace std; using namespace qmf::engine; -using namespace qpid::framing; using namespace qpid::sys; +using namespace qpid::messaging; namespace qmf { namespace engine { @@ -59,11 +55,9 @@ namespace engine { string authToken; string name; Object* object; - boost::shared_ptr<ObjectId> objectId; + string objectKey; boost::shared_ptr<Query> query; - boost::shared_ptr<Value> arguments; - string exchange; - string bindingKey; + boost::shared_ptr<Variant::Map> arguments; const SchemaObjectClass* objectClass; AgentEventImpl(AgentEvent::EventKind k) : @@ -72,71 +66,64 @@ namespace engine { AgentEvent copy(); }; + /** + * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method) + * sent up to the application. + */ struct AgentQueryContext { typedef boost::shared_ptr<AgentQueryContext> Ptr; uint32_t sequence; - string exchange; - string key; + string consoleAddr; const SchemaMethod* schemaMethod; AgentQueryContext() : schemaMethod(0) {} }; - class AgentImpl : public boost::noncopyable { + class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable { public: - AgentImpl(char* label, bool internalStore); + AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore); ~AgentImpl(); + void setAttr(const char* key, const Variant& value); void setStoreDir(const char* path); void setTransferDir(const char* path); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); bool getEvent(AgentEvent& event) const; void popEvent(); - void newSession(); - void startProtocol(); - void heartbeat(); - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); - void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); + void setConnection(Connection& conn); + void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments); + void queryResponse(uint32_t sequence, Object& object); void queryComplete(uint32_t sequence); void registerClass(SchemaObjectClass* cls); void registerClass(SchemaEventClass* cls); - const ObjectId* addObject(Object& obj, uint64_t persistId); - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + const char* addObject(Object& obj, const char* key); void raiseEvent(Event& event); + void run(); + void stop(); + private: mutable Mutex lock; Mutex addLock; - string label; - string queueName; + const string vendor; + const string product; + const string name; + const string domain; + string directAddr; + map<string, Variant> attrMap; string storeDir; string transferDir; bool internalStore; - uint64_t nextTransientId; Uuid systemId; - uint32_t requestedBrokerBank; - uint32_t requestedAgentBank; - uint32_t assignedBrokerBank; - uint32_t assignedAgentBank; - AgentAttachment attachment; uint16_t bootSequence; - uint64_t nextObjectId; uint32_t nextContextNum; + bool running; deque<AgentEventImpl::Ptr> eventQueue; - deque<MessageImpl::Ptr> xmtQueue; map<uint32_t, AgentQueryContext::Ptr> contextMap; - - static const char* QMF_EXCHANGE; - static const char* DIR_EXCHANGE; - static const char* BROKER_KEY; - static const uint32_t MERR_UNKNOWN_METHOD = 2; - static const uint32_t MERR_UNKNOWN_PACKAGE = 8; - static const uint32_t MERR_UNKNOWN_CLASS = 9; - static const uint32_t MERR_INTERNAL_ERROR = 10; -# define MA_BUFFER_SIZE 65536 - char outputBuffer[MA_BUFFER_SIZE]; + Connection connection; + Session session; + Receiver directReceiver; + Receiver topicReceiver; + Sender sender; + qpid::sys::Thread* thread; struct AgentClassKey { string name; @@ -144,10 +131,6 @@ namespace engine { AgentClassKey(const string& n, const uint8_t* h) : name(n) { memcpy(hash, h, 16); } - AgentClassKey(Buffer& buffer) { - buffer.getShortString(name); - buffer.getBin128(hash); - } string repr() { return name; } @@ -176,37 +159,29 @@ namespace engine { map<string, ClassMaps> packages; - AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); - AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); - AgentEventImpl::Ptr eventSetupComplete(); AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, - boost::shared_ptr<ObjectId> oid); + const string& key); AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, + const string& key, boost::shared_ptr<Variant::Map> argMap, const SchemaObjectClass* objectClass); - void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); + void handleRcvMessageLH(qpid::messaging::Message& message); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Buffer& inBuffer); - void handlePackageRequest(Buffer& inBuffer); - void handleClassQuery(Buffer& inBuffer); - void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, + void handleAttachResponse(Message& msg); + void handlePackageRequest(Message& msg); + void handleClassQuery(Message& msg); + void handleSchemaRequest(Message& msg, uint32_t sequence, const string& replyToExchange, const string& replyToKey); - void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleConsoleAddedIndication(); + void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId); + void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId); }; } } -const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; -const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; -const char* AgentImpl::BROKER_KEY = "broker"; - #define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} AgentEvent AgentEventImpl::copy() @@ -217,33 +192,38 @@ AgentEvent AgentEventImpl::copy() item.kind = kind; item.sequence = sequence; item.object = object; - item.objectId = objectId.get(); item.query = query.get(); item.arguments = arguments.get(); item.objectClass = objectClass; + STRING_REF(objectKey); STRING_REF(authUserId); STRING_REF(authToken); STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); return item; } -AgentImpl::AgentImpl(char* _label, bool i) : - label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), - requestedBrokerBank(0), requestedAgentBank(0), - assignedBrokerBank(0), assignedAgentBank(0), - bootSequence(1), nextObjectId(1), nextContextNum(1) +AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : + vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), + bootSequence(1), nextContextNum(1), running(true), thread(0) { - queueName += label; + directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; + if (_d == 0) { + directAddr += " { create:always }"; + } } + AgentImpl::~AgentImpl() { } +void AgentImpl::setAttr(const char* key, const Variant& value) +{ + attrMap.insert(pair<string, Variant>(key, value)); +} + void AgentImpl::setStoreDir(const char* path) { Mutex::ScopedLock _lock(lock); @@ -262,6 +242,7 @@ void AgentImpl::setTransferDir(const char* path) transferDir.clear(); } +/* void AgentImpl::handleRcvMessage(Message& message) { Buffer inBuffer(message.body, message.length); @@ -283,22 +264,7 @@ void AgentImpl::handleRcvMessage(Message& message) } } } - -bool AgentImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void AgentImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} +*/ bool AgentImpl::getEvent(AgentEvent& event) const { @@ -316,47 +282,16 @@ void AgentImpl::popEvent() eventQueue.pop_front(); } -void AgentImpl::newSession() -{ - Mutex::ScopedLock _lock(lock); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); -} - -void AgentImpl::startProtocol() -{ - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); - buffer.putShortString("qmfa"); - systemId.encode(buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); -} - -void AgentImpl::heartbeat() +void AgentImpl::setConnection(Connection& conn) { Mutex::ScopedLock _lock(lock); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - - Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); - buffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - sendBufferLH(buffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT HeartbeatIndication"); + if (connection == 0) + return; + connection = conn; + thread = new qpid::sys::Thread(*this); } -void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, - const Value& argMap) +void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -365,30 +300,11 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, AgentQueryContext::Ptr context = iter->second; contextMap.erase(iter); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); - buffer.putLong(status); - buffer.putMediumString(text); - if (status == 0) { - for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); - aIter != context->schemaMethod->impl->arguments.end(); aIter++) { - const SchemaArgument* schemaArg = *aIter; - if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { - if (argMap.keyInMap(schemaArg->getName())) { - const Value* val = argMap.byKey(schemaArg->getName()); - val->impl->encode(buffer); - } else { - Value val(schemaArg->getType()); - val.impl->encode(buffer); - } - } - } - } - sendBufferLH(buffer, context->exchange, context->key); + // TODO: Encode method response QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); } -void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +void AgentImpl::queryResponse(uint32_t sequence, Object&) { Mutex::ScopedLock _lock(lock); map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); @@ -396,17 +312,8 @@ void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool return; AgentQueryContext::Ptr context = iter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); - - object.impl->encodeSchemaKey(buffer); - object.impl->encodeManagedObjectData(buffer); - if (prop) - object.impl->encodeProperties(buffer); - if (stat) - object.impl->encodeStatistics(buffer); - - sendBufferLH(buffer, context->exchange, context->key); + // TODO: accumulate data records and send response messages when we have "enough" + QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); } @@ -417,9 +324,11 @@ void AgentImpl::queryComplete(uint32_t sequence) if (iter == contextMap.end()) return; + // TODO: send a response message if there are any unsent data records + AgentQueryContext::Ptr context = iter->second; contextMap.erase(iter); - sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); + //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); } void AgentImpl::registerClass(SchemaObjectClass* cls) @@ -456,413 +365,148 @@ void AgentImpl::registerClass(SchemaEventClass* cls) // TODO: Indicate this schema if connected. } -const ObjectId* AgentImpl::addObject(Object&, uint64_t) +const char* AgentImpl::addObject(Object&, const char*) { Mutex::ScopedLock _lock(lock); return 0; } -const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) +void AgentImpl::raiseEvent(Event&) { Mutex::ScopedLock _lock(lock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - - ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum); - return oid; } -const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +void AgentImpl::run() { - return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); -} - -void AgentImpl::raiseEvent(Event& event) -{ - Mutex::ScopedLock _lock(lock); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION); - - event.impl->encodeSchemaKey(buffer); - buffer.putLongLong(uint64_t(Duration(now()))); - event.impl->encode(buffer); - string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank)); + qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500; - sendBufferLH(buffer, QMF_EXCHANGE, key); - QPID_LOG(trace, "SENT EventIndication"); -} - -AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); - event->name = name; + session = connection.newSession(); + directReceiver = session.createReceiver(directAddr); + directReceiver.setCapacity(10); - return event; -} + Mutex::ScopedLock _lock(lock); + while (running) { + Receiver rcvr; + bool available; + { + Mutex::ScopedUnlock _unlock(lock); + available = session.nextReceiver(rcvr, duration); + } -AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, - const string& key) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; + if (available) { + Message msg(rcvr.get()); + handleRcvMessageLH(msg); + } + } - return event; + directReceiver.close(); + session.close(); } -AgentEventImpl::Ptr AgentImpl::eventSetupComplete() +void AgentImpl::stop() { - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); - return event; + Mutex::ScopedLock _lock(lock); + running = false; } -AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, - const string& cls, boost::shared_ptr<ObjectId> oid) +AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); event->sequence = num; event->authUserId = userId; - if (oid.get()) - event->query.reset(new Query(oid.get())); - else - event->query.reset(new Query(cls.c_str(), package.c_str())); + event->objectKey = key; return event; } AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, - const SchemaObjectClass* objectClass) + const string& key, boost::shared_ptr<Variant::Map> argMap, + const SchemaObjectClass* objectClass) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); event->sequence = num; event->authUserId = userId; event->name = method; - event->objectId = oid; + event->objectKey = key; event->arguments = argMap; event->objectClass = objectClass; return event; } -void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/) { - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = "amq.direct"; - message->replyKey = queueName; - - xmtQueue.push_back(message); } void AgentImpl::sendPackageIndicationLH(const string& packageName) { - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); - buffer.putShortString(packageName); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + // TODO QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); } -void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) +void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key) { - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); - buffer.putOctet((int) kind); - buffer.putShortString(packageName); - buffer.putShortString(key.name); - buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + // TODO QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); } -void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, - uint32_t sequence, uint32_t code, const string& text) +void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text) { - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); - buffer.putLong(code); - buffer.putShortString(text); - sendBufferLH(buffer, exchange, replyToKey); + // TODO QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); } -void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); - buffer.putLong(code); - - string fulltext; - switch (code) { - case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; - case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; - case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; - case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; - default: fulltext = "Unspecified Error"; break; - } - - if (!text.empty()) { - fulltext += " ("; - fulltext += text; - fulltext += ")"; - } - - buffer.putMediumString(fulltext); - sendBufferLH(buffer, DIR_EXCHANGE, key); - QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); -} - -void AgentImpl::handleAttachResponse(Buffer& inBuffer) +void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text) { - Mutex::ScopedLock _lock(lock); - - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); - - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); - - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - //storeData(); // TODO - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } - - attachment.setBanks(assignedBrokerBank, assignedAgentBank); - - // Bind to qpid.management to receive commands - stringstream key; - key << "agent." << assignedBrokerBank << "." << assignedAgentBank; - eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); - - // Send package indications for all local packages - for (map<string, ClassMaps>::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - sendPackageIndicationLH(pIter->first); - - // Send class indications for all local classes - ClassMaps cMap = pIter->second; - for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); - cIter != cMap.objectClasses.end(); cIter++) - sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); - for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); - cIter != cMap.eventClasses.end(); cIter++) - sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); - } + // TODO + QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text); } -void AgentImpl::handlePackageRequest(Buffer&) +void AgentImpl::handlePackageRequest(Message&) { Mutex::ScopedLock _lock(lock); } -void AgentImpl::handleClassQuery(Buffer&) +void AgentImpl::handleClassQuery(Message&) { Mutex::ScopedLock _lock(lock); } -void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, - const string& replyExchange, const string& replyKey) +void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&) { Mutex::ScopedLock _lock(lock); - string rExchange(replyExchange); - string rKey(replyKey); - string packageName; - inBuffer.getShortString(packageName); - AgentClassKey key(inBuffer); - - if (rExchange.empty()) - rExchange = QMF_EXCHANGE; - if (rKey.empty()) - rKey = BROKER_KEY; - - QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); - - map<string, ClassMaps>::iterator pIter = packages.find(packageName); - if (pIter == packages.end()) { - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); - return; - } - - ClassMaps cMap = pIter->second; - ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); - if (ocIter != cMap.objectClasses.end()) { - SchemaObjectClass* oImpl = ocIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - oImpl->impl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); - return; - } - - EventClassMap::iterator ecIter = cMap.eventClasses.find(key); - if (ecIter != cMap.eventClasses.end()) { - SchemaEventClass* eImpl = ecIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - eImpl->impl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); - return; - } - - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); } -void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) +void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&) { Mutex::ScopedLock _lock(lock); - FieldTable ft; - FieldTable::ValuePtr value; - map<string, ClassMaps>::const_iterator pIter = packages.end(); - string pname; - string cname; - string oidRepr; - boost::shared_ptr<ObjectId> oid; - - ft.decode(inBuffer); - - QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft); - - value = ft.get("_package"); - if (value.get() && value->convertsTo<string>()) { - pname = value->get<string>(); - pIter = packages.find(pname); - if (pIter == packages.end()) { - sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); - return; - } - } - - value = ft.get("_class"); - if (value.get() && value->convertsTo<string>()) { - cname = value->get<string>(); - // TODO - check for validity of class (in package or any package) - if (pIter == packages.end()) { - } else { - - } - } - - value = ft.get("_objectid"); - if (value.get() && value->convertsTo<string>()) { - oidRepr = value->get<string>(); - oid.reset(new ObjectId()); - oid->impl->fromString(oidRepr); - } - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - contextMap[contextNum] = context; - - eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); } -void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) +void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/) { Mutex::ScopedLock _lock(lock); - string pname; - string method; - boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer)); - buffer.getShortString(pname); - AgentClassKey classKey(buffer); - buffer.getShortString(method); - - QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method); - - map<string, ClassMaps>::const_iterator pIter = packages.find(pname); - if (pIter == packages.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); - return; - } - - ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); - if (cIter == pIter->second.objectClasses.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); - return; - } - - const SchemaObjectClass* schema = cIter->second; - vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin(); - for (; mIter != schema->impl->methods.end(); mIter++) { - if ((*mIter)->getName() == method) - break; - } - - if (mIter == schema->impl->methods.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); - return; - } - - const SchemaMethod* schemaMethod = *mIter; - boost::shared_ptr<Value> argMap(new Value(TYPE_MAP)); - Value* value; - for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin(); - aIter != schemaMethod->impl->arguments.end(); aIter++) { - const SchemaArgument* schemaArg = *aIter; - if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT) - value = ValueImpl::factory(schemaArg->getType(), buffer); - else - value = ValueImpl::factory(schemaArg->getType()); - argMap->insert(schemaArg->getName(), value); - } + QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method="); AgentQueryContext::Ptr context(new AgentQueryContext); uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - context->schemaMethod = schemaMethod; contextMap[contextNum] = context; - - eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema)); -} - -void AgentImpl::handleConsoleAddedIndication() -{ - Mutex::ScopedLock _lock(lock); } //================================================================== // Wrappers //================================================================== -Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); } +Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); } Agent::~Agent() { delete impl; } +void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); } void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } -void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void Agent::popXmt() { impl->popXmt(); } bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } void Agent::popEvent() { impl->popEvent(); } -void Agent::newSession() { impl->newSession(); } -void Agent::startProtocol() { impl->startProtocol(); } -void Agent::heartbeat() { impl->heartbeat(); } -void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } -void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } +void Agent::setConnection(Connection& conn) { impl->setConnection(conn); } +void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); } +void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); } void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } -const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } -const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } -const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } +const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); } void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h index b651b52345..031eb698e0 100644 --- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -23,12 +23,11 @@ #include "qmf/engine/Console.h" #include "qmf/engine/ObjectImpl.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/ValueImpl.h" #include "qmf/engine/QueryImpl.h" #include "qmf/engine/SequenceManager.h" #include "qmf/engine/MessageImpl.h" -#include "qpid/framing/Buffer.h" #include "qpid/framing/Uuid.h" +#include "qpid/messaging/Variant.h" #include "qpid/sys/Mutex.h" #include "boost/shared_ptr.hpp" #include "boost/noncopyable.hpp" @@ -46,8 +45,8 @@ namespace engine { struct MethodResponseImpl { uint32_t status; const SchemaMethod* schema; - std::auto_ptr<Value> exception; - std::auto_ptr<Value> arguments; + std::auto_ptr<qpid::messaging::Variant> exception; + std::auto_ptr<qpid::messaging::Variant::Map> arguments; MethodResponseImpl(const MethodResponseImpl& from); MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema); @@ -56,14 +55,14 @@ namespace engine { static MethodResponse* factory(uint32_t status, const std::string& text); ~MethodResponseImpl() {} uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } - const Value* getArgs() const { return arguments.get(); } + const qpid::messaging::Variant* getException() const { return exception.get(); } + const qpid::messaging::Variant::Map* getArgs() const { return arguments.get(); } }; typedef boost::shared_ptr<QueryResponse> QueryResponsePtr; struct QueryResponseImpl { uint32_t status; - std::auto_ptr<Value> exception; + std::auto_ptr<qpid::messaging::Variant> exception; std::vector<ObjectPtr> results; QueryResponseImpl() : status(0) {} @@ -73,7 +72,7 @@ namespace engine { } ~QueryResponseImpl() {} uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } + const qpid::messaging::Variant* getException() const { return exception.get(); } uint32_t getObjectCount() const { return results.size(); } const Object* getObject(uint32_t idx) const; }; @@ -140,8 +139,8 @@ namespace engine { const AgentProxy* getAgent(uint32_t idx) const; void sendQuery(const Query& query, void* context, const AgentProxy* agent); bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); - std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer); - void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context); + std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer); + void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context); void addBinding(const std::string& exchange, const std::string& key); void staticRelease() { decOutstanding(); } @@ -219,6 +218,9 @@ namespace engine { QueryResponsePtr queryResponse; }; + // + // MethodContext is used to track and handle the response associated with a single Method Request + // struct MethodContext : public SequenceContext { MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {} virtual ~MethodContext() {} diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp deleted file mode 100644 index 22a65f28ca..0000000000 --- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ConnectionSettingsImpl.h" -#include "qmf/engine/Typecode.h" - -using namespace std; -using namespace qmf::engine; -using namespace qpid; - -const string attrProtocol("protocol"); -const string attrHost("host"); -const string attrPort("port"); -const string attrVirtualhost("virtualhost"); -const string attrUsername("username"); -const string attrPassword("password"); -const string attrMechanism("mechanism"); -const string attrLocale("locale"); -const string attrHeartbeat("heartbeat"); -const string attrMaxChannels("maxChannels"); -const string attrMaxFrameSize("maxFrameSize"); -const string attrBounds("bounds"); -const string attrTcpNoDelay("tcpNoDelay"); -const string attrService("service"); -const string attrMinSsf("minSsf"); -const string attrMaxSsf("maxSsf"); -const string attrRetryDelayMin("retryDelayMin"); -const string attrRetryDelayMax("retryDelayMax"); -const string attrRetryDelayFactor("retryDelayFactor"); -const string attrSendUserId("sendUserId"); - -ConnectionSettingsImpl::ConnectionSettingsImpl() : - retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true) -{ -} - -ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) : - retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true) -{ - // TODO: Parse the URL -} - -bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value) -{ - if (key == attrProtocol) clientSettings.protocol = value.asString(); - else if (key == attrHost) clientSettings.host = value.asString(); - else if (key == attrPort) clientSettings.port = value.asUint(); - else if (key == attrVirtualhost) clientSettings.virtualhost = value.asString(); - else if (key == attrUsername) clientSettings.username = value.asString(); - else if (key == attrPassword) clientSettings.password = value.asString(); - else if (key == attrMechanism) clientSettings.mechanism = value.asString(); - else if (key == attrLocale) clientSettings.locale = value.asString(); - else if (key == attrHeartbeat) clientSettings.heartbeat = value.asUint(); - else if (key == attrMaxChannels) clientSettings.maxChannels = value.asUint(); - else if (key == attrMaxFrameSize) clientSettings.maxFrameSize = value.asUint(); - else if (key == attrBounds) clientSettings.bounds = value.asUint(); - else if (key == attrTcpNoDelay) clientSettings.tcpNoDelay = value.asBool(); - else if (key == attrService) clientSettings.service = value.asString(); - else if (key == attrMinSsf) clientSettings.minSsf = value.asUint(); - else if (key == attrMaxSsf) clientSettings.maxSsf = value.asUint(); - - else if (key == attrRetryDelayMin) retryDelayMin = value.asUint(); - else if (key == attrRetryDelayMax) retryDelayMax = value.asUint(); - else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint(); - else if (key == attrSendUserId) sendUserId = value.asBool(); - else - return false; - return true; -} - -Value ConnectionSettingsImpl::getAttr(const string& key) const -{ - Value strval(TYPE_LSTR); - Value intval(TYPE_UINT32); - Value boolval(TYPE_BOOL); - - if (key == attrProtocol) { - strval.setString(clientSettings.protocol.c_str()); - return strval; - } - - if (key == attrHost) { - strval.setString(clientSettings.host.c_str()); - return strval; - } - - if (key == attrPort) { - intval.setUint(clientSettings.port); - return intval; - } - - if (key == attrVirtualhost) { - strval.setString(clientSettings.virtualhost.c_str()); - return strval; - } - - if (key == attrUsername) { - strval.setString(clientSettings.username.c_str()); - return strval; - } - - if (key == attrPassword) { - strval.setString(clientSettings.password.c_str()); - return strval; - } - - if (key == attrMechanism) { - strval.setString(clientSettings.mechanism.c_str()); - return strval; - } - - if (key == attrLocale) { - strval.setString(clientSettings.locale.c_str()); - return strval; - } - - if (key == attrHeartbeat) { - intval.setUint(clientSettings.heartbeat); - return intval; - } - - if (key == attrMaxChannels) { - intval.setUint(clientSettings.maxChannels); - return intval; - } - - if (key == attrMaxFrameSize) { - intval.setUint(clientSettings.maxFrameSize); - return intval; - } - - if (key == attrBounds) { - intval.setUint(clientSettings.bounds); - return intval; - } - - if (key == attrTcpNoDelay) { - boolval.setBool(clientSettings.tcpNoDelay); - return boolval; - } - - if (key == attrService) { - strval.setString(clientSettings.service.c_str()); - return strval; - } - - if (key == attrMinSsf) { - intval.setUint(clientSettings.minSsf); - return intval; - } - - if (key == attrMaxSsf) { - intval.setUint(clientSettings.maxSsf); - return intval; - } - - if (key == attrRetryDelayMin) { - intval.setUint(retryDelayMin); - return intval; - } - - if (key == attrRetryDelayMax) { - intval.setUint(retryDelayMax); - return intval; - } - - if (key == attrRetryDelayFactor) { - intval.setUint(retryDelayFactor); - return intval; - } - - if (key == attrSendUserId) { - boolval.setBool(sendUserId); - return boolval; - } - - return strval; -} - -const string& ConnectionSettingsImpl::getAttrString() const -{ - // TODO: build and return attribute string - return attrString; -} - -void ConnectionSettingsImpl::transportTcp(uint16_t port) -{ - clientSettings.protocol = "tcp"; - clientSettings.port = port; -} - -void ConnectionSettingsImpl::transportSsl(uint16_t port) -{ - clientSettings.protocol = "ssl"; - clientSettings.port = port; -} - -void ConnectionSettingsImpl::transportRdma(uint16_t port) -{ - clientSettings.protocol = "rdma"; - clientSettings.port = port; -} - -void ConnectionSettingsImpl::authAnonymous(const string& username) -{ - clientSettings.mechanism = "ANONYMOUS"; - clientSettings.username = username; -} - -void ConnectionSettingsImpl::authPlain(const string& username, const string& password) -{ - clientSettings.mechanism = "PLAIN"; - clientSettings.username = username; - clientSettings.password = password; -} - -void ConnectionSettingsImpl::authGssapi(const string& serviceName, uint32_t minSsf, uint32_t maxSsf) -{ - clientSettings.mechanism = "GSSAPI"; - clientSettings.service = serviceName; - clientSettings.minSsf = minSsf; - clientSettings.maxSsf = maxSsf; -} - -void ConnectionSettingsImpl::setRetry(int delayMin, int delayMax, int delayFactor) -{ - retryDelayMin = delayMin; - retryDelayMax = delayMax; - retryDelayFactor = delayFactor; -} - -const client::ConnectionSettings& ConnectionSettingsImpl::getClientSettings() const -{ - return clientSettings; -} - -void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) const -{ - *min = retryDelayMin; - *max = retryDelayMax; - *factor = retryDelayFactor; -} - -//================================================================== -// Wrappers -//================================================================== - -ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); } -ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); } -ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); } -ConnectionSettings::~ConnectionSettings() { delete impl; } -bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); } -Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); } -const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); } -void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); } -void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); } -void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); } -void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); } -void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); } -void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); } -void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); } - diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h deleted file mode 100644 index 98bf87868b..0000000000 --- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef _QmfEngineConnectionSettingsImpl_ -#define _QmfEngineConnectionSettingsImpl_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ConnectionSettings.h" -#include "qmf/engine/Value.h" -#include "qpid/client/ConnectionSettings.h" -#include <string> -#include <map> - -namespace qmf { -namespace engine { - - class ConnectionSettingsImpl { - qpid::client::ConnectionSettings clientSettings; - mutable std::string attrString; - int retryDelayMin; - int retryDelayMax; - int retryDelayFactor; - bool sendUserId; - - public: - ConnectionSettingsImpl(); - ConnectionSettingsImpl(const std::string& url); - ~ConnectionSettingsImpl() {} - bool setAttr(const std::string& key, const Value& value); - Value getAttr(const std::string& key) const; - const std::string& getAttrString() const; - void transportTcp(uint16_t port); - void transportSsl(uint16_t port); - void transportRdma(uint16_t port); - void authAnonymous(const std::string& username); - void authPlain(const std::string& username, const std::string& password); - void authGssapi(const std::string& serviceName, uint32_t minSsf, uint32_t maxSsf); - void setRetry(int delayMin, int delayMax, int delayFactor); - - const qpid::client::ConnectionSettings& getClientSettings() const; - void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const; - bool getSendUserId() const { return sendUserId; } - }; - -} -} - -#endif diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h index 8f99c5e6b9..5e9783b8a0 100644 --- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h @@ -23,18 +23,13 @@ #include "qmf/engine/Console.h" #include "qmf/engine/MessageImpl.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Typecode.h" #include "qmf/engine/ObjectImpl.h" #include "qmf/engine/ObjectIdImpl.h" #include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ValueImpl.h" #include "qmf/engine/Protocol.h" #include "qmf/engine/SequenceManager.h" #include "qmf/engine/BrokerProxyImpl.h" -#include <qpid/framing/Buffer.h> #include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> #include <qpid/sys/Mutex.h> #include <qpid/sys/Time.h> #include <qpid/sys/SystemInfo.h> diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp deleted file mode 100644 index 0047d3eb9d..0000000000 --- a/qpid/cpp/src/qmf/engine/MessageImpl.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/MessageImpl.h" -#include <string.h> - -using namespace std; -using namespace qmf::engine; - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} - -Message MessageImpl::copy() -{ - Message item; - - ::memset(&item, 0, sizeof(Message)); - item.body = const_cast<char*>(body.c_str()); - item.length = body.length(); - STRING_REF(destination); - STRING_REF(routingKey); - STRING_REF(replyExchange); - STRING_REF(replyKey); - STRING_REF(userId); - - return item; -} - diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h deleted file mode 100644 index b91291d2e4..0000000000 --- a/qpid/cpp/src/qmf/engine/MessageImpl.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef _QmfEngineMessageImpl_ -#define _QmfEngineMessageImpl_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/Message.h" -#include <string> -#include <boost/shared_ptr.hpp> - -namespace qmf { -namespace engine { - - struct MessageImpl { - typedef boost::shared_ptr<MessageImpl> Ptr; - std::string body; - std::string destination; - std::string routingKey; - std::string replyExchange; - std::string replyKey; - std::string userId; - - Message copy(); - }; -} -} - -#endif diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp index cae0e0da68..4256b9119e 100644 --- a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp @@ -18,215 +18,50 @@ */ #include "qmf/engine/ObjectImpl.h" -#include "qmf/engine/ValueImpl.h" -#include "qmf/engine/BrokerProxyImpl.h" #include <qpid/sys/Time.h> using namespace std; using namespace qmf::engine; using namespace qpid::sys; -using qpid::framing::Buffer; +using namespace qpid::messaging; -ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) +ObjectImpl::ObjectImpl() : + objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) { - int propCount = objectClass->getPropertyCount(); - int statCount = objectClass->getStatisticCount(); - int idx; - - for (idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - properties[prop->getName()] = ValuePtr(new Value(prop->getType())); - } - - for (idx = 0; idx < statCount; idx++) { - const SchemaStatistic* stat = objectClass->getStatistic(idx); - statistics[stat->getName()] = ValuePtr(new Value(stat->getType())); - } -} - -ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) : - objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0) -{ - int idx; - - if (managed) { - lastUpdatedTime = buffer.getLongLong(); - createTime = buffer.getLongLong(); - destroyTime = buffer.getLongLong(); - objectId.reset(ObjectIdImpl::factory(buffer)); - } - - if (prop) { - int propCount = objectClass->getPropertyCount(); - set<string> excludes; - parsePresenceMasks(buffer, excludes); - for (idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - if (excludes.count(prop->getName()) != 0) { - properties[prop->getName()] = ValuePtr(new Value(prop->getType())); - } else { - Value* pval = ValueImpl::factory(prop->getType(), buffer); - properties[prop->getName()] = ValuePtr(pval); - } - } - } - - if (stat) { - int statCount = objectClass->getStatisticCount(); - for (idx = 0; idx < statCount; idx++) { - const SchemaStatistic* stat = objectClass->getStatistic(idx); - Value* sval = ValueImpl::factory(stat->getType(), buffer); - statistics[stat->getName()] = ValuePtr(sval); - } - } } -Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) -{ - ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed)); - return new Object(impl); -} -ObjectImpl::~ObjectImpl() +ObjectImpl::ObjectImpl(SchemaObjectClass* type) : + objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime) { } -void ObjectImpl::destroy() -{ - destroyTime = uint64_t(Duration(now())); - // TODO - flag deletion -} -Value* ObjectImpl::getValue(const string& key) const +void ObjectImpl::touch() { - map<string, ValuePtr>::const_iterator iter; - - iter = properties.find(key); - if (iter != properties.end()) - return iter->second.get(); - - iter = statistics.find(key); - if (iter != statistics.end()) - return iter->second.get(); - - return 0; + lastUpdatedTime = uint64_t(Duration(now())); } -void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const -{ - if (broker != 0 && objectId.get() != 0) - broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context); -} -void ObjectImpl::merge(const Object& from) -{ - for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin(); - piter != from.impl->properties.end(); piter++) - properties[piter->first] = piter->second; - for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin(); - siter != from.impl->statistics.end(); siter++) - statistics[siter->first] = siter->second; -} - -void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList) -{ - int propCount = objectClass->getPropertyCount(); - excludeList.clear(); - uint8_t bit = 0; - uint8_t mask = 0; - - for (int idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - if (prop->isOptional()) { - if (bit == 0) { - mask = buffer.getOctet(); - bit = 1; - } - if ((mask & bit) == 0) - excludeList.insert(string(prop->getName())); - if (bit == 0x80) - bit = 0; - else - bit = bit << 1; - } - } -} - -void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const -{ - buffer.putShortString(objectClass->getClassKey()->getPackageName()); - buffer.putShortString(objectClass->getClassKey()->getClassName()); - buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash())); -} - -void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const -{ - buffer.putLongLong(lastUpdatedTime); - buffer.putLongLong(createTime); - buffer.putLongLong(destroyTime); - objectId->impl->encode(buffer); -} - -void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const +void ObjectImpl::destroy() { - int propCount = objectClass->getPropertyCount(); - uint8_t bit = 0; - uint8_t mask = 0; - ValuePtr value; - - for (int idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - if (prop->isOptional()) { - value = properties[prop->getName()]; - if (bit == 0) - bit = 1; - if (!value->isNull()) - mask |= bit; - if (bit == 0x80) { - buffer.putOctet(mask); - bit = 0; - mask = 0; - } else - bit = bit << 1; - } - } - if (bit != 0) { - buffer.putOctet(mask); - } - - for (int idx = 0; idx < propCount; idx++) { - const SchemaProperty* prop = objectClass->getProperty(idx); - value = properties[prop->getName()]; - if (!prop->isOptional() || !value->isNull()) { - value->impl->encode(buffer); - } - } + destroyTime = uint64_t(Duration(now())); } -void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const -{ - int statCount = objectClass->getStatisticCount(); - for (int idx = 0; idx < statCount; idx++) { - const SchemaStatistic* stat = objectClass->getStatistic(idx); - ValuePtr value = statistics[stat->getName()]; - value->impl->encode(buffer); - } -} //================================================================== // Wrappers //================================================================== -Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {} -Object::Object(ObjectImpl* i) : impl(i) {} +Object::Object() : impl(new ObjectImpl()) {} +Object::Object(SchemaObjectClass* type) : impl(new ObjectImpl(type)) {} Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {} Object::~Object() { delete impl; } +const Variant::Map& Object::getValues() const { return impl->getValues(); } +Variant::Map& Object::getValues() { return impl->getValues(); } +const SchemaObjectClass* Object::getSchema() const { return impl->getSchema(); } +void Object::setSchema(SchemaObjectClass* schema) { impl->setSchema(schema); } +const char* Object::getKey() const { return impl->getKey(); } +void Object::setKey(const char* key) { impl->setKey(key); } +void Object::touch() { impl->touch(); } void Object::destroy() { impl->destroy(); } -const ObjectId* Object::getObjectId() const { return impl->getObjectId(); } -void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); } -const SchemaObjectClass* Object::getClass() const { return impl->getClass(); } -Value* Object::getValue(const char* key) const { return impl->getValue(key); } -void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); } -bool Object::isDeleted() const { return impl->isDeleted(); } -void Object::merge(const Object& from) { impl->merge(from); } - diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h index 6f25867004..f1d588271a 100644 --- a/qpid/cpp/src/qmf/engine/ObjectImpl.h +++ b/qpid/cpp/src/qmf/engine/ObjectImpl.h @@ -21,53 +21,55 @@ */ #include <qmf/engine/Object.h> -#include <qmf/engine/ObjectIdImpl.h> +#include <qpid/sys/Mutex.h> +#include <qpid/messaging/Variant.h> #include <map> #include <set> #include <string> -#include <qpid/framing/Buffer.h> #include <boost/shared_ptr.hpp> -#include <qpid/sys/Mutex.h> namespace qmf { namespace engine { - class BrokerProxyImpl; + class SchemaObjectClass; typedef boost::shared_ptr<Object> ObjectPtr; struct ObjectImpl { - typedef boost::shared_ptr<Value> ValuePtr; - const SchemaObjectClass* objectClass; - BrokerProxyImpl* broker; - boost::shared_ptr<ObjectId> objectId; + /** + * Content of the object's data + */ + qpid::messaging::Variant::Map values; + + /** + * Schema reference if this object is "described" + */ + SchemaObjectClass* objectClass; + + /** + * Address and lifecycle information if this object is "managed" + * The object is considered "managed" if the key is non-empty. + */ + std::string key; uint64_t createTime; uint64_t destroyTime; uint64_t lastUpdatedTime; - mutable std::map<std::string, ValuePtr> properties; - mutable std::map<std::string, ValuePtr> statistics; - ObjectImpl(const SchemaObjectClass* type); - ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer, - bool prop, bool stat, bool managed); - static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer, - bool prop, bool stat, bool managed); - ~ObjectImpl(); + ObjectImpl(); + ObjectImpl(SchemaObjectClass* type); + ~ObjectImpl() {} - void destroy(); - const ObjectId* getObjectId() const { return objectId.get(); } - void setObjectId(ObjectId* oid) { objectId.reset(new ObjectId(*oid)); } - const SchemaObjectClass* getClass() const { return objectClass; } - Value* getValue(const std::string& key) const; - void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const; - bool isDeleted() const { return destroyTime != 0; } - void merge(const Object& from); + const qpid::messaging::Variant::Map& getValues() const { return values; } + qpid::messaging::Variant::Map& getValues() { return values; } + + const SchemaObjectClass* getSchema() const { return objectClass; } + void setSchema(SchemaObjectClass* schema) { objectClass = schema; } - void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList); - void encodeSchemaKey(qpid::framing::Buffer& buffer) const; - void encodeManagedObjectData(qpid::framing::Buffer& buffer) const; - void encodeProperties(qpid::framing::Buffer& buffer) const; - void encodeStatistics(qpid::framing::Buffer& buffer) const; + const char* getKey() const { return key.c_str(); } + void setKey(const char* _key) { key = _key; } + + void touch(); + void destroy(); }; } } diff --git a/qpid/cpp/src/qmf/engine/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp deleted file mode 100644 index 9e5f490604..0000000000 --- a/qpid/cpp/src/qmf/engine/Protocol.cpp +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/Protocol.h" -#include "qpid/framing/Buffer.h" - -using namespace std; -using namespace qmf::engine; -using namespace qpid::framing; - - -bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - if (buf.available() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '2'; -} - -void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('2'); - buf.putOctet(opcode); - buf.putLong (seq); -} - - diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp index 6f2beeee87..0df49ff646 100644 --- a/qpid/cpp/src/qmf/engine/QueryImpl.cpp +++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp @@ -18,52 +18,20 @@ */ #include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ObjectIdImpl.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/FieldTable.h" using namespace std; using namespace qmf::engine; -using namespace qpid::framing; +using namespace qpid::messaging; -bool QueryElementImpl::evaluate(const Object* /*object*/) const +bool QueryImpl::matches(const Object&) const { - // TODO: Implement this - return false; + return true; } -bool QueryExpressionImpl::evaluate(const Object* /*object*/) const -{ - // TODO: Implement this - return false; -} - -QueryImpl::QueryImpl(Buffer& buffer) -{ - FieldTable ft; - ft.decode(buffer); - // TODO -} - -Query* QueryImpl::factory(Buffer& buffer) -{ - QueryImpl* impl(new QueryImpl(buffer)); - return new Query(impl); -} -void QueryImpl::encode(Buffer& buffer) const +void QueryImpl::parsePredicate(const std::string&) { - FieldTable ft; - - if (oid.get() != 0) { - ft.setString("_objectid", oid->impl->asString()); - } else { - if (!packageName.empty()) - ft.setString("_package", packageName); - ft.setString("_class", className); - } - - ft.encode(buffer); + predicate.clear(); } @@ -71,33 +39,21 @@ void QueryImpl::encode(Buffer& buffer) const // Wrappers //================================================================== -QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {} -QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {} -QueryElement::~QueryElement() { delete impl; } -bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); } - -QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {} -QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {} -QueryExpression::~QueryExpression() { delete impl; } -bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); } - -Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {} -Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {} -Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {} -Query::Query(QueryImpl* i) : impl(i) {} +Query::Query(const char* target) : impl(new QueryImpl(target)) {} +Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {} +Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {} Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {} Query::~Query() { delete impl; } -void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); } -void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); } -void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); } -const char* Query::getPackage() const { return impl->getPackage().c_str(); } -const char* Query::getClass() const { return impl->getClass().c_str(); } -const ObjectId* Query::getObjectId() const { return impl->getObjectId(); } -bool Query::haveSelect() const { return impl->haveSelect(); } +void Query::where(const Variant::List& predicate) { impl->where(predicate); } +void Query::where(const char* expression) { impl->where(expression); } +void Query::limit(uint32_t maxResults) { impl->limit(maxResults); } +void Query::orderBy(const char* attrName, bool decreasing) { impl->orderBy(attrName, decreasing); } +bool Query::havePredicate() const { return impl->havePredicate(); } bool Query::haveLimit() const { return impl->haveLimit(); } bool Query::haveOrderBy() const { return impl->haveOrderBy(); } -const QueryOperand* Query::getSelect() const { return impl->getSelect(); } +const Variant::List& Query::getPredicate() const { return impl->getPredicate(); } uint32_t Query::getLimit() const { return impl->getLimit(); } -const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); } +const char* Query::getOrderBy() const { return impl->getOrderBy(); } bool Query::getDecreasing() const { return impl->getDecreasing(); } +bool Query::matches(const Object& object) const { return impl->matches(object); } diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h index 8ebe0d932f..0ef8711f8e 100644 --- a/qpid/cpp/src/qmf/engine/QueryImpl.h +++ b/qpid/cpp/src/qmf/engine/QueryImpl.h @@ -21,79 +21,41 @@ */ #include "qmf/engine/Query.h" -#include "qmf/engine/Schema.h" +#include <qpid/messaging/Variant.h> #include <string> #include <boost/shared_ptr.hpp> -namespace qpid { - namespace framing { - class Buffer; - } -} - namespace qmf { namespace engine { - struct QueryElementImpl { - QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {} - ~QueryElementImpl() {} - bool evaluate(const Object* object) const; - - std::string attrName; - const Value* value; - ValueOper oper; - }; - - struct QueryExpressionImpl { - QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {} - ~QueryExpressionImpl() {} - bool evaluate(const Object* object) const; - - ExprOper oper; - const QueryOperand* left; - const QueryOperand* right; - }; - struct QueryImpl { - // Constructors mapped to public - QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {} - QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {} - QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {} - - // Factory constructors - QueryImpl(qpid::framing::Buffer& buffer); - - ~QueryImpl() {}; - static Query* factory(qpid::framing::Buffer& buffer); - - void setSelect(const QueryOperand* criterion) { select = criterion; } - void setLimit(uint32_t maxResults) { resultLimit = maxResults; } - void setOrderBy(const std::string& attrName, bool decreasing) { - orderBy = attrName; orderDecreasing = decreasing; - } - - const std::string& getPackage() const { return packageName; } - const std::string& getClass() const { return className; } - const ObjectId* getObjectId() const { return oid.get(); } - - bool haveSelect() const { return select != 0; } - bool haveLimit() const { return resultLimit > 0; } - bool haveOrderBy() const { return !orderBy.empty(); } - const QueryOperand* getSelect() const { return select; } + QueryImpl(const char* _target) : target(_target), resultLimit(0) {} + QueryImpl(const char* _target, const qpid::messaging::Variant::List& _predicate) : + target(_target), predicate(_predicate), resultLimit(0) {} + QueryImpl(const char* _target, const char* expression) : + target(_target), resultLimit(0) { parsePredicate(expression); } + ~QueryImpl() {} + + void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; } + void where(const char* expression) { parsePredicate(expression); } + void limit(uint32_t maxResults) { resultLimit = maxResults; } + void orderBy(const char* attrName, bool decreasing) { sortAttr = attrName; orderDecreasing = decreasing; } + + bool havePredicate() const { return !predicate.empty(); } + bool haveLimit() const { return resultLimit != 0; } + bool haveOrderBy() const { return !sortAttr.empty(); } + const qpid::messaging::Variant::List& getPredicate() const { return predicate; } uint32_t getLimit() const { return resultLimit; } - const std::string& getOrderBy() const { return orderBy; } + const char* getOrderBy() const { return sortAttr.c_str(); } bool getDecreasing() const { return orderDecreasing; } + bool matches(const Object& object) const; - void encode(qpid::framing::Buffer& buffer) const; - bool singleAgent() const { return oid.get() != 0; } - uint32_t agentBank() const { return singleAgent() ? oid->getAgentBank() : 0; } + void parsePredicate(const std::string& expression); - std::string packageName; - std::string className; - boost::shared_ptr<ObjectId> oid; - const QueryOperand* select; + const std::string target; + qpid::messaging::Variant::List predicate; uint32_t resultLimit; - std::string orderBy; + std::string sortAttr; bool orderDecreasing; }; } diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp deleted file mode 100644 index ab65b8d768..0000000000 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ResilientConnection.h" -#include "qmf/engine/MessageImpl.h" -#include "qmf/engine/ConnectionSettingsImpl.h" -#include <qpid/client/Connection.h> -#include <qpid/client/Session.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/SubscriptionManager.h> -#include <qpid/client/Message.h> -#include <qpid/sys/Thread.h> -#include <qpid/sys/Runnable.h> -#include <qpid/sys/Mutex.h> -#include <qpid/sys/Condition.h> -#include <qpid/sys/Time.h> -#include <qpid/log/Statement.h> -#include <qpid/RefCounted.h> -#include <boost/bind.hpp> -#include <string> -#include <deque> -#include <vector> -#include <set> -#include <boost/intrusive_ptr.hpp> -#include <boost/noncopyable.hpp> -#include <unistd.h> -#include <fcntl.h> - -using namespace std; -using namespace qmf::engine; -using namespace qpid; -using qpid::sys::Mutex; - -namespace qmf { -namespace engine { - struct ResilientConnectionEventImpl { - ResilientConnectionEvent::EventKind kind; - void* sessionContext; - string errorText; - MessageImpl message; - - ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k, - const MessageImpl& m = MessageImpl()) : - kind(k), sessionContext(0), message(m) {} - ResilientConnectionEvent copy(); - }; - - struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted { - typedef boost::intrusive_ptr<RCSession> Ptr; - ResilientConnectionImpl& connImpl; - string name; - client::Connection& connection; - client::Session session; - client::SubscriptionManager* subscriptions; - string userId; - void* userContext; - vector<string> dests; - qpid::sys::Thread thread; - - RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc); - ~RCSession(); - void received(client::Message& msg); - void run(); - void stop(); - }; - - class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable { - public: - ResilientConnectionImpl(const ConnectionSettings& settings); - ~ResilientConnectionImpl(); - - bool isConnected() const; - bool getEvent(ResilientConnectionEvent& event); - void popEvent(); - bool createSession(const char* name, void* sessionContext, SessionHandle& handle); - void destroySession(SessionHandle handle); - void sendMessage(SessionHandle handle, qmf::engine::Message& message); - void declareQueue(SessionHandle handle, char* queue); - void deleteQueue(SessionHandle handle, char* queue); - void bind(SessionHandle handle, char* exchange, char* queue, char* key); - void unbind(SessionHandle handle, char* exchange, char* queue, char* key); - void setNotifyFd(int fd); - void notify(); - - void run(); - void failure(); - void sessionClosed(RCSession* sess); - - void EnqueueEvent(ResilientConnectionEvent::EventKind kind, - void* sessionContext = 0, - const MessageImpl& message = MessageImpl(), - const string& errorText = ""); - - private: - int notifyFd; - bool connected; - bool shutdown; - string lastError; - const ConnectionSettings settings; - client::Connection connection; - mutable qpid::sys::Mutex lock; - int delayMin; - int delayMax; - int delayFactor; - qpid::sys::Condition cond; - deque<ResilientConnectionEventImpl> eventQueue; - set<RCSession::Ptr> sessions; - qpid::sys::Thread connThread; - }; -} -} - -ResilientConnectionEvent ResilientConnectionEventImpl::copy() -{ - ResilientConnectionEvent item; - - ::memset(&item, 0, sizeof(ResilientConnectionEvent)); - item.kind = kind; - item.sessionContext = sessionContext; - item.message = message.copy(); - item.errorText = const_cast<char*>(errorText.c_str()); - - return item; -} - -RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) : - connImpl(ci), name(n), connection(c), session(connection.newSession(name)), - subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) -{ - const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings(); - userId = operSettings.username; -} - -RCSession::~RCSession() -{ - subscriptions->stop(); - thread.join(); - session.close(); - delete subscriptions; -} - -void RCSession::run() -{ - try { - subscriptions->run(); - } catch (exception& /*e*/) { - connImpl.sessionClosed(this); - } -} - -void RCSession::stop() -{ - subscriptions->stop(); -} - -void RCSession::received(client::Message& msg) -{ - MessageImpl qmsg; - qmsg.body = msg.getData(); - - qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties(); - if (dp.hasRoutingKey()) { - qmsg.routingKey = dp.getRoutingKey(); - } - - qpid::framing::MessageProperties mp = msg.getMessageProperties(); - if (mp.hasReplyTo()) { - const qpid::framing::ReplyTo& rt = mp.getReplyTo(); - qmsg.replyExchange = rt.getExchange(); - qmsg.replyKey = rt.getRoutingKey(); - } - - if (mp.hasUserId()) { - qmsg.userId = mp.getUserId(); - } - - connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); -} - -ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) : - notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this) -{ - connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this)); - settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor); -} - -ResilientConnectionImpl::~ResilientConnectionImpl() -{ - shutdown = true; - connected = false; - cond.notify(); - connThread.join(); - connection.close(); -} - -bool ResilientConnectionImpl::isConnected() const -{ - Mutex::ScopedLock _lock(lock); - return connected; -} - -bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event) -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front().copy(); - return true; -} - -void ResilientConnectionImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext, - SessionHandle& handle) -{ - Mutex::ScopedLock _lock(lock); - if (!connected) - return false; - - RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext)); - - handle.impl = (void*) sess.get(); - sessions.insert(sess); - - return true; -} - -void ResilientConnectionImpl::destroySession(SessionHandle handle) -{ - Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); - set<RCSession::Ptr>::iterator iter = sessions.find(sess); - if (iter != sessions.end()) { - for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++) - sess->subscriptions->cancel(dIter->c_str()); - sess->subscriptions->stop(); - sess->subscriptions->wait(); - - sessions.erase(iter); - return; - } -} - -void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message) -{ - Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); - set<RCSession::Ptr>::iterator iter = sessions.find(sess); - qpid::client::Message msg; - string data(message.body, message.length); - msg.getDeliveryProperties().setRoutingKey(message.routingKey); - msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey)); - if (settings.impl->getSendUserId()) - msg.getMessageProperties().setUserId(sess->userId); - msg.setData(data); - - try { - sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination); - } catch(exception& e) { - QPID_LOG(error, "Session Exception during message-transfer: " << e.what()); - sessions.erase(iter); - EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext); - } -} - -void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true); - sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE); - sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED); - sess->subscriptions->subscribe(*sess, queue, queue); - sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited()); - sess->dests.push_back(string(queue)); -} - -void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.queueDelete(client::arg::queue=queue); - for (vector<string>::iterator iter = sess->dests.begin(); - iter != sess->dests.end(); iter++) - if (*iter == queue) { - sess->subscriptions->cancel(queue); - sess->dests.erase(iter); - break; - } -} - -void ResilientConnectionImpl::bind(SessionHandle handle, - char* exchange, char* queue, char* key) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); -} - -void ResilientConnectionImpl::unbind(SessionHandle handle, - char* exchange, char* queue, char* key) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); -} - -void ResilientConnectionImpl::notify() -{ - if (notifyFd != -1) - { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); - } -} - - -void ResilientConnectionImpl::setNotifyFd(int fd) -{ - notifyFd = fd; - if (notifyFd > 0) { - int original = fcntl(notifyFd, F_GETFL); - fcntl(notifyFd, F_SETFL, O_NONBLOCK | original); - } -} - -void ResilientConnectionImpl::run() -{ - int delay(delayMin); - - while (true) { - try { - QPID_LOG(trace, "Trying to open connection..."); - connection.open(settings.impl->getClientSettings()); - { - Mutex::ScopedLock _lock(lock); - connected = true; - EnqueueEvent(ResilientConnectionEvent::CONNECTED); - - while (connected) - cond.wait(lock); - delay = delayMin; - - while (!sessions.empty()) { - set<RCSession::Ptr>::iterator iter = sessions.begin(); - RCSession::Ptr sess = *iter; - sessions.erase(iter); - EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext); - Mutex::ScopedUnlock _u(lock); - sess->stop(); - - // Nullify the intrusive pointer within the scoped unlock, otherwise, - // the reference is held until overwritted above (under lock) which causes - // the session destructor to be called with the lock held. - sess = 0; - } - - EnqueueEvent(ResilientConnectionEvent::DISCONNECTED); - - if (shutdown) - return; - } - connection.close(); - } catch (exception &e) { - QPID_LOG(debug, "connection.open exception: " << e.what()); - Mutex::ScopedLock _lock(lock); - lastError = e.what(); - if (delay < delayMax) - delay *= delayFactor; - } - - ::qpid::sys::sleep(delay); - } -} - -void ResilientConnectionImpl::failure() -{ - Mutex::ScopedLock _lock(lock); - - connected = false; - lastError = "Closed by Peer"; - cond.notify(); -} - -void ResilientConnectionImpl::sessionClosed(RCSession*) -{ - Mutex::ScopedLock _lock(lock); - connected = false; - lastError = "Closed due to Session failure"; - cond.notify(); -} - -void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind, - void* sessionContext, - const MessageImpl& message, - const string& errorText) -{ - { - Mutex::ScopedLock _lock(lock); - ResilientConnectionEventImpl event(kind, message); - - event.sessionContext = sessionContext; - event.errorText = errorText; - - eventQueue.push_back(event); - } - - if (notifyFd != -1) - { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); - } -} - - -//================================================================== -// Wrappers -//================================================================== - -ResilientConnection::ResilientConnection(const ConnectionSettings& settings) -{ - impl = new ResilientConnectionImpl(settings); -} - -ResilientConnection::~ResilientConnection() -{ - delete impl; -} - -bool ResilientConnection::isConnected() const -{ - return impl->isConnected(); -} - -bool ResilientConnection::getEvent(ResilientConnectionEvent& event) -{ - return impl->getEvent(event); -} - -void ResilientConnection::popEvent() -{ - impl->popEvent(); -} - -bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle) -{ - return impl->createSession(name, sessionContext, handle); -} - -void ResilientConnection::destroySession(SessionHandle handle) -{ - impl->destroySession(handle); -} - -void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message) -{ - impl->sendMessage(handle, message); -} - -void ResilientConnection::declareQueue(SessionHandle handle, char* queue) -{ - impl->declareQueue(handle, queue); -} - -void ResilientConnection::deleteQueue(SessionHandle handle, char* queue) -{ - impl->deleteQueue(handle, queue); -} - -void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key) -{ - impl->bind(handle, exchange, queue, key); -} - -void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key) -{ - impl->unbind(handle, exchange, queue, key); -} - -void ResilientConnection::setNotifyFd(int fd) -{ - impl->setNotifyFd(fd); -} - -void ResilientConnection::notify() -{ - impl->notify(); -} - diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index 249a08ba7f..dcfc0db8e5 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -18,18 +18,14 @@ */ #include "qmf/engine/SchemaImpl.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/Uuid.h> +#include "qmf/Protocol.h" #include <string.h> #include <string> #include <vector> using namespace std; using namespace qmf::engine; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; -using qpid::framing::Uuid; +using namespace qpid::messaging; SchemaHash::SchemaHash() { @@ -37,16 +33,6 @@ SchemaHash::SchemaHash() hash[idx] = 0x5A; } -void SchemaHash::encode(Buffer& buffer) const -{ - buffer.putBin128(hash); -} - -void SchemaHash::decode(Buffer& buffer) -{ - buffer.getBin128(hash); -} - void SchemaHash::update(uint8_t data) { update((char*) &data, 1); @@ -81,48 +67,68 @@ bool SchemaHash::operator>(const SchemaHash& other) const return ::memcmp(&hash, &other.hash, 16) > 0; } -SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer) + +SchemaArgumentImpl::SchemaArgumentImpl(const Variant::Map& map) { - FieldTable map; - map.decode(buffer); + Variant::Map::const_iterator iter; - name = map.getAsString("name"); - typecode = (Typecode) map.getAsInt("type"); - unit = map.getAsString("unit"); - description = map.getAsString("desc"); + iter = map.find(Protocol::SCHEMA_ELT_NAME); + if (iter == map.end()) + throw SchemaException("SchemaArgument", Protocol::SCHEMA_ELT_NAME); + name = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_ELT_TYPE); + if (iter == map.end()) + throw SchemaException("SchemaArgument", Protocol::SCHEMA_ELT_TYPE); + typecode = (Typecode) iter->second.asUint8(); + + iter = map.find(Protocol::SCHEMA_ELT_UNIT); + if (iter != map.end()) + unit = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_ELT_DESC); + if (iter != map.end()) + description = iter->second.asString(); dir = DIR_IN; - string dstr(map.getAsString("dir")); - if (dstr == "O") - dir = DIR_OUT; - else if (dstr == "IO") - dir = DIR_IN_OUT; + iter = map.find(Protocol::SCHEMA_ELT_DIR); + if (iter != map.end()) { + string dstr(iter->second.asString()); + if (dstr == "O") + dir = DIR_OUT; + else if (dstr == "IO") + dir = DIR_IN_OUT; + } } -SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer) +SchemaArgument* SchemaArgumentImpl::factory(Variant::Map& map) { - SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer)); + SchemaArgumentImpl* impl(new SchemaArgumentImpl(map)); return new SchemaArgument(impl); } -void SchemaArgumentImpl::encode(Buffer& buffer) const +Variant::Map SchemaArgumentImpl::asMap() const { - FieldTable map; + Variant::Map map; + + map[Protocol::SCHEMA_ELT_NAME] = Variant(name); + map[Protocol::SCHEMA_ELT_TYPE] = Variant((uint8_t) typecode); - map.setString("name", name); - map.setInt("type", (int) typecode); + string dirStr; if (dir == DIR_IN) - map.setString("dir", "I"); + dirStr = "I"; else if (dir == DIR_OUT) - map.setString("dir", "O"); + dirStr = "O"; else - map.setString("dir", "IO"); + dirStr = "IO"; + map[Protocol::SCHEMA_ELT_DIR] = Variant(dirStr); + if (!unit.empty()) - map.setString("unit", unit); + map[Protocol::SCHEMA_ELT_UNIT] = Variant(unit); if (!description.empty()) - map.setString("desc", description); + map[Protocol::SCHEMA_ELT_DESC] = Variant(description); - map.encode(buffer); + return map; } void SchemaArgumentImpl::updateHash(SchemaHash& hash) const @@ -134,41 +140,51 @@ void SchemaArgumentImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer) +SchemaMethodImpl::SchemaMethodImpl(const Variant::Map& map) { - FieldTable map; - int argCount; + Variant::Map::const_iterator iter; - map.decode(buffer); - name = map.getAsString("name"); - argCount = map.getAsInt("argCount"); - description = map.getAsString("desc"); + iter = map.find(Protocol::SCHEMA_ELT_NAME); + if (iter == map.end()) + throw SchemaException("SchemaMethod", Protocol::SCHEMA_ELT_NAME); + name = iter->second.asString(); - for (int idx = 0; idx < argCount; idx++) { - SchemaArgument* arg = SchemaArgumentImpl::factory(buffer); - addArgument(arg); + iter = map.find(Protocol::SCHEMA_ELT_DESC); + if (iter != map.end()) + description = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_ARGS); + if (iter != map.end()) { + Variant::List list(iter->second.asList()); + for (Variant::List::const_iterator aiter = list.begin(); aiter != list.end(); aiter++) { + Variant::Map argMap(aiter->asMap()); + SchemaArgument* arg = SchemaArgumentImpl::factory(argMap); + addArgument(arg); + } } } -SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer) +SchemaMethod* SchemaMethodImpl::factory(Variant::Map& map) { - SchemaMethodImpl* impl(new SchemaMethodImpl(buffer)); + SchemaMethodImpl* impl(new SchemaMethodImpl(map)); return new SchemaMethod(impl); } -void SchemaMethodImpl::encode(Buffer& buffer) const +Variant::Map SchemaMethodImpl::asMap() const { - FieldTable map; + Variant::Map map; - map.setString("name", name); - map.setInt("argCount", arguments.size()); + map[Protocol::SCHEMA_ELT_NAME] = Variant(name); if (!description.empty()) - map.setString("desc", description); - map.encode(buffer); + map[Protocol::SCHEMA_ELT_DESC] = Variant(description); + Variant::List list; for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) - (*iter)->impl->encode(buffer); + list.push_back((*iter)->impl->asMap()); + map[Protocol::SCHEMA_ARGS] = list; + + return map; } void SchemaMethodImpl::addArgument(const SchemaArgument* argument) @@ -195,41 +211,58 @@ void SchemaMethodImpl::updateHash(SchemaHash& hash) const (*iter)->impl->updateHash(hash); } -SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer) +SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map& map) { - FieldTable map; - map.decode(buffer); + Variant::Map::const_iterator iter; + + iter = map.find(Protocol::SCHEMA_ELT_NAME); + if (iter == map.end()) + throw SchemaException("SchemaProperty", Protocol::SCHEMA_ELT_NAME); + name = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_ELT_TYPE); + if (iter == map.end()) + throw SchemaException("SchemaProperty", Protocol::SCHEMA_ELT_TYPE); + typecode = (Typecode) iter->second.asUint8(); + + iter = map.find(Protocol::SCHEMA_ELT_ACCESS); + if (iter != map.end()) + access = (Access) iter->second.asUint8(); + + iter = map.find(Protocol::SCHEMA_ELT_UNIT); + if (iter != map.end()) + unit = iter->second.asString(); - name = map.getAsString("name"); - typecode = (Typecode) map.getAsInt("type"); - access = (Access) map.getAsInt("access"); - index = map.getAsInt("index") != 0; - optional = map.getAsInt("optional") != 0; - unit = map.getAsString("unit"); - description = map.getAsString("desc"); + iter = map.find(Protocol::SCHEMA_ELT_DESC); + if (iter != map.end()) + description = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_ELT_OPTIONAL); + if (iter != map.end()) + optional = true; } -SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer) +SchemaProperty* SchemaPropertyImpl::factory(Variant::Map& map) { - SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer)); + SchemaPropertyImpl* impl(new SchemaPropertyImpl(map)); return new SchemaProperty(impl); } -void SchemaPropertyImpl::encode(Buffer& buffer) const +Variant::Map SchemaPropertyImpl::asMap() const { - FieldTable map; + Variant::Map map; - map.setString("name", name); - map.setInt("type", (int) typecode); - map.setInt("access", (int) access); - map.setInt("index", index ? 1 : 0); - map.setInt("optional", optional ? 1 : 0); + map[Protocol::SCHEMA_ELT_NAME] = Variant(name); + map[Protocol::SCHEMA_ELT_TYPE] = Variant((uint8_t) typecode); + map[Protocol::SCHEMA_ELT_ACCESS] = Variant((uint8_t) access); + if (optional) + map[Protocol::SCHEMA_ELT_OPTIONAL] = Variant(); if (!unit.empty()) - map.setString("unit", unit); + map[Protocol::SCHEMA_ELT_UNIT] = Variant(unit); if (!description.empty()) - map.setString("desc", description); + map[Protocol::SCHEMA_ELT_DESC] = Variant(description); - map.encode(buffer); + return map; } void SchemaPropertyImpl::updateHash(SchemaHash& hash) const @@ -243,52 +276,28 @@ void SchemaPropertyImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer) -{ - FieldTable map; - map.decode(buffer); - - name = map.getAsString("name"); - typecode = (Typecode) map.getAsInt("type"); - unit = map.getAsString("unit"); - description = map.getAsString("desc"); -} - -SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer) -{ - SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer)); - return new SchemaStatistic(impl); -} +SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : + package(p), name(n), hash(h) {} -void SchemaStatisticImpl::encode(Buffer& buffer) const +SchemaClassKeyImpl::SchemaClassKeyImpl(const Variant::Map& map) : + package(packageContainer), name(nameContainer), hash(hashContainer) { - FieldTable map; + Variant::Map::const_iterator iter; - map.setString("name", name); - map.setInt("type", (int) typecode); - if (!unit.empty()) - map.setString("unit", unit); - if (!description.empty()) - map.setString("desc", description); + iter = map.find(Protocol::SCHEMA_PACKAGE); + if (iter == map.end()) + throw SchemaException("SchemaClassKey", Protocol::SCHEMA_PACKAGE); + packageContainer = iter->second.asString(); - map.encode(buffer); -} + iter = map.find(Protocol::SCHEMA_CLASS); + if (iter == map.end()) + throw SchemaException("SchemaClassKey", Protocol::SCHEMA_CLASS); + nameContainer = iter->second.asString(); -void SchemaStatisticImpl::updateHash(SchemaHash& hash) const -{ - hash.update(name); - hash.update(typecode); - hash.update(unit); - hash.update(description); -} - -SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {} - -SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer) -{ - buffer.getShortString(packageContainer); - buffer.getShortString(nameContainer); - hashContainer.decode(buffer); + iter = map.find(Protocol::SCHEMA_HASH); + if (iter == map.end()) + throw SchemaException("SchemaClassKey", Protocol::SCHEMA_HASH); + hashContainer.set(iter->second.asUuid().data()); } SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash) @@ -297,17 +306,21 @@ SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& return new SchemaClassKey(impl); } -SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer) +SchemaClassKey* SchemaClassKeyImpl::factory(Variant::Map& map) { - SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer)); + SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(map)); return new SchemaClassKey(impl); } -void SchemaClassKeyImpl::encode(Buffer& buffer) const +Variant::Map SchemaClassKeyImpl::asMap() const { - buffer.putShortString(package); - buffer.putShortString(name); - hash.encode(buffer); + Variant::Map map; + + map[Protocol::SCHEMA_PACKAGE] = Variant(package); + map[Protocol::SCHEMA_CLASS] = Variant(name); + map[Protocol::SCHEMA_HASH] = Variant(); // TODO: use UUID type when available + + return map; } bool SchemaClassKeyImpl::operator==(const SchemaClassKeyImpl& other) const @@ -335,10 +348,21 @@ const string& SchemaClassKeyImpl::str() const return repr; } -SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) +SchemaObjectClassImpl::SchemaObjectClassImpl(const Variant::Map& map) : + hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) { - buffer.getShortString(package); - buffer.getShortString(name); + Variant::Map::const_iterator iter; + + iter = map.find(Protocol::SCHEMA_PACKAGE); + if (iter == map.end()) + throw SchemaException("SchemaObjectClass", Protocol::SCHEMA_PACKAGE); + package = iter->second.asString(); + + iter = map.find(Protocol::SCHEMA_CLASS); + if (iter == map.end()) + throw SchemaException("SchemaObjectClass", Protocol::SCHEMA_CLASS); + name = iter->second.asString(); + hash.decode(buffer); uint16_t propCount = buffer.getShort(); @@ -350,24 +374,19 @@ SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), cl addProperty(property); } - for (uint16_t idx = 0; idx < statCount; idx++) { - const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer); - addStatistic(statistic); - } - for (uint16_t idx = 0; idx < methodCount; idx++) { SchemaMethod* method = SchemaMethodImpl::factory(buffer); addMethod(method); } } -SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer) +SchemaObjectClass* SchemaObjectClassImpl::factory(Variant::Map& map) { SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer)); return new SchemaObjectClass(impl); } -void SchemaObjectClassImpl::encode(Buffer& buffer) const +void SchemaObjectClassImpl::encode(Variant::Map& map) const { buffer.putOctet((uint8_t) CLASS_OBJECT); buffer.putShortString(package); @@ -454,11 +473,12 @@ const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const return 0; } -SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) +SchemaEventClassImpl::SchemaEventClassImpl(Variant::Map& map) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash)) { buffer.getShortString(package); buffer.getShortString(name); hash.decode(buffer); + buffer.putOctet(0); // No parent class uint16_t argCount = buffer.getShort(); @@ -468,13 +488,13 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), clas } } -SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer) +SchemaEventClass* SchemaEventClassImpl::factory(Variant::Map& map) { SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer)); return new SchemaEventClass(impl); } -void SchemaEventClassImpl::encode(Buffer& buffer) const +void SchemaEventClassImpl::encode(Variant::Map& map) const { buffer.putOctet((uint8_t) CLASS_EVENT); buffer.putShortString(package); @@ -561,17 +581,6 @@ bool SchemaProperty::isOptional() const { return impl->isOptional(); } const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); } const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); } -SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {} -SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {} -SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {} -SchemaStatistic::~SchemaStatistic() { delete impl; } -void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); } -void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); } -const char* SchemaStatistic::getName() const { return impl->getName().c_str(); } -Typecode SchemaStatistic::getType() const { return impl->getType(); } -const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); } -const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); } - SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {} SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {} SchemaClassKey::~SchemaClassKey() { delete impl; } @@ -587,24 +596,20 @@ SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {} SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {} SchemaObjectClass::~SchemaObjectClass() { delete impl; } void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); } -void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); } void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); } const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); } int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); } -int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); } int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); } const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); } -const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); } const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); } -SchemaEventClass::SchemaEventClass(const char* package, const char* name, Severity s) : impl(new SchemaEventClassImpl(package, name, s)) {} +SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {} SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {} SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {} SchemaEventClass::~SchemaEventClass() { delete impl; } void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); } void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); } const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); } -Severity SchemaEventClass::getSeverity() const { return impl->getSeverity(); } int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); } const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); } diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index 7be757ee8d..a26bc07f6d 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -21,9 +21,10 @@ */ #include "qmf/engine/Schema.h" +#include <string.h> #include <string> #include <vector> -#include <qpid/framing/Buffer.h> +#include <exception> namespace qmf { namespace engine { @@ -32,20 +33,33 @@ namespace engine { // TODO: Add "frozen" attribute for schema classes so they can't be modified after // they've been registered. + typedef qpid::messaging::VariantType Typecode; + + class SchemaException : public std::exception { + public: + SchemaException(const std::string& context, const std::string& expected) { + text = context + ": Expected item with key " + expected; + } + virtual ~SchemaException() throw(); + virtual const char* what() const throw() { return text.c_str(); } + + private: + std::string text; + }; + class SchemaHash { uint8_t hash[16]; public: SchemaHash(); - void encode(qpid::framing::Buffer& buffer) const; - void decode(qpid::framing::Buffer& buffer); void update(const char* data, uint32_t len); void update(uint8_t data); void update(const std::string& data) { update(data.c_str(), data.size()); } - void update(Typecode t) { update((uint8_t) t); } void update(Direction d) { update((uint8_t) d); } void update(Access a) { update((uint8_t) a); } + void update(Typecode a) { update((uint8_t) a); } void update(bool b) { update((uint8_t) (b ? 1 : 0)); } const uint8_t* get() const { return hash; } + void set(const uint8_t* val) { ::memcpy(hash, val, 16); } bool operator==(const SchemaHash& other) const; bool operator<(const SchemaHash& other) const; bool operator>(const SchemaHash& other) const; @@ -59,9 +73,9 @@ namespace engine { std::string description; SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {} - SchemaArgumentImpl(qpid::framing::Buffer& buffer); - static SchemaArgument* factory(qpid::framing::Buffer& buffer); - void encode(qpid::framing::Buffer& buffer) const; + SchemaArgumentImpl(const qpid::messaging::Variant::Map& map); + static SchemaArgument* factory(qpid::messaging::Variant::Map& map); + qpid::messaging::Variant::Map asMap() const; void setDirection(Direction d) { dir = d; } void setUnit(const char* val) { unit = val; } void setDesc(const char* desc) { description = desc; } @@ -79,9 +93,9 @@ namespace engine { std::vector<const SchemaArgument*> arguments; SchemaMethodImpl(const char* n) : name(n) {} - SchemaMethodImpl(qpid::framing::Buffer& buffer); - static SchemaMethod* factory(qpid::framing::Buffer& buffer); - void encode(qpid::framing::Buffer& buffer) const; + SchemaMethodImpl(const qpid::messaging::Variant::Map& map); + static SchemaMethod* factory(qpid::messaging::Variant::Map& map); + qpid::messaging::Variant::Map asMap() const; void addArgument(const SchemaArgument* argument); void setDesc(const char* desc) { description = desc; } const std::string& getName() const { return name; } @@ -101,9 +115,9 @@ namespace engine { std::string description; SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {} - SchemaPropertyImpl(qpid::framing::Buffer& buffer); - static SchemaProperty* factory(qpid::framing::Buffer& buffer); - void encode(qpid::framing::Buffer& buffer) const; + SchemaPropertyImpl(const qpid::messaging::Variant::Map& map); + static SchemaProperty* factory(qpid::messaging::Variant::Map& map); + qpid::messaging::Variant::Map asMap() const; void setAccess(Access a) { access = a; } void setIndex(bool val) { index = val; } void setOptional(bool val) { optional = val; } @@ -119,25 +133,6 @@ namespace engine { void updateHash(SchemaHash& hash) const; }; - struct SchemaStatisticImpl { - std::string name; - Typecode typecode; - std::string unit; - std::string description; - - SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {} - SchemaStatisticImpl(qpid::framing::Buffer& buffer); - static SchemaStatistic* factory(qpid::framing::Buffer& buffer); - void encode(qpid::framing::Buffer& buffer) const; - void setUnit(const char* val) { unit = val; } - void setDesc(const char* desc) { description = desc; } - const std::string& getName() const { return name; } - Typecode getType() const { return typecode; } - const std::string& getUnit() const { return unit; } - const std::string& getDesc() const { return description; } - void updateHash(SchemaHash& hash) const; - }; - struct SchemaClassKeyImpl { const std::string& package; const std::string& name; @@ -151,15 +146,15 @@ namespace engine { SchemaHash hashContainer; SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash); - SchemaClassKeyImpl(qpid::framing::Buffer& buffer); + SchemaClassKeyImpl(const qpid::messaging::Variant::Map& map); static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash); - static SchemaClassKey* factory(qpid::framing::Buffer& buffer); + static SchemaClassKey* factory(qpid::messaging::Variant::Map& map); const std::string& getPackageName() const { return package; } const std::string& getClassName() const { return name; } const uint8_t* getHash() const { return hash.get(); } - void encode(qpid::framing::Buffer& buffer) const; + qpid::messaging::Variant::Map asMap() const; bool operator==(const SchemaClassKeyImpl& other) const; bool operator<(const SchemaClassKeyImpl& other) const; const std::string& str() const; @@ -172,25 +167,21 @@ namespace engine { mutable bool hasHash; std::auto_ptr<SchemaClassKey> classKey; std::vector<const SchemaProperty*> properties; - std::vector<const SchemaStatistic*> statistics; std::vector<const SchemaMethod*> methods; SchemaObjectClassImpl(const char* p, const char* n) : package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {} - SchemaObjectClassImpl(qpid::framing::Buffer& buffer); - static SchemaObjectClass* factory(qpid::framing::Buffer& buffer); + SchemaObjectClassImpl(const qpid::messaging::Variant::Map& map); + static SchemaObjectClass* factory(qpid::messaging::Variant::Map& map); - void encode(qpid::framing::Buffer& buffer) const; + qpid::messaging::Variant::Map asMap() const; void addProperty(const SchemaProperty* property); - void addStatistic(const SchemaStatistic* statistic); void addMethod(const SchemaMethod* method); const SchemaClassKey* getClassKey() const; int getPropertyCount() const { return properties.size(); } - int getStatisticCount() const { return statistics.size(); } int getMethodCount() const { return methods.size(); } const SchemaProperty* getProperty(int idx) const; - const SchemaStatistic* getStatistic(int idx) const; const SchemaMethod* getMethod(int idx) const; }; @@ -201,20 +192,18 @@ namespace engine { mutable bool hasHash; std::auto_ptr<SchemaClassKey> classKey; std::string description; - Severity severity; std::vector<const SchemaArgument*> arguments; - SchemaEventClassImpl(const char* p, const char* n, Severity sev) : - package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)), severity(sev) {} - SchemaEventClassImpl(qpid::framing::Buffer& buffer); - static SchemaEventClass* factory(qpid::framing::Buffer& buffer); + SchemaEventClassImpl(const char* p, const char* n) : + package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {} + SchemaEventClassImpl(const qpid::messaging::Variant::Map& map); + static SchemaEventClass* factory(qpid::messaging::Variant::Map& map); - void encode(qpid::framing::Buffer& buffer) const; + qpid::messaging::Variant::Map asMap() const; void addArgument(const SchemaArgument* argument); void setDesc(const char* desc) { description = desc; } const SchemaClassKey* getClassKey() const; - Severity getSeverity() const { return severity; } int getArgumentCount() const { return arguments.size(); } const SchemaArgument* getArgument(int idx) const; }; diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp deleted file mode 100644 index b1c027520f..0000000000 --- a/qpid/cpp/src/qmf/engine/ValueImpl.cpp +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ValueImpl.h" -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> - -using namespace std; -using namespace qmf::engine; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; -using qpid::framing::FieldValue; - -ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t) -{ - uint64_t first; - uint64_t second; - FieldTable ft; - - switch (typecode) { - case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break; - case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break; - case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break; - case TYPE_UINT64 : value.u64 = buf.getLongLong(); break; - case TYPE_SSTR : buf.getShortString(stringVal); break; - case TYPE_LSTR : buf.getMediumString(stringVal); break; - case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break; - case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break; - case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break; - case TYPE_FLOAT : value.floatVal = buf.getFloat(); break; - case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break; - case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break; - case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break; - case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break; - case TYPE_INT64 : value.s64 = buf.getLongLong(); break; - case TYPE_UUID : buf.getBin128(value.uuidVal); break; - case TYPE_REF: - first = buf.getLongLong(); - second = buf.getLongLong(); - refVal.impl->setValue(first, second); - break; - - case TYPE_MAP: - ft.decode(buf); - initMap(ft); - break; - - case TYPE_LIST: - case TYPE_ARRAY: - case TYPE_OBJECT: - default: - break; - } -} - -ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at) -{ -} - -ValueImpl::ValueImpl(Typecode t) : typecode(t) -{ - ::memset(&value, 0, sizeof(value)); -} - -Value* ValueImpl::factory(Typecode t, Buffer& b) -{ - ValueImpl* impl(new ValueImpl(t, b)); - return new Value(impl); -} - -Value* ValueImpl::factory(Typecode t) -{ - ValueImpl* impl(new ValueImpl(t)); - return new Value(impl); -} - -ValueImpl::~ValueImpl() -{ -} - -void ValueImpl::initMap(const FieldTable& ft) -{ - for (FieldTable::ValueMap::const_iterator iter = ft.begin(); - iter != ft.end(); iter++) { - const string& name(iter->first); - const FieldValue& fvalue(*iter->second); - uint8_t amqType = fvalue.getType(); - - if (amqType == 0x32) { - Value* subval(new Value(TYPE_UINT64)); - subval->setUint64(fvalue.get<int64_t>()); - insert(name.c_str(), subval); - } else if ((amqType & 0xCF) == 0x02) { - Value* subval(new Value(TYPE_UINT32)); - switch (amqType) { - case 0x02 : subval->setUint(fvalue.get<int>()); break; - case 0x12 : subval->setUint(fvalue.get<int>()); break; - case 0x22 : subval->setUint(fvalue.get<int>()); break; - } - insert(name.c_str(), subval); - } else if ((amqType & 0xCF) == 0x01) { - Value* subval(new Value(TYPE_INT64)); - subval->setInt64(fvalue.get<int64_t>()); - insert(name.c_str(), subval); - } else if (amqType == 0x85 || amqType == 0x95) { - Value* subval(new Value(TYPE_LSTR)); - subval->setString(fvalue.get<string>().c_str()); - insert(name.c_str(), subval); - } else if (amqType == 0x23 || amqType == 0x33) { - Value* subval(new Value(TYPE_DOUBLE)); - subval->setDouble(fvalue.get<double>()); - insert(name.c_str(), subval); - } else { - FieldTable subFt; - bool valid = qpid::framing::getEncodedValue<FieldTable>(iter->second, subFt); - if (valid) { - Value* subval(new Value(TYPE_MAP)); - subval->impl->initMap(subFt); - insert(name.c_str(), subval); - } - } - } -} - -void ValueImpl::mapToFieldTable(FieldTable& ft) const -{ - FieldTable subFt; - - for (map<string, Value>::const_iterator iter = mapVal.begin(); - iter != mapVal.end(); iter++) { - const string& name(iter->first); - const Value& subval(iter->second); - - switch (subval.getType()) { - case TYPE_UINT8: - case TYPE_UINT16: - case TYPE_UINT32: - ft.setUInt64(name, (uint64_t) subval.asUint()); - break; - case TYPE_UINT64: - case TYPE_DELTATIME: - ft.setUInt64(name, subval.asUint64()); - break; - case TYPE_SSTR: - case TYPE_LSTR: - ft.setString(name, subval.asString()); - break; - case TYPE_INT64: - case TYPE_ABSTIME: - ft.setInt64(name, subval.asInt64()); - break; - case TYPE_BOOL: - ft.setInt(name, subval.asBool() ? 1 : 0); - break; - case TYPE_FLOAT: - ft.setFloat(name, subval.asFloat()); - break; - case TYPE_DOUBLE: - ft.setDouble(name, subval.asDouble()); - break; - case TYPE_INT8: - case TYPE_INT16: - case TYPE_INT32: - ft.setInt(name, subval.asInt()); - break; - case TYPE_MAP: - subFt.clear(); - subval.impl->mapToFieldTable(subFt); - ft.setTable(name, subFt); - break; - case TYPE_LIST: - case TYPE_ARRAY: - case TYPE_OBJECT: - case TYPE_UUID: - case TYPE_REF: - default: - break; - } - } - } - -void ValueImpl::encode(Buffer& buf) const -{ - FieldTable ft; - - switch (typecode) { - case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break; - case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break; - case TYPE_UINT32 : buf.putLong(value.u32); break; - case TYPE_UINT64 : buf.putLongLong(value.u64); break; - case TYPE_SSTR : buf.putShortString(stringVal); break; - case TYPE_LSTR : buf.putMediumString(stringVal); break; - case TYPE_ABSTIME : buf.putLongLong(value.s64); break; - case TYPE_DELTATIME : buf.putLongLong(value.u64); break; - case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break; - case TYPE_FLOAT : buf.putFloat(value.floatVal); break; - case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break; - case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break; - case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break; - case TYPE_INT32 : buf.putLong(value.s32); break; - case TYPE_INT64 : buf.putLongLong(value.s64); break; - case TYPE_UUID : buf.putBin128(value.uuidVal); break; - case TYPE_REF : refVal.impl->encode(buf); break; - case TYPE_MAP: - mapToFieldTable(ft); - ft.encode(buf); - break; - case TYPE_LIST: - case TYPE_ARRAY: - case TYPE_OBJECT: - default: - break; - } -} - -bool ValueImpl::keyInMap(const char* key) const -{ - return typecode == TYPE_MAP && mapVal.count(key) > 0; -} - -Value* ValueImpl::byKey(const char* key) -{ - if (keyInMap(key)) { - map<string, Value>::iterator iter = mapVal.find(key); - if (iter != mapVal.end()) - return &iter->second; - } - return 0; -} - -const Value* ValueImpl::byKey(const char* key) const -{ - if (keyInMap(key)) { - map<string, Value>::const_iterator iter = mapVal.find(key); - if (iter != mapVal.end()) - return &iter->second; - } - return 0; -} - -void ValueImpl::deleteKey(const char* key) -{ - mapVal.erase(key); -} - -void ValueImpl::insert(const char* key, Value* val) -{ - pair<string, Value> entry(key, *val); - mapVal.insert(entry); -} - -const char* ValueImpl::key(uint32_t idx) const -{ - map<string, Value>::const_iterator iter = mapVal.begin(); - for (uint32_t i = 0; i < idx; i++) { - if (iter == mapVal.end()) - break; - iter++; - } - - if (iter == mapVal.end()) - return 0; - else - return iter->first.c_str(); -} - -Value* ValueImpl::listItem(uint32_t) -{ - return 0; -} - -void ValueImpl::appendToList(Value*) -{ -} - -void ValueImpl::deleteListItem(uint32_t) -{ -} - -Value* ValueImpl::arrayItem(uint32_t) -{ - return 0; -} - -void ValueImpl::appendToArray(Value*) -{ -} - -void ValueImpl::deleteArrayItem(uint32_t) -{ -} - - -//================================================================== -// Wrappers -//================================================================== - -Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {} -Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {} -Value::Value(ValueImpl* i) : impl(i) {} -Value::~Value() { delete impl;} - -Typecode Value::getType() const { return impl->getType(); } -bool Value::isNull() const { return impl->isNull(); } -void Value::setNull() { impl->setNull(); } -bool Value::isObjectId() const { return impl->isObjectId(); } -const ObjectId& Value::asObjectId() const { return impl->asObjectId(); } -void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); } -bool Value::isUint() const { return impl->isUint(); } -uint32_t Value::asUint() const { return impl->asUint(); } -void Value::setUint(uint32_t val) { impl->setUint(val); } -bool Value::isInt() const { return impl->isInt(); } -int32_t Value::asInt() const { return impl->asInt(); } -void Value::setInt(int32_t val) { impl->setInt(val); } -bool Value::isUint64() const { return impl->isUint64(); } -uint64_t Value::asUint64() const { return impl->asUint64(); } -void Value::setUint64(uint64_t val) { impl->setUint64(val); } -bool Value::isInt64() const { return impl->isInt64(); } -int64_t Value::asInt64() const { return impl->asInt64(); } -void Value::setInt64(int64_t val) { impl->setInt64(val); } -bool Value::isString() const { return impl->isString(); } -const char* Value::asString() const { return impl->asString(); } -void Value::setString(const char* val) { impl->setString(val); } -bool Value::isBool() const { return impl->isBool(); } -bool Value::asBool() const { return impl->asBool(); } -void Value::setBool(bool val) { impl->setBool(val); } -bool Value::isFloat() const { return impl->isFloat(); } -float Value::asFloat() const { return impl->asFloat(); } -void Value::setFloat(float val) { impl->setFloat(val); } -bool Value::isDouble() const { return impl->isDouble(); } -double Value::asDouble() const { return impl->asDouble(); } -void Value::setDouble(double val) { impl->setDouble(val); } -bool Value::isUuid() const { return impl->isUuid(); } -const uint8_t* Value::asUuid() const { return impl->asUuid(); } -void Value::setUuid(const uint8_t* val) { impl->setUuid(val); } -bool Value::isObject() const { return impl->isObject(); } -const Object* Value::asObject() const { return impl->asObject(); } -void Value::setObject(Object* val) { impl->setObject(val); } -bool Value::isMap() const { return impl->isMap(); } -bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); } -Value* Value::byKey(const char* key) { return impl->byKey(key); } -const Value* Value::byKey(const char* key) const { return impl->byKey(key); } -void Value::deleteKey(const char* key) { impl->deleteKey(key); } -void Value::insert(const char* key, Value* val) { impl->insert(key, val); } -uint32_t Value::keyCount() const { return impl->keyCount(); } -const char* Value::key(uint32_t idx) const { return impl->key(idx); } -bool Value::isList() const { return impl->isList(); } -uint32_t Value::listItemCount() const { return impl->listItemCount(); } -Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); } -void Value::appendToList(Value* val) { impl->appendToList(val); } -void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); } -bool Value::isArray() const { return impl->isArray(); } -Typecode Value::arrayType() const { return impl->arrayType(); } -uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); } -Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); } -void Value::appendToArray(Value* val) { impl->appendToArray(val); } -void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); } - diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h deleted file mode 100644 index 84b0e768e6..0000000000 --- a/qpid/cpp/src/qmf/engine/ValueImpl.h +++ /dev/null @@ -1,160 +0,0 @@ -#ifndef _QmfEngineValueImpl_ -#define _QmfEngineValueImpl_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qmf/engine/Value.h> -#include <qmf/engine/ObjectIdImpl.h> -#include <qmf/engine/Object.h> -#include <qpid/framing/Buffer.h> -#include <string> -#include <string.h> -#include <map> -#include <vector> -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace framing { - class FieldTable; -} -} - -namespace qmf { -namespace engine { - - // TODO: set valid flag on all value settors - // TODO: add a modified flag and accessors - - struct ValueImpl { - const Typecode typecode; - bool valid; - - ObjectId refVal; - std::string stringVal; - std::auto_ptr<Object> objectVal; - std::map<std::string, Value> mapVal; - std::vector<Value> vectorVal; - Typecode arrayTypecode; - - union { - uint32_t u32; - uint64_t u64; - int32_t s32; - int64_t s64; - bool boolVal; - float floatVal; - double doubleVal; - uint8_t uuidVal[16]; - } value; - - ValueImpl(const ValueImpl& from) : - typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal), - objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0), - mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode), - value(from.value) {} - - ValueImpl(Typecode t, Typecode at); - ValueImpl(Typecode t, qpid::framing::Buffer& b); - ValueImpl(Typecode t); - static Value* factory(Typecode t, qpid::framing::Buffer& b); - static Value* factory(Typecode t); - ~ValueImpl(); - - void encode(qpid::framing::Buffer& b) const; - - Typecode getType() const { return typecode; } - bool isNull() const { return !valid; } - void setNull() { valid = false; } - - bool isObjectId() const { return typecode == TYPE_REF; } - const ObjectId& asObjectId() const { return refVal; } - void setObjectId(const ObjectId& o) { refVal = o; } // TODO - - bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; } - uint32_t asUint() const { return value.u32; } - void setUint(uint32_t val) { value.u32 = val; } - - bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; } - int32_t asInt() const { return value.s32; } - void setInt(int32_t val) { value.s32 = val; } - - bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; } - uint64_t asUint64() const { return value.u64; } - void setUint64(uint64_t val) { value.u64 = val; } - - bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; } - int64_t asInt64() const { return value.s64; } - void setInt64(int64_t val) { value.s64 = val; } - - bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; } - const char* asString() const { return stringVal.c_str(); } - void setString(const char* val) { stringVal = val; } - - bool isBool() const { return typecode == TYPE_BOOL; } - bool asBool() const { return value.boolVal; } - void setBool(bool val) { value.boolVal = val; } - - bool isFloat() const { return typecode == TYPE_FLOAT; } - float asFloat() const { return value.floatVal; } - void setFloat(float val) { value.floatVal = val; } - - bool isDouble() const { return typecode == TYPE_DOUBLE; } - double asDouble() const { return value.doubleVal; } - void setDouble(double val) { value.doubleVal = val; } - - bool isUuid() const { return typecode == TYPE_UUID; } - const uint8_t* asUuid() const { return value.uuidVal; } - void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); } - - bool isObject() const { return typecode == TYPE_OBJECT; } - Object* asObject() const { return objectVal.get(); } - void setObject(Object* val) { objectVal.reset(val); } - - bool isMap() const { return typecode == TYPE_MAP; } - bool keyInMap(const char* key) const; - Value* byKey(const char* key); - const Value* byKey(const char* key) const; - void deleteKey(const char* key); - void insert(const char* key, Value* val); - uint32_t keyCount() const { return mapVal.size(); } - const char* key(uint32_t idx) const; - - bool isList() const { return typecode == TYPE_LIST; } - uint32_t listItemCount() const { return vectorVal.size(); } - Value* listItem(uint32_t idx); - void appendToList(Value* val); - void deleteListItem(uint32_t idx); - - bool isArray() const { return typecode == TYPE_ARRAY; } - Typecode arrayType() const { return arrayTypecode; } - uint32_t arrayItemCount() const { return vectorVal.size(); } - Value* arrayItem(uint32_t idx); - void appendToArray(Value* val); - void deleteArrayItem(uint32_t idx); - - private: - void mapToFieldTable(qpid::framing::FieldTable& ft) const; - void initMap(const qpid::framing::FieldTable& ft); - }; -} -} - -#endif - |